SparkSql系列(16/25) foreach操作

95次阅读
没有评论

简介

主要介绍两种遍历数据的方法 foreachforeachPartition,二者之间存在的差异跟之前介绍mapmapPartition相同,所以你如果有些写Database的操作,那么还是建议是foreachPartition。下面会介绍在 DataFrame 和 Rdd 上面的操作示例,这两个 API 基本上使用方法都差不多,区别在之前也描述过了。Rdd和DataFrame操作其实也是一样的。

DataFrame 使用 foreachPartition()

语法

foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit

示例

In this example, to make it simple we just print the DataFrame to console.

  // foreachPartition DataFrame
  val df = spark.createDataFrame(data).toDF("Product","Amount","Country")
  df.foreachPartition(partition => {
    // 如果你的数据需要写database,在这里执行初始化
    partition.foreach(fun=>{
      // 处理完的数据插入到 database
    })
    // 此处你也可以执行 batch操作,那么就不需要在上面的foreach 里执行操作了
  })

DataFrame 使用 foreach()

  val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
  df.foreach(f=> {
    longAcc.add(f.getInt(1))
  })
  println("Accumulator value:"+longAcc.value)

RDD 使用 foreachPartition()

语法

foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit

示例

  // foreachPartition DataFrame
  val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
  rdd.foreachPartition(partition => {
    //Initialize any database connection
    partition.foreach(fun=>{
      //apply the function
    })
  })

RDD 使用 foreach()

  //rdd accumulator
  val rdd2 = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
  val longAcc2 = spark.sparkContext.longAccumulator("SumAccumulator2")
  rdd .foreach(f=> {
    longAcc2.add(f)
  })
  println("Accumulator value:"+longAcc2.value)
admin
版权声明:本站原创文章,由admin2021-09-07发表,共计1311字。
转载提示:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)