[TOC]
Spark RDD 持久化
RDD 持久化工作原理
Spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中. 当对 RDD 执行持久化操作时, 每个节点都会将自己操作的 RDD 的 partition 持久化到内存中, 并且在之后对该 RDD 的反复使用中, 直接使用内存缓存的 partition. 这样的话, 对于针对一个 RDD 反复执行多个操作的场景, 就只要对 RDD 计算一次即可, 后面直接使用该 RDD, 而不需要反复计算多次该 RDD.
巧妙使用 RDD 持久化, 甚至在某些场景下, 可以将 spark 应用程序的性能提升 10 倍. 对于迭代式算法和快速交互式应用来说, RDD 持久化, 是非常重要的.
要持久化一个 RDD, 只要调用其 cache() 或者 persist() 方法即可. 在该 RDD 第一次被计算出来时, 就会直接缓存在每个节点中. 而且 Spark 的持久化机制还是自动容错的, 如果持久化的 RDD 的任何 partition 丢失了, 那么 Spark 会自动通过其源 RDD, 使用 transformation 操作重新计算该 partition.
cache() 和 persist() 的区别在于, cache() 是 persist() 的一种简化方式, cache() 的底层就是调用的 persist() 的无参版本, 同时就是调用 persist(MEMORY_ONLY), 将数据持久化到内存中. 如果需要从内存中去除缓存, 那么可以使用 unpersist() 方法.
RDD 持久化使用场景
1, 第一次加载大量的数据到 RDD 中
2, 频繁的动态更新 RDD Cache 数据, 不适合使用 Spark Cache,Spark lineage
RDD 持久化策略
持久化策略的选择
? 默认情况下, 性能最高的当然是 MEMORY_ONLY, 但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个 RDD 的所有数据. 因为不进行序列化与反序列化操作, 就避免了这部分的性能开销; 对这个 RDD 的后续算子操作, 都是基于纯内存中的数据的操作, 不需要从磁盘文件中读取数据, 性能也很高; 而且不需要复制一份数据副本, 并远程传送到其他节点上. 但是这里必须要注意的是, 在实际的生产环境中, 恐怕能够直接用这种策略的场景还是有限的, 如果 RDD 中数据比较多时 (比如几十亿), 直接用这种持久化级别, 会导致 JVM 的 OOM 内存溢出异常.
? 如果使用 MEMORY_ONLY 级别时发生了内存溢出, 那么建议尝试使用 MEMORY_ONLY_SER 级别. 该级别会将 RDD 数据序列化后再保存在内存中, 此时每个 partition 仅仅是一个字节数组而已, 大大减少了对象数量, 并降低了内存占用. 这种级别比 MEMORY_ONLY 多出来的性能开销, 主要就是序列化与反序列化的开销. 但是后续算子可以基于纯内存进行操作, 因此性能总体还是比较高的. 此外, 可能发生的问题同上, 如果 RDD 中的数据量过多的话, 还是可能会导致 OOM 内存溢出的异常.
? 如果纯内存的级别都无法使用, 那么建议使用 MEMORY_AND_DISK_SER 策略, 而不是 MEMORY_AND_DISK 策略. 因为既然到了这一步, 就说明 RDD 的数据量很大, 内存无法完全放下. 序列化后的数据比较少, 可以节省内存和磁盘的空间开销. 同时该策略会优先尽量尝试将数据缓存在内存中, 内存缓存不下才会写入磁盘.
? 通常不建议使用 DISK_ONLY 和后缀为_2 的级别: 因为完全基于磁盘文件进行数据的读写, 会导致性能急剧降低, 有时还不如重新计算一次所有 RDD. 后缀为_2 的级别, 必须将所有数据都复制一份副本, 并发送到其他节点上, 数据复制以及网络传输会导致较大的性能开销, 除非是要求作业的高可用性, 否则不建议使用.
测试案例
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark RDD 的持久化
*/
- object _01SparkPersistOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
- val sc = new SparkContext(conf)
- Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
- Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
- var start = System.currentTimeMillis()
val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
- // linesRDD.cache()
- // linesRDD.persist(StorageLevel.MEMORY_ONLY)
- // 执行第一次 RDD 的计算
- val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
- // retRDD.cache()
- // retRDD.persist(StorageLevel.DISK_ONLY)
- retRDD.count()
- println("第一次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")
- // 执行第二次 RDD 的计算
- start = System.currentTimeMillis()
- // linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
- retRDD.count()
- println("第二次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")
- // 持久化使用结束之后, 要想卸载数据
- // linesRDD.unpersist()
- sc.stop()
- }
- }
设置相关的持久化策略, 再观察执行时间就可以有一个较为直观的理解.
共享变量
提供了两种有限类型的共享变量, 广播变量和累加器.
介绍之前, 先直接看下面一个例子:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.{SparkConf, SparkContext}
/**
* 共享变量
* 我们在 dirver 中声明的这些局部变量或者成员变量, 可以直接在 transformation 中使用,
* 但是经过 transformation 操作之后, 是不会将最终的结果重新赋值给 dirver 中的对应的变量.
* 因为通过 action, 触发了 transformation 的操作, transformation 的操作, 都是通过
* DAGScheduler 将代码打包 序列化 交由 TaskScheduler 传送到各个 Worker 节点中的 Executor 去执行,
* 在 transformation 中执行的这些变量, 是自己节点上的变量, 不是 dirver 上最初的变量, 我们只不过是将
* driver 上的对应的变量拷贝了一份而已.
*
*
* 这个案例也反映出, 我们需要有一些操作对应的变量, 在 driver 和 executor 上面共享
*
* spark 给我们提供了两种解决方案 -- 两种共享变量
* 广播变量
* 累加器
*/
- object _02SparkShareVariableOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
- val sc = new SparkContext(conf)
- Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
- Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
val linesRDD = sc.textFile("D:/data/spark/hello.txt")
- val wordsRDD = linesRDD.flatMap(_.split(" "))
- var num = 0
- val parisRDD = wordsRDD.map(word => {
- num += 1
- println("map--->num =" + num)
- (word, 1)
- })
- val retRDD = parisRDD.reduceByKey(_ + _)
- println("num =" + num)
- retRDD.foreach(println)
- println("num =" + num)
- sc.stop()
- }
- }
输出结果如下:
- num = 0
- map--->num = 1
- map--->num = 1
- map--->num = 2
- map--->num = 2
- map--->num = 3
- map--->num = 4
- (hello,3)
- (you,1)
- (me,1)
- (he,1)
- num = 0
广播变量
Spark 的另一种共享变量是广播变量. 通常情况下, 当一个 RDD 的很多操作都需要使用 driver 中定义的变量时, 每次操作, driver 都要把变量发送给 worker 节点一次, 如果这个变量中的数据很大的话, 会产生很高的传输负载, 导致执行效率降低. 使用广播变量可以使程序高效地将一个很大的只读数据发送给多个 worker 节点, 而且对每个 worker 节点只需要传输一次, 每次操作时 executor 可以直接获取本地保存的数据副本, 不需要多次传输.
这样理解, 一个 worker 中的 executor, 有 5 个 task 运行, 假如 5 个 task 都需要这从份共享数据, 就需要向 5 个 task 都传递这一份数据, 那就十分浪费网络资源和内存资源了. 使用了广播变量后, 只需要向该 worker 传递一次就可以了.
创建并使用广播变量的过程如下:
在一个类型 T 的对象 obj 上使用 SparkContext.brodcast(obj) 方法, 创建一个 Broadcast[T] 类型的广播变量, obj 必须满足 Serializable. 通过广播变量的. value() 方法访问其值. 另外, 广播过程可能由于变量的序列化时间过程或者序列化变量的传输过程过程而成为瓶颈, 而 Spark Scala 中使用的默认的 Java 序列化方法通常是低效的, 因此可以通过 spark.serializer 属性为不同的数据类型实现特定的序列化方法 (如 Kryo) 来优化这一过程.
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.broadcast.Broadcast
- import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用 Spark 广播变量
*
* 需求:
* 用户表:
* id name age gender(0|1)
*
* 要求, 输出用户信息, gender 必须为男或者女, 不能为 0,1
*/
- object _03SparkBroadcastOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
- val sc = new SparkContext(conf)
- Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
- Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
- val userList = List(
- "001, 刘向前, 18,0",
- "002, 冯 剑, 28,1",
- "003, 李志杰, 38,0",
- "004, 郭 鹏, 48,2"
- )
- val genderMap = Map("0" -> "女", "1" -> "男")
- val genderMapBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap)
- val userRDD = sc.parallelize(userList)
- val retRDD = userRDD.map(info => {
- val prefix = info.substring(0, info.lastIndexOf(",")) // "001, 刘向前, 18"
- val gender = info.substring(info.lastIndexOf(",") + 1)
- val genderMapValue = genderMapBC.value
- val newGender = genderMapValue.getOrElse(gender, "男")
- prefix + "," + newGender
- })
- retRDD.foreach(println)
- sc.stop()
- }
- }
输出结果如下:
001, 刘向前, 18, 女
003, 李志杰, 38, 女
002, 冯 剑, 28, 男
004, 郭 鹏, 48, 男
当然这个案例只是演示一下代码的使用, 并不能看出其运行的机制.
累加器
Spark 提供的 Accumulator, 主要用于多个节点对一个变量进行共享性的操作. Accumulator 只提供了累加的功能. 但是确给我们提供了多个 task 对一个变量并行操作的功能. 但是 task 只能对 Accumulator 进行累加操作, 不能读取它的值. 只有 Driver 程序可以读取 Accumulator 的值.
非常类似于在 MR 中的一个 Counter 计数器, 主要用于统计各个程序片段被调用的次数, 和整体进行比较, 来对数据进行一个评估.
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark 共享变量之累加器 Accumulator
*
* 需要注意的是, 累加器的执行必须需要 Action 触发
*/
- object _04SparkAccumulatorOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
- val sc = new SparkContext(conf)
- Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
- Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
- // 要对这些变量都 * 7, 同时统计能够被 3 整除的数字的个数
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
- val listRDD:RDD[Int] = sc.parallelize(list)
- var counter = 0
- val counterAcc = sc.accumulator[Int](0)
- val mapRDD = listRDD.map(num => {
- counter += 1
- if(num % 3 == 0) {
- counterAcc.add(1)
- }
- num * 7
- })
- // 下面这种操作又执行了一次 RDD 计算, 所以可以考虑上面的方案, 减少一次 RDD 的计算
- // val ret = mapRDD.filter(num => num % 3 == 0).count()
- mapRDD.foreach(println)
- println("counter===" + counter)
- println("counterAcc===" + counterAcc.value)
- sc.stop()
- }
- }
输出结果如下:
- 49
- 56
- 7
- 63
- 14
- 70
- 21
- 77
- 28
- 84
- 35
- 91
- 42
- counter===0
- counterAcc===4
来源: http://www.bubuko.com/infodetail-2577596.html