一, 基本概念
累加器是 Spark 的一种变量, 顾名思义该变量只能增加. 有以下特点:
1, 累加器只能在 Driver 端构建及并只能是 Driver 读取结果, Task 只能累加.
2, 累加器不会改变 Spark Lazy 计算的特点. 只会在 Job 触发的时候进行相关累加操作.
3, 现有累加器的类型.
相信有很多学习大数据的道友, 在这里我给大家说说我滴群哦, 大数据海量知识分享, 784789432. 在此我保证, 绝对大数据的干货, 等待各位的到来, 我们一同从入门到精通吧!
二, 累加器的使用
Driver 端初始化, 并在 Action 之后获取值.
- val accum = sc.accumulator(0, "test Accumulator")
- accum.value
Executor 端进行计算
accum+=1;
三, 累加器的重点类
Class Accumulator extends Accumulable
主要是实现了累加器的初始化及封装了相关的累加器操作方法. 同时在类对象构建的时候向我们的 Accumulators 注册了累加器. 累加器的 add 操作的返回值类型和我们传入的值类型可以不一样. 所以, 我们一定要定义好如何累加和合并值. 也即 add 方法
object Accumulators:
该方法在 Driver 端管理着我们的累加器, 也包含了特定累加器的聚合操作.
trait AccumulatorParam[T] extends AccumulableParam[T, T]:
AccumulatorParam 的 addAccumulator 操作的泛型封装, 具体的实现还是要再具体实现类里面实现 addInPlace 方法.
object AccumulatorParam:
主要是进行隐式类型转换的操作.
TaskContextImpl:
在 Executor 端管理着我们的累加器.
四, 累加器的源码解析
1,Driver 端的初始化
- val accum = sc.accumulator(0, "test Accumulator")
- val acc = new Accumulator(initialValue, param, Some(name))
主要是在 Accumulable(Accumulator) 中调用了, 这样我们就可以使用 Accumulator 使用了.
Accumulators.register(this)
2,Executor 端的反序列化得到我们对象的过程
首先, 我们的 value_ 可以看到其并不支持序列化
@volatile @transient private var value_ : R = initialValue // Current value on master
其初始化是在我们反序列化的时候做的, 反序列化还完成了 Accumulator 向我们的 TaskContextImpl 的注册
反序列化是在调用 ResultTask 的 RunTask 方法的时候做的
- val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
- ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
过程中会调用
- private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
- in.defaultReadObject()
- value_ = zero
- deserialized = true
- // Automatically register the accumulator when it is deserialized with the task closure.
- //
- // Note internal accumulators sent with task are deserialized before the TaskContext is created
- // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
- // metrics, still need to register here.
- val taskContext = TaskContext.get()
- if (taskContext != null) {
- taskContext.registerAccumulator(this)
- }
- }
3, 累加器的累加
- accum+=1;
- param.addAccumulator(value_, term)
根据不同的累加器参数有不同的实现 AccumulableParam
如, int 类型. 最终调用的 AccumulatorParam 特质的 addAccumulator 方法.
- trait AccumulatorParam[T] extends AccumulableParam[T, T] {
- def addAccumulator(t1: T, t2: T): T = {
- addInPlace(t1, t2)
- }
- }
然后, 调用的是各个具体实现的 addInPlace 方法
- implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
- def addInPlace(t1: Int, t2: Int): Int = t1 + t2
- def zero(initialValue: Int): Int = 0
- }
返回后更新了我们的 Accumulators 的 value_的值.
4,Accumulator 的各个节点累加的之后的聚合操作
在 Task 类的 run 方法里面得到并返回的
(runTask(context), context.collectAccumulators())
最终在 DAGScheduler 里面调用了 updateAccumulators(event)
在 updateAccumulators 方法中
Accumulators.add(event.accumUpdates)
- def add(values: Map[Long, Any]): Unit = synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- // Since we are now storing weak references, we must check whether the underlying data
- // is valid.
- originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
- case None =>
- throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
- }
- } else {
- logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
- }
- }
- }
来源: http://www.bubuko.com/infodetail-2632872.html