Driver 端初始化构建 Accumulator 并初始化, 同时完成了 Accumulator 注册, Accumulators.register(this) 时 Accumulator 会在序列化后发送到 Executor 端
Driver 接收到 ResultTask 完成的状态更新后, 会去更新 Value 的值 然后在 Action 操作执行后就可以获取到 Accumulator 的值了
Executor 端
Executor 端接收到 Task 之后会进行反序列化操作, 反序列化得到 RDD 和 function. 同时在反序列化的同时也去反序列化 Accumulator(在 readObject 方法中完成), 同时也会向 TaskContext 完成注册
完成任务计算之后, 随着 Task 结果一起返回给 Driver
结合源码分析
Driver 端初始化
??Driver 端主要经过以下步骤, 完成初始化操作:
- val accum = sparkContext.accumulator(0, "AccumulatorTest")
- val acc = new Accumulator(initialValue, param, Some(name))
- Accumulators.register(this)
Executor 端反序列化得到 Accumulator
?? 反序列化是在调用 ResultTask 的 runTask 方式时候做的操作:
- // 会反序列化出来 RDD 和自己定义的 function
- val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
- ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
?? 在反序列化的过程中, 会调用 Accumulable 中的 readObject 方法:
- private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
- in.defaultReadObject()
- // value 的初始值为 zero; 该值是会被序列化的
- value_ = zero
- deserialized = true
- // Automatically register the accumulator when it is deserialized with the task closure.
- //
- // Note internal accumulators sent with task are deserialized before the TaskContext is created
- // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
- // metrics, still need to register here.
- val taskContext = TaskContext.get()
- if (taskContext != null) {
- // 当前反序列化所得到的对象会被注册到 TaskContext 中
- // 这样 TaskContext 就可以获取到累加器
- // 任务运行结束之后, 就可以通过 context.collectAccumulators() 返回给 executor
- taskContext.registerAccumulator(this)
- }
- }
注意
Accumulable.scala 中的 value_, 是不会被序列化的,@transient 关键词修饰了
@volatile @transient private var value_ : R = initialValue // Current value on master
累加器在各个节点的累加操作
针对传入 function 中不同的操作, 对应有不同的调用方法, 以下列举几种 (在 Accumulator.scala 中):
- def += (term: T) {
- value_ = param.addAccumulator(value_, term)
- }
- def add(term: T) {
- value_ = param.addAccumulator(value_, term)
- }
- def ++= (term: R) {
- value_ = param.addInPlace(value_, term)
- }
根据不同的累加器参数, 有不同实现的 AccumulableParam(在 Accumulator.scala 中):
- trait AccumulableParam[R, T] extends Serializable {
- /**
- def addAccumulator(r: R, t: T): R
- def addInPlace(r1: R, r2: R): R
- def zero(initialValue: R): R
- }
不同的实现如下图所示:
以 IntAccumulatorParam 为例:
- implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
- def addInPlace(t1: Int, t2: Int): Int = t1 + t2
- def zero(initialValue: Int): Int = 0
- }
我们发现 IntAccumulatorParam 实现的是 trait AccumulatorParam[T]:
- trait AccumulatorParam[T] extends AccumulableParam[T, T] {
- def addAccumulator(t1: T, t2: T): T = {
- addInPlace(t1, t2)
- }
- }
在各个节点上的累加操作完成之后, 就会紧跟着返回更新之后的 Accumulators 的 value_值
聚合操作
在 Task.scala 中的 run 方法, 会执行如下:
- // 返回累加器, 并运行 task
- // 调用 TaskContextImpl 的 collectAccumulators, 返回值的类型为一个 Map
- (runTask(context), context.collectAccumulators())
在 Executor 端已经完成了一系列操作, 需要将它们的值返回到 Driver 端进行聚合汇总, 整个顺序如图累加器执行流程:
根据执行流程, 我们可以发现, 在执行完 collectAccumulators 方法之后, 最终会在 DAGScheduler 中调用 updateAccumulators(event), 而在该方法中会调用 Accumulators 的 add 方法, 从而完成聚合操作:
- def add(values: Map[Long, Any]): Unit = synchronized {
- // 遍历传进来的值
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- // Since we are now storing weak references, we must check whether the underlying data
- // is valid.
- // 根据 id 从注册的 Map 中取出对应的累加器
- originals(id).get match {
- // 将值给累加起来, 最终将结果加到 value 里面
- // ++= 是被重载了
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
- case None =>
- throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
- }
- } else {
- logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
- }
- }
- }
获取累加器的值
通过 accum.value 方法可以获取到累加器的值
至此, 累加器执行完毕.
来源: http://www.bubuko.com/infodetail-3105027.html