SparkSql系列(15/25) map操作

120次阅读
没有评论

基本概念

Spark map() and mapPartitions() 都是 spark 中的transformer操作,是日常开发中使用比较多的函数。

但是两者之间也存在着差别。

  • map() – Spark map() 执行的是对每一行做相应的转化操作。
  • mapPartitions() – 最终的效果是和 map 一样,它是将数据分成多个分区分别执行操作。

那为什么还是会存在两个?

之前在听隔壁组分享spark优化的时候也是会提到这两个函数,在日常的开发中会遇到这样的情况,比如数据写到 Redis里,如果你使用map,那么每一条数据都建立一个redis连接负责写数据,那么在大数据的情况下给redis的压力很大,connections会急速上升,如果这个redis是线上正在使用的说不定会影响线上的业务。此时mappartion就会缓解这个问题,我们可以在每一个partition里对这个分区的数据建立一个connection即可。

这也是spark代码优化的一点吧,后续也会提到foreachpartition

构建数据

  val structureData = Seq(
    Row("James","","Smith","36636","NewYork",3100),
    Row("Michael","Rose","","40288","California",4300),
    Row("Robert","","Williams","42114","Florida",1400),
    Row("Maria","Anne","Jones","39192","Florida",5500),
    Row("Jen","Mary","Brown","34561","NewYork",3000)
  )

  val structureSchema = new StructType()
    .add("firstname",StringType)
    .add("middlename",StringType)
    .add("lastname",StringType)
    .add("id",StringType)
    .add("location",StringType)
    .add("salary",IntegerType)

  val df2 = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData),structureSchema)
  df2.printSchema()
  df2.show(false)

Yields below output

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+----------+------+
|firstname|middlename|lastname|id   |location  |salary|
+---------+----------+--------+-----+----------+------+
|James    |          |Smith   |36636|NewYork   |3100  |
|Michael  |Rose      |        |40288|California|4300  |
|Robert   |          |Williams|42114|Florida   |1400  |
|Maria    |Anne      |Jones   |39192|Florida   |5500  |
|Jen      |Mary      |Brown   |34561|NewYork   |3000  |
+---------+----------+--------+-----+----------+------+

构建一个combine函数,函数的功能就是把多个列的组合在一起拼成一个字符串。

class Util extends Serializable {
  def combine(fname:String,mname:String,lname:String):String = {
    fname+","+mname+","+lname
  }
}

Spark map() 操作

由于只是转化操作,所以操作的前后数据量大小一般情况下是不会变的,但是schema可能会变,你可以新增一列。

语法:

1) map[U](func : scala.Function1[T, U])(implicit evidence$6 : org.apache.spark.sql.Encoder[U]) 
        : org.apache.spark.sql.Dataset[U]
2) map[U](func : org.apache.spark.api.java.function.MapFunction[T, U], encoder : org.apache.spark.sql.Encoder[U]) 
        : org.apache.spark.sql.Dataset[U]

对 DataFrame执行map操作之后返回的并不是DataFrame,是 Dataset[Row],如果你想要变成DataFrame那么就得使用toDF的操作。

import spark.implicits._
val df3 = df2.map(row=>{
  // This initialization happens to every records
  // If it is heavy initilizations like Database connects
  // It degrates the performance 
  val util = new Util() 
  val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))
             (fullName, row.getString(3),row.getInt(5))
})
val df3Map =  df3.toDF("fullName","id","salary")

df3Map.printSchema()
df3Map.show(false)

输出如下

root
 |-- fullName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary: integer (nullable = false)

+----------------+-----+------+
|fullName        |id   |salary|
+----------------+-----+------+
|James,,Smith    |36636|3100  |
|Michael,Rose,   |40288|4300  |
|Robert,,Williams|42114|1400  |
|Maria,Anne,Jones|39192|5500  |
|Jen,Mary,Brown  |34561|3000  |
+----------------+-----+------+

mapPartitions()

mapPartitions的优势在前面也提到了,也举例了 redis 连接的问题。

语法:

1) mapPartitions[U](func : scala.Function1[scala.Iterator[T], scala.Iterator[U]])(implicit evidence$7 : org.apache.spark.sql.Encoder[U]) 
        : org.apache.spark.sql.Dataset[U]
2) mapPartitions[U](f : org.apache.spark.api.java.function.MapPartitionsFunction[T, U], encoder : org.apache.spark.sql.Encoder[U]) 
        : org.apache.spark.sql.Dataset[U]
  val df4 = df2.mapPartitions(iterator => {
    // Do the heavy initialization here
    // Like database connections e.t.c
    val util = new Util()
    val res = iterator.map(row=>{
      val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))
      (fullName, row.getString(3),row.getInt(5))
    })
    res
  })
  val df4part = df4.toDF("fullName","id","salary")
  df4part.printSchema()
  df4part.show(false)

完整的代码

import org.apache.spark.sql.{Row, SparkSession}import org.apache.spark.sql.types.{IntegerType, StringType, StructType,ArrayType,MapType}object MapTransformation extends App{  val spark:SparkSession = SparkSession.builder()    .master("local[5]")    .appName("SparkByExamples.com")    .getOrCreate()  val structureData = Seq(    Row("James","","Smith","36636","NewYork",3100),    Row("Michael","Rose","","40288","California",4300),    Row("Robert","","Williams","42114","Florida",1400),    Row("Maria","Anne","Jones","39192","Florida",5500),    Row("Jen","Mary","Brown","34561","NewYork",3000)  )  val structureSchema = new StructType()    .add("firstname",StringType)    .add("middlename",StringType)    .add("lastname",StringType)    .add("id",StringType)    .add("location",StringType)    .add("salary",IntegerType)  val df2 = spark.createDataFrame(    spark.sparkContext.parallelize(structureData),structureSchema)  df2.printSchema()  df2.show(false)  import spark.implicits._  val util = new Util()  val df3 = df2.map(row=>{    val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))    (fullName, row.getString(3),row.getInt(5))  })  val df3Map =  df3.toDF("fullName","id","salary")  df3Map.printSchema()  df3Map.show(false)  val df4 = df2.mapPartitions(iterator => {    val util = new Util()    val res = iterator.map(row=>{      val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))      (fullName, row.getString(3),row.getInt(5))    })    res  })  val df4part = df4.toDF("fullName","id","salary")  df4part.printSchema()  df4part.show(false)}
admin
版权声明:本站原创文章,由admin2021-09-05发表,共计5056字。
转载提示:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)