概览
Spark 程序在运行的过程中, Driver 端的很多功能都依赖于事件的传递和处理, 而事件总线在这中间发挥着至关重要的纽带作用. 事件总线通过异步线程, 提高了 Driver 执行的效率.
Spark 定义了一个特质[1]ListenerBus, 可以接收事件并且将事件提交到对应事件的监听器. 为了对 ListenerBus 有个直观的理解, 我们先来看看它的代码实现, 见代码清单 1.
代码清单 1 ListenerBus 的定义
- private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
- private[spark] val listeners = new CopyOnWriteArrayList[L]
- final def addListener(listener: L): Unit = {
- listeners.add(listener)
- }
- final def removeListener(listener: L): Unit = {
- listeners.remove(listener)
- }
- final def postToAll(event: E): Unit = {
- val iter = listeners.iterator
- while (iter.hasNext) {
- val listener = iter.next()
- try {
- doPostEvent(listener, event)
- } catch {
- case NonFatal(e) =>
- logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
- }
- }
- }
- protected def doPostEvent(listener: L, event: E): Unit
- private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
- val c = implicitly[ClassTag[T]].runtimeClass
- listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
- }
- }
代码清单 1 中展示了 ListenerBus 是个泛型特质, 其泛型参数为 [L <: AnyRef, E], 其中 L 是代表监听器的泛型参数, 可以看到 ListenerBus 支持任何类型的监听器, E 是代表事件的泛型参数. ListenerBus 中各个成员的作用如下:
listeners: 用于维护所有注册的监听器, 其数据结构为 CopyOnWriteArrayList[L];
addListener: 向 listeners 中添加监听器的方法, 由于 listeners 采用 CopyOnWriteArrayList 来实现, 所以 addListener 方法是线程安全的;
removeListener: 从 listeners 中移除监听器的方法, 由于 listeners 采用 CopyOnWriteArrayList 来实现, 所以 removeListener 方法是线程安全的;
postToAll: 此方法的作用是将事件投递给所有的监听器. 虽然 CopyOnWriteArrayList 本身是线程的安全的, 但是由于 postToAll 方法内部引入了 "先检查后执行" 的逻辑, 因而 postToAll 方法不是线程安全的, 所以所有对 postToAll 方法的调用应当保证在同一个线程中;
doPostEvent: 用于将事件投递给指定的监听器, 此方法只提供了接口定义, 具体实现需要子类提供;
findListenersByClass: 查找与指定类型相同的监听器列表.
下面将分别对以下内容进行介绍:
ListenerBus 的继承体系
SparkListenerBus 详解
LiveListenerBus 详解
[1] 特质是 Scala 语言中提供真正的多重继承的语法特性, 类似于 Java 的 Interface, 但是又可以实现方法. 有关 Scala 特质的更多介绍请访问 Scala 官网 http://www.scala-lang.org.
ListenerBus 的继承体系
理解了 ListenerBus 的定义后, 本小节一起来看看有哪些类继承了它. ListenerBus 的类继承体系如图 1 所示.
图 1 ListenerBus 的类继承体系
从图 1 中可以看到有三种 ListenerBus 的具体实现, 分别为:
SparkListenerBus: 用于将 SparkListenerEvent 类型的事件投递到 SparkListenerInterface 类型的监听器;
StreamingQueryListenerBus: 用于将 StreamingQueryListener.Event 类型的事件投递到 StreamingQueryListener 类型的监听器, 此外还会将 StreamingQueryListener.Event 类型的事件交给 SparkListenerBus;
StreamingListenerBus: 用于将 StreamingListenerEvent 类型的事件投递到 StreamingListener 类型的监听器, 此外还会将 StreamingListenerEvent 类型的事件交给 SparkListenerBus.
SparkListenerBus 也有两种实现:
LiveListenerBus: 采用异步线程将 SparkListenerEvent 类型的事件投递到 SparkListener 类型的监听器;
ReplayListenerBus: 用于从序列化的事件数据中重播事件.
有了对事件总线的这些介绍, 读者已经在宏观上对其有所认识. 但是如果没有具体的实现, ListenerBus 本身也无法发挥作用. 下一小节我们将选择对 SparkListenerBus 从更加微观的角度说明如何使用事件总线.
SparkListenerBus 详解
有了上一节对 ListenerBus 类继承体系的介绍, 本小节将详细介绍 SparkListenerBus 的实现, 见代码清单 2.
代码清单 2 SparkListenerBus 的实现
- private[spark] trait SparkListenerBus
- extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
- protected override def doPostEvent(
- listener: SparkListenerInterface,
- event: SparkListenerEvent): Unit = {
- event match {
- case stageSubmitted: SparkListenerStageSubmitted =>
- listener.onStageSubmitted(stageSubmitted)
- case stageCompleted: SparkListenerStageCompleted =>
- listener.onStageCompleted(stageCompleted)
- case jobStart: SparkListenerJobStart =>
- listener.onJobStart(jobStart)
- case jobEnd: SparkListenerJobEnd =>
- listener.onJobEnd(jobEnd)
- case taskStart: SparkListenerTaskStart =>
- listener.onTaskStart(taskStart)
- case taskGettingResult: SparkListenerTaskGettingResult =>
- listener.onTaskGettingResult(taskGettingResult)
- case taskEnd: SparkListenerTaskEnd =>
- listener.onTaskEnd(taskEnd)
- case environmentUpdate: SparkListenerEnvironmentUpdate =>
- listener.onEnvironmentUpdate(environmentUpdate)
- case blockManagerAdded: SparkListenerBlockManagerAdded =>
- listener.onBlockManagerAdded(blockManagerAdded)
- case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
- listener.onBlockManagerRemoved(blockManagerRemoved)
- case unpersistRDD: SparkListenerUnpersistRDD =>
- listener.onUnpersistRDD(unpersistRDD)
- case applicationStart: SparkListenerApplicationStart =>
- listener.onApplicationStart(applicationStart)
- case applicationEnd: SparkListenerApplicationEnd =>
- listener.onApplicationEnd(applicationEnd)
- case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
- listener.onExecutorMetricsUpdate(metricsUpdate)
- case executorAdded: SparkListenerExecutorAdded =>
- listener.onExecutorAdded(executorAdded)
- case executorRemoved: SparkListenerExecutorRemoved =>
- listener.onExecutorRemoved(executorRemoved)
- case blockUpdated: SparkListenerBlockUpdated =>
- listener.onBlockUpdated(blockUpdated)
- case logStart: SparkListenerLogStart => // ignore event log metadata
- case _ => listener.onOtherEvent(event)
- }
- }
- }
我们看到 SparkListenerBus 已经实现了 ListenerBus 的 doPostEvent 方法, 通过对 SparkListenerEvent 事件的匹配, 执行 SparkListenerInterface 监听器的相应方法.
这里的 SparkListenerEvent 其实是个特质, 代码清单 2 中列出的 SparkListenerStageSubmitted,SparkListenerStageCompleted 等都是继承了 SparkListenerEvent 特质的样例类[2]. 为说明问题, 这里仅仅摘选 SparkListenerEvent 及部分 SparkListenerEvent 子类的实现如下:
- @DeveloperApi
- @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
- trait SparkListenerEvent {
- protected[spark] def logEvent: Boolean = true
- }
- @DeveloperApi
- case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
- extends SparkListenerEvent
- @DeveloperApi
- case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
- @DeveloperApi
- case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
- extends SparkListenerEvent
- // 省略其他 SparkListenerEvent 的实现
- private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
SparkListenerInterface 也是一个特质, 其中定义了所有 SparkListener 应当遵守的接口规范. 由于 SparkListenerInterface 中定义了很多接口, 为说明问题只摘抄 SparkListenerInterface 中的部分接口定义, 代码如下:
- private[spark] trait SparkListenerInterface {
- def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
- def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
- // 省略其他接口方法
- def onOtherEvent(event: SparkListenerEvent): Unit
- }
结合代码清单 2, 我们知道以上代码片段中的 onStageCompleted 和 onStageSubmitted 将在 SparkListenerBus 的 doPostEvent 方法中分别匹配到 SparkListenerStageCompleted 和 SparkListenerStageSubmitted 事件时执行, 而对于 doPostEvent 中无法匹配的事件, 都将执行 onOtherEvent 方法.
在详细介绍了 ListenerBus 及 SparkListenerBus 后, 我们知道当有事件需要通知监听器的时候, 可以调用 ListenerBus 的 postToAll 方法, postToAll 方法遍历所有监听器并调用 SparkListenerBus 实现的 doPostEvent 方法, doPostEvent 方法对事件类型进行匹配后调用监听器的不同方法. 整个投递事件的过程是通过方法调用实现的, 所以这是一个同步调用. 在监听器比较多的时候这个过程会相对比较耗时 (比如用于写日志的 EventLoggingListener 在调度频繁的时候, 有可能导致写入延迟, 这将导致部分事件的丢失. 此问题已在 spark2.3.0 版本中得到改进.), 在 Spark UI(在《Spark 内核设计的艺术 架构设计与实现》一书的第 4 章中详细介绍) 中为了达到页面的即时刷新 , 实现了 SparkListenerBus 的子类 LiveListenerBus. 下一小节将围绕 LiveListenerBus 来详细说明异步投递消息的实现细节.
[2] 样例类是 Scala 语言的语法特性. 样例类是一种特殊的类型, 常用作事件, 参数, 模式匹配等. 有关样例类的更多介绍, 请读者阅读 Scala 语言的相关资料.
LiveListenerBus 详解
LiveListenerBus 继承了 SparkListenerBus, 并实现了将事件异步投递给监听器, 达到实时刷新 UI 界面数据的效果. LiveListenerBus 主要由以下部分组成:
eventQueue: 是 SparkListenerEvent 事件的阻塞队列, 队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置, 默认为 10000(Spark 早期版本中属于静态属性, 固定为 10000, 这导致队列堆满时, 只得移除一些最老的事件, 最终导致各种问题与 bug);
started: 标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;
stopped: 标记 LiveListenerBus 的停止状态的 AtomicBoolean 类型的变量;
droppedEventsCounter: 使用 AtomicLong 类型对删除的事件进行计数, 每当日志打印了 droppedEventsCounter 后, 会将 droppedEventsCounter 重置为 0;
lastReportTimestamp: 用于记录最后一次日志打印 droppedEventsCounter 的时间戳;
processingEvent: 用来标记当前正有事件被 listenerThread 线程处理;
logDroppedEvent:AtomicBoolean 类型的变量, 用于标记是否由于 eventQueue 已满, 导致新的事件被删除;
eventLock: 用于当有新的事件到来时释放信号量, 当对事件进行处理时获取信号量;
listeners: 继承自 LiveListenerBus 的监听器数组;
listenerThread: 处理事件的线程.
异步事件处理线程
listenerThread 用于异步处理 eventQueue 中的事件, 为了便于说明, 这里将展示 listenerThread 及 LiveListenerBus 中的主要代码片段, 见代码清单 3.
代码清单 3 LiveListenerBus 主要逻辑的代码片段
- private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
- private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
- private def validateAndGetQueueSize(): Int = {
- val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
- if (queueSize <= 0) {
- throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be> 0!")
- }
- queueSize
- }
- private val started = new AtomicBoolean(false)
- private val stopped = new AtomicBoolean(false)
- private val droppedEventsCounter = new AtomicLong(0L)
- @volatile private var lastReportTimestamp = 0L
- private var processingEvent = false
- private val logDroppedEvent = new AtomicBoolean(false)
- private val eventLock = new Semaphore(0)
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- LiveListenerBus.withinListenerThread.withValue(true) {
- while (true) {
- eventLock.acquire() // 获取信号量
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll // 从 eventQueue 中获取事件
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from eventQueue means" +
- "the listener bus has been stopped. So `stopped` must be true")
- }
- return
- }
- postToAll(event) // 事件处理
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
- }
- }
- }
- }
通过分析代码清单 3,listenerThread 的工作步骤为:
不断获取信号量(当可以获取信号量时, 说明还有事件未处理);
通过同步控制, 将 processingEvent 设置为 true;
从 eventQueue 中获取事件;
调用超类 ListenerBus 的 postToAll 方法(postToAll 方法对监听器进行遍历, 并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法);
每次循环结束依然需要通过同步控制, 将 processingEvent 设置为 false;
值得一提的是, listenerThread 的 run 方法中调用了 Utils 的 tryOrStopSparkContext,tryOrStopSparkContext 方法可以保证当 listenerThread 的内部循环抛出异常后启动一个新的线程停止 SparkContext(SparkContext 的内容将在第 4 章详细介绍, tryOrStopSparkContext 方法的具体实现请阅读 Utils 工具类的实现).
LiveListenerBus 的消息投递
在解释了异步线程 listenerThread 的工作内容后, 还有一个要点没有解释: eventQueue 中的事件是如何放进去的呢? 由于 eventQueue 定义在 LiveListenerBus 中, 因此 ListenerBus 和 SparkListenerBus 中并没有操纵 eventQueue 的方法, 要将事件放入 eventQueue 只能依靠 LiveListenerBus 自己了, 其 post 方法就是为此目的而生的, 见代码清单 4.
代码清单 4 向 LiveListenerBus 投递 SparkListenerEvent 事件
- def post(event: SparkListenerEvent): Unit = {
- if (stopped.get) {
- logError(s"$name has already stopped! Dropping event $event")
- return
- }
- val eventAdded = eventQueue.offer(event) // 向 eventQueue 中添加事件
- if (eventAdded) {
- eventLock.release()
- } else {
- onDropEvent(event)
- droppedEventsCounter.incrementAndGet()
- }
- // 打印删除事件数的日志
- val droppedEvents = droppedEventsCounter.get
- if (droppedEvents> 0) {
- if (System.currentTimeMillis() - lastReportTimestamp>= 60 * 1000) {
- if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
- val prevLastReportTimestamp = lastReportTimestamp
- lastReportTimestamp = System.currentTimeMillis()
- logWarning(s"Dropped $droppedEvents SparkListenerEvents since" +
- new java.util.Date(prevLastReportTimestamp))
- }
- }
- }
- }
从代码清单 4 看到 post 方法的处理步骤如下:
判断 LiveListenerBus 是否已经处于停止状态;
向 eventQueue 中添加事件. 如果添加成功, 则释放信号量进而催化 listenerThread 能够有效工作. 如果 eventQueue 已满造成添加失败, 则移除事件, 并对删除事件计数器 droppedEventsCounter 进行自增;
如果有事件被删除, 并且当前系统时间距离上一次打印 droppedEventsCounter 超过了 60 秒则将 droppedEventsCounter 打印到日志.
LiveListenerBus 与监听器
与 LiveListenerBus 配合使用的监听器, 并非是父类 SparkListenerBus 的类型参数 SparkListenerInterface, 而是继承自 SparkListenerInterface 的 SparkListener 及其子类. 图 2 列出了 Spark 中监听器 SparkListener 以及它的 6 种最常用的实现[3].
图 2 SparkListener 的类继承体系
SparkListener 虽然实现了 SparkListenerInterface 中的每个方法, 但是其实都是空实现, 具体的实现需要交给子类去完成.
本文首先对事件总线的接口定义进行了一些介绍, 之后选择 ListenerBus 的子类 SparkListenerBus 与 LiveListenerBus 作为具体的实现例子进行分析, 最后本文选择 LiveListenerBus 作为具体的实现例子进行分析, 这里将通过图 3 更加直观的展示 ListenerBus,SparkListenerBus 及 LiveListenerBus 的工作原理.
图 3 LiveListenerBus 的工作流程图
最后对于图 3 作一些补充说明: 图中的 DAGScheduler,SparkContext,BlockManagerMasterEndpoint,DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源, 它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的.
[3] 除了本节列出的的六种 SparkListener 的子类外, 还有很多其他的子类, 这里就不一一列出了, 感兴趣的读者可以查阅 Spark 相关文档或阅读源码知晓.
关于《Spark 内核设计的艺术 架构设计与实现》
经过近一年的准备, 基于 Spark2.1.0 版本的《Spark 内核设计的艺术 架构设计与实现》一书现已出版发行, 图书如图:
纸质版售卖链接如下:
京东: https://item.jd.com/12302500.html
来源: https://www.cnblogs.com/jiaan-geng/p/10137655.html