SparkSql系列(17/25) 持久化操作

356次阅读
没有评论

前言

数据持久化是spark 运行提速的有个小技巧。经常你会碰到有很多操作是依赖于相同的变量,如果这个变量没有做持久化处理,那么你的action触发的时候,它会根据DAG重新计算一遍,这样白白消耗了很多资源,也耽误计算的时间。那么对于这些可能会在后面重复使用的中间变量,我们可以对它进行持久化,规避不必要的重复计算,加速程序的运行。毕竟一般情况下使用spark处理的数据对象都是很庞大的。

持久化方法

spark 提供了两种方法 cache 和 persist,但是二者其实是共通的,看下API就知道了。

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

上面的代码里,cache竟然调用的就是persist

  val spark:SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  //read csv with options
  val df = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true"))
    .csv("src/main/resources/zipcodes.csv")

  val df2 = df.where(col("State") === "PR").cache()
  // 调用 persist也是可以的
    val df4 = df.where(col("State") === "PR").persist()
  df2.show(false)

  println(df2.count())

  val df3 = df2.where(col("Zipcode") === 704)

  println(df2.count())

取消持久化

  val dfPersist = dfPersist.unpersist()

Spark 持久化级别

每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象给 persist() 方法进行设置。 详细的存储级别介绍如下:

  • MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY : 只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP: 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。
admin
版权声明:本站原创文章,由admin2021-09-08发表,共计1643字。
转载提示:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)