本文主要基于 SkyWalking 3.2.6 正式版
1. 概述
- 2. collector-queue-define
- 2.1 QueueModule
- 2.2 QueueCreatorService
- 2.3 MessageHolder
- 2.4 QueueEventHandler
- 2.5 DaemonThreadFactory
- 3. collector-queue-disruptor-provider
- 3.1 QueueModuleDisruptorProvider
- 2.2 DisruptorQueueCreatorService
- 3.3 DisruptorEventHandler
- 4. collector-queue-datacarrier-provider
666. 彩蛋
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 SkyWalking Collector Queue Module, 队列组件该组件被 Collector Streaming Module 流式处理使用, 提供异步执行的特性
友情提示: 建议先阅读 SkyWalking 源码分析 Collector 初始化 , 以了解 Collector 组件体系
Cluster Module 在 SkyWalking 架构图处于如下位置( 红框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构, 如下图所示 :
collector-queue-define
: 定义队列组件接口
collector-queue-datacarrier-provider
: 基于 apm-datacarrier 的队列组件实现目前暂未完成
collector-queue-zookeeper-provider
: 基于 Disruptor 的队列组件实现
下面, 我们从接口到实现的顺序进行分享
- 2. collector-queue-define
- collector-queue-define
: 定义队列组件接口项目结构如下 :
- 2.1 QueueModule
- org.skywalking.apm.collector.queue.QueueModule
, 实现 Module 抽象类, 队列 Module
- #name() 实现方法, 返回模块名为 "queue"
- #services() 实现方法, 返回 Service 类名: QueueCreatorService
- 2.2 QueueCreatorService
- org.skywalking.apm.collector.queue.service.QueueCreatorService
, 继承 Service 接口, 队列创建服务接口
#create(queueSize, executor)
接口方法, 创建队列处理器
一般情况下, 实现该接口方法, 调用
org.skywalking.apm.collector.queue.base.QueueCreator#create(queueSize, executor)
方法, 创建队列处理器
- 2.3 MessageHolder
- org.skywalking.apm.collector.queue.base.MessageHolder
, 消息持有者
message 属性, 持有的消息
- #reset() 方法, 清空消息为什么会有这个方法, 下文胖友会看到
- 2.4 QueueEventHandler
- org.skywalking.apm.collector.queue.base.QueueEventHandler
, 队列处理器接口它定义了 #tell(message) 接口方法, 输入消息给自己最终, QueueEventHandler 会 "提交" 消息给
org.skywalking.apm.collector.queue.base.QueueExecutor
, 执行处理该消息
LocalAsyncWorkerRef 实现 QueueEventHandler 接口, 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(一)3.1.2 LocalAsyncWorkerRef 有详细解析
- 2.5 DaemonThreadFactory
- org.skywalking.apm.collector.queue.base.DaemonThreadFactory
, 守护进程线程工厂, 被用于创建消息处理器的线程
- 3. collector-queue-disruptor-provider
- collector-queue-disruptor-provider
, 基于 Disruptor 的队列组件实现
项目结构如下 :
默认配置, 在
application-default.yml
已经配置如下:
- queue: disruptor:
- 3.1 QueueModuleDisruptorProvider
- org.skywalking.apm.collector.queue.disruptor.CQueueModuleDisruptorProvider
, 实现 ModuleProvider 抽象类, 基于 Disruptor 的队列服务提供者
#name() 实现方法, 返回组件服务提供者名为 "disruptor"
module() 实现方法, 返回组件类为 QueueModule
- #requiredModules() 实现方法, 返回依赖组件为空
- #prepare(Properties)
实现方法, 执行准备阶段逻辑
第 44 行 : 创建 DisruptorQueueCreatorService 对象, 并调用
#registerServiceImplementation()
父类方法, 注册到 services
- #start() 实现方法, 方法为空
- #notifyAfterCompleted()
实现方法, 方法为空
- 2.2 DisruptorQueueCreatorService
- org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService
, 实现 QueueCreatorService 接口, 基于 Disruptor 的队列创建服务实现类
#create(queueSize, executor)
实现方法, 调用
DisruptorQueueCreator#register(queueSize, executor)
方法, 创建队列处理器
3.2.1 DisruptorQueueCreator
友情提示: 如果胖友对 Disruptor 暂时不了解, 建议先使用 Disruptor 写个小 Demo
如下是笔者阅读的文章:
三步创建 Disruptor 应用
Disruptor 入门
剖析 Disruptor: 为什么会这么快?(一)Ringbuffer 的特别之处
org.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator
, 实现 QueueCreator 接口, 基于 Disruptor 的队列创建器实现类
#create(queueSize, executor)
实现方法, 代码如下:
第 42 至 45 行: 校验队列大小为 2 的指数, 否则创建 Disruptor 对象会报
"bufferSize must be a power of 2"
的异常, 参见 AbstractSequencer 的代码
第 49 行: 创建 Disruptor 对象
org.skywalking.apm.collector.queue.disruptor.base.MessageHolderFactory
,MessageHolder 工厂
第 51 至 64 行: 设置 Disruptor 对象的默认异常处理器
第 67 至 70 行: 创建 DisruptorEventHandler 对象, 并设置为 Disruptor 对象的事件处理器
第 74 行: 启动 Disruptor 对象
为什么 Disruptor 要求队列大小为 2 的指数呢? 如下是相关资料, 感兴趣的同学可以看看( 可跳过 ):
FROM 环形缓冲器
SingleProducerSequencer#hasAvailableCapacity(requiredCapacity)
方法, 代码如下:
- 3.3 DisruptorEventHandler
- org.skywalking.apm.collector.queue.disruptor.base.DisruptorEventHandler
, 基于 Disruptor 的队列处理器实现类
ringBuffer 属性, Disruptor RingBuffer 对象
executor 属性, 执行器
实现
org.skywalking.apm.collector.queue.base.QueueEventHandler
接口 的 #tell(message) 接口方法, 标准的 Disruptor 发布事件的代码
实现
com.lmax.disruptor.EventHandler
的
#onEvent(event, sequence, endOfBatch)
接口方法, 代码如下:
endOfBatch 方法参数, 标记该事件 ( 消息 ) 是否是 Disruptor 每次批处理的最后一个事件胖友可以参见 LMAX Disruptor - what determines the batch size? 这篇文章, 自己搭建一个 Demo 理解下该参数
第 66 行: 调用
MessageHolder#reset()
方法, 清空消息, 因为在 Disruptor RingBuffer 里, 事件 ( 消息 ) 对象是重用的, 虽然后续发布事件 ( 消息 ) 可以进行覆盖, 考虑到安全性进行清空
第 69 行: 设置消息为该批量的结尾 ( 最后一条 ) 为什么? 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(二)3. AggregationWorker 揭晓答案
第 72 行: 调用
QueueExecutor#execute(message)
方法, 执行处理消息
- 4. collector-queue-datacarrier-provider
- collector-queue-datacarrier-provider
: 基于 apm-datacarrier 的队列组件实现
目前暂未完成
666. 彩蛋
在地铁里, 夹缝中写代码( 当然有座位 )
不容易, 回家看蜘蛛侠
胖友, 分享一波朋友圈可好
来源: http://www.suo.im/4zM7N2