SparkSql系列(6/25) collect 使用

107次阅读
没有评论

Spark collect()collectAsList() 是用于将 RDD/DataFrame/Dataset 的所有元素(从所有节点)汇合到Driver节点。 在较小的数据集上可以使用 collect(),但是通常操作在 filter()、group()、count() 等之后。这也是尽可能避免在在Driver节点上出现OOM。如果存在很大的数据就不要执行这个操作了。

  val spark:SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val data = Seq(Row(Row("James ","","Smith"),"36636","M",3000),
    Row(Row("Michael ","Rose",""),"40288","M",4000),
    Row(Row("Robert ","","Williams"),"42114","M",4000),
    Row(Row("Maria ","Anne","Jones"),"39192","F",4000),
    Row(Row("Jen","Mary","Brown"),"","F",-1)
  )

  val schema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("id",StringType)
    .add("gender",StringType)
    .add("salary",IntegerType)

  val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
  df.printSchema()
  df.show(false)

printSchema 是打印DataFrame 的数据样式

show 从DataFrame中随机挑选部分数据展示

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------------------+-----+------+------+
|name                 |id   |gender|salary|
+---------------------+-----+------+------+
|[James , , Smith]    |36636|M     |3000  |
|[Michael , Rose, ]   |40288|M     |4000  |
|[Robert , , Williams]|42114|M     |4000  |
|[Maria , Anne, Jones]|39192|F     |4000  |
|[Jen, Mary, Brown]   |     |F     |-1    |
+---------------------+-----+------+------+

使用collect() 和 collectAsList()

collect() 返回的数据格式是Array[Row]

collectAsList() 返回的格式是 Java.util.list.

语法:

collect() : scala.Array[T]
collectAsList() : java.util.List[T]

collect() 示例

  val colList = df.collectAsList()
  val colData = df.collect()
  colData.foreach(row=>
  {
    val salary = row.getInt(3)//Index starts from zero
    println(salary)
  })

deptDF.collect() retrieves all elements in a DataFrame as an array to the driver. From the array, I’ve retried the firstName element and printed on the console.

3000
4000
4000
4000
-1

解析 Row

从 Row 的结构体中解析数据需要使用到 getStruct() 函数

  //Retrieving data from Struct column
  colData.foreach(row=>
  {
    val salary = row.getInt(3)
    val fullName:Row = row.getStruct(0) //Index starts from zero
    val firstName = fullName.getString(0)//In struct row, again index starts from zero
    val middleName = fullName.get(1).toString
    val lastName = fullName.getAs[String]("lastname")
    println(firstName+","+middleName+","+lastName+","+salary)
  })

getInt() 获取一个整形, getString() 获取一个 String 列, getAs[String]() 获取一个String 列表。

James ,,Smith,3000
Michael ,Rose,,4000
Robert ,,Williams,4000
Maria ,Anne,Jones,4000
Jen,Mary,Brown,-1

如果你直接调用 collect 是会返回所有的列数据,如果你只是想要其中某些列的数据,那么你可以先 select 这个列,然后再做上述的操作。

dataCollect = df.select("name").collect()

何时使用 Collect()

在前面也提到,执行这个操作,所有的数据会聚合到 Driver 节点上,数据量稍微大一点可能就会导致 OOM , 一般情况下就是小数据量可以执行这个操作,或者执行一个count聚合操作之后打印一些数据可以,其他情况暂时不要执行这个操作。

collect () vs select ()

select() 是转化操作, collect() 行动 , 解释起来有点别扭。

完整的例子

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object CollectExample extends App {

  val spark:SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  val data = Seq(Row(Row("James ","","Smith"),"36636","M",3000),
    Row(Row("Michael ","Rose",""),"40288","M",4000),
    Row(Row("Robert ","","Williams"),"42114","M",4000),
    Row(Row("Maria ","Anne","Jones"),"39192","F",4000),
    Row(Row("Jen","Mary","Brown"),"","F",-1)
  )

  val schema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("id",StringType)
    .add("gender",StringType)
    .add("salary",IntegerType)

  val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
  df.printSchema()
  df.show(false)

  val colData = df.collect()

  colData.foreach(row=>
  {
    val salary = row.getInt(3)//Index starts from zero
    println(salary)
  })

  //Retrieving data from Struct column
  colData.foreach(row=>
  {
    val salary = row.getInt(3)
    val fullName:Row = row.getStruct(0) //Index starts from zero
    val firstName = fullName.getString(0)//In struct row, again index starts from zero
    val middleName = fullName.get(1).toString
    val lastName = fullName.getAs[String]("lastname")
    println(firstName+","+middleName+","+lastName+","+salary)
  })
}
admin
版权声明:本站原创文章,由admin2021-08-23发表,共计3969字。
转载提示:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)