LiveListenerBus
首先, 它定义了 4 个 消息堵塞队列, 队列的名字分别为 shared,appStatus,executorManagement,eventLog. 队列的类型是 org.apache.spark.scheduler.AsyncEventQueue#AsyncEventQueue, 保存在 queues 变量中. 每一个队列上都可以注册监听器, 如果队列没有监听器, 则会被移除.
它有启动和 stop 和 start 两个标志位来指示 监听总线的的启动停止状态. 如果总线没有启动, 有事件过来, 先放到 一个待添加的可变数组中, 否则直接将事件 post 到每一个队列中.
其直接依赖类是 AsyncEventQueue, 相当于 LiveListenerBus 的多事件队列是对 AsyncEventQueue 进一步的封装.
AsyncEventQueue
其继承关系如下:
它有启动和 stop 和 start 两个标志位来指示 监听总线的的启动停止状态.
其内部维护了 listenersPlusTimers 主要就是用来保存注册到这个总线上的监听器对象的.
post 操作将事件放入内部的 LinkedBlockingQueue 中, 默认大小是 10000.
有一个事件分发器, 它不停地从 LinkedBlockingQueue 执行 take 操作, 获取事件, 并将事件进一步分发给所有的监听器, 由 org.apache.spark.scheduler.SparkListenerBus#doPostEvent 方法实现事件转发, 具体代码如下:
- protected override def doPostEvent(
- listener: SparkListenerInterface,
- event: SparkListenerEvent): Unit = {
- event match {
- case stageSubmitted: SparkListenerStageSubmitted =>
- listener.onStageSubmitted(stageSubmitted)
- case stageCompleted: SparkListenerStageCompleted =>
- listener.onStageCompleted(stageCompleted)
- case jobStart: SparkListenerJobStart =>
- listener.onJobStart(jobStart)
- case jobEnd: SparkListenerJobEnd =>
- listener.onJobEnd(jobEnd)
- case taskStart: SparkListenerTaskStart =>
- listener.onTaskStart(taskStart)
- case taskGettingResult: SparkListenerTaskGettingResult =>
- listener.onTaskGettingResult(taskGettingResult)
- case taskEnd: SparkListenerTaskEnd =>
- listener.onTaskEnd(taskEnd)
- case environmentUpdate: SparkListenerEnvironmentUpdate =>
- listener.onEnvironmentUpdate(environmentUpdate)
- case blockManagerAdded: SparkListenerBlockManagerAdded =>
- listener.onBlockManagerAdded(blockManagerAdded)
- case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
- listener.onBlockManagerRemoved(blockManagerRemoved)
- case unpersistRDD: SparkListenerUnpersistRDD =>
- listener.onUnpersistRDD(unpersistRDD)
- case applicationStart: SparkListenerApplicationStart =>
- listener.onApplicationStart(applicationStart)
- case applicationEnd: SparkListenerApplicationEnd =>
- listener.onApplicationEnd(applicationEnd)
- case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
- listener.onExecutorMetricsUpdate(metricsUpdate)
- case executorAdded: SparkListenerExecutorAdded =>
- listener.onExecutorAdded(executorAdded)
- case executorRemoved: SparkListenerExecutorRemoved =>
- listener.onExecutorRemoved(executorRemoved)
- case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
- listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
- case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
- listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
- case executorBlacklisted: SparkListenerExecutorBlacklisted =>
- listener.onExecutorBlacklisted(executorBlacklisted)
- case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
- listener.onExecutorUnblacklisted(executorUnblacklisted)
- case nodeBlacklisted: SparkListenerNodeBlacklisted =>
- listener.onNodeBlacklisted(nodeBlacklisted)
- case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
- listener.onNodeUnblacklisted(nodeUnblacklisted)
- case blockUpdated: SparkListenerBlockUpdated =>
- listener.onBlockUpdated(blockUpdated)
- case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
- listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
- case _ => listener.onOtherEvent(event)
- }
- }
然后去调用 listener 的相对应的方法.
就这样, 事件总线上的消息事件被监听器消费了.
来源: http://www.bubuko.com/infodetail-3110619.html