本文主要基于 SkyWalking 3.2.6 正式版
1. 概述
2. collector-queue-define
2.1 QueueModule
2.2 QueueCreatorService
2.3 MessageHolder
2.4 QueueEventHandler
2.5 DaemonThreadFactory
3. collector-queue-disruptor-provider
3.1 QueueModuleDisruptorProvider
2.2 DisruptorQueueCreatorService
3.3 DisruptorEventHandler
4. collector-queue-datacarrier-provider
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知.每周更新一篇左右.
认真的源码交流微信群.
1. 概述
本文主要分享 SkyWalking Collector Queue Module,队列组件.该组件被 Collector Streaming Module 流式处理使用,提供异步执行的特性.
友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系.
Cluster Module 在 SkyWalking 架构图处于如下位置 (红框) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构,如下图所示 :
collector - queue - define
:定义队列组件接口.
collector - queue - datacarrier - provider
:基于 apm-datacarrier 的队列组件实现.目前暂未完成.
collector - queue - zookeeper - provider
:基于 Disruptor 的队列组件实现.
下面,我们从接口到实现的顺序进行分享.
:定义队列组件接口.项目结构如下 :
2. collector-queue-define
collector - queue - define
,实现 Module 抽象类,队列 Module .
2.1 QueueModule
org.skywalking.apm.collector.queue.QueueModule
,继承 Service 接口,队列创建服务接口.
#name() 实现方法,返回模块名为 "queue" .
#services() 实现方法,返回 Service 类名:QueueCreatorService .
2.2 QueueCreatorService
org.skywalking.apm.collector.queue.service.QueueCreatorService
#create(queueSize, executor) 接口方法,创建队列处理器.
一般情况下,实现该接口方法,调用 org.skywalking.apm.collector.queue.base.QueueCreator#create(queueSize, executor) 方法,创建队列处理器.
,消息持有者.
2.3 MessageHolder
org.skywalking.apm.collector.queue.base.MessageHolder
message 属性,持有的消息.
org.skywalking.apm.collector.queue.base.QueueEventHandler ,队列处理器接口.它定义了 #tell(message) 接口方法,输入消息给自己.最终,QueueEventHandler 会 "提交" 消息给 org.skywalking.apm.collector.queue.base.QueueExecutor ,执行处理该消息.
#reset() 方法,清空消息.为什么会有这个方法,下文胖友会看到.
2.4 QueueEventHandler
LocalAsyncWorkerRef 实现 QueueEventHandler 接口,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.1.2 LocalAsyncWorkerRef」 有详细解析.
2.5 DaemonThreadFactory
org.skywalking.apm.collector.queue.base.DaemonThreadFactory ,守护进程线程工厂,被用于创建消息处理器的线程.
,基于 Disruptor 的队列组件实现.
3. collector-queue-disruptor-provider
collector - queue - disruptor - provider
项目结构如下 :
默认配置,在 application-default.yml 已经配置如下:
,实现 ModuleProvider 抽象类,基于 Disruptor 的队列服务提供者.
queue:
disruptor:
3.1 QueueModuleDisruptorProvider
org.skywalking.apm.collector.queue.disruptor.CQueueModuleDisruptorProvider
#name() 实现方法,返回组件服务提供者名为 "disruptor" .
module() 实现方法,返回组件类为 QueueModule .
第 44 行 :创建 DisruptorQueueCreatorService 对象,并调用
#requiredModules() 实现方法,返回依赖组件为空.
#prepare(Properties) 实现方法,执行准备阶段逻辑.
#registerServiceImplementation()
父类方法,注册到 services .
,实现 QueueCreatorService 接口,基于 Disruptor 的队列创建服务实现类.
#start() 实现方法,方法为空.
#notifyAfterCompleted() 实现方法,方法为空.
2.2 DisruptorQueueCreatorService
org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService
方法,创建队列处理器.
#create(queueSize, executor)实现方法,
调用DisruptorQueueCreator#register(queueSize, executor)
3.2.1 DisruptorQueueCreator
友情提示:如果胖友对 Disruptor 暂时不了解,建议先使用 Disruptor 写个小 Demo .
如下是笔者阅读的文章:
《三步创建 Disruptor 应用》
《Disruptor 入门》
《剖析 Disruptor: 为什么会这么快?(一)Ringbuffer 的特别之处》
org.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator
,实现 QueueCreator 接口,基于 Disruptor 的队列创建器实现类.
#create(queueSize, executor) 实现方法,代码如下:
第 42 至 45 行:校验队列大小为 2 的指数,否则创建 Disruptor 对象会报
"bufferSize must be a power of 2"
的异常,参见 AbstractSequencer 的代码.
第 49 行:创建 Disruptor 对象.
org.skywalking.apm.collector.queue.disruptor.base.MessageHolderFactory ,MessageHolder 工厂.
第 51 至 64 行:设置 Disruptor 对象的默认异常处理器.
第 67 至 70 行:创建 DisruptorEventHandler 对象,并设置为 Disruptor 对象的事件处理器.
第 74 行:启动 Disruptor 对象.
为什么 Disruptor 要求队列大小为 2 的指数呢?如下是相关资料,感兴趣的同学可以看看 (可跳过):
FROM 《环形缓冲器》
SingleProducerSequencer#hasAvailableCapacity(requiredCapacity)
方法,代码如下:
,基于 Disruptor 的队列处理器实现类.
3.3 DisruptorEventHandler
org.skywalking.apm.collector.queue.disruptor.base.DisruptorEventHandler
ringBuffer 属性,Disruptor RingBuffer 对象.
executor 属性,执行器.
实现
org.skywalking.apm.collector.queue.base.QueueEventHandler
接口 的 #tell(message) 接口方法,标准的 Disruptor 发布事件的代码.
实现 com.lmax.disruptor.EventHandler 的 #onEvent(event, sequence, endOfBatch) 接口方法,代码如下:
endOfBatch 方法参数,标记该事件 (消息) 是否是 Disruptor 每次批处理的最后一个事件.胖友可以参见 《LMAX Disruptor - what determines the batch size?》 这篇文章,自己搭建一个 Demo 理解下该参数.
第 66 行:调用
MessageHolder#reset()
方法,清空消息,因为在 Disruptor RingBuffer 里,事件 (消息) 对象是重用的,虽然后续发布事件 (消息) 可以进行覆盖,考虑到安全性进行清空.
第 69 行:设置消息为该批量的结尾 (最后一条).为什么?在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 揭晓答案.
第 72 行:调用
QueueExecutor#execute(message)
方法,执行处理消息.
:基于 apm-datacarrier 的队列组件实现.
4. collector-queue-datacarrier-provider
collector - queue - datacarrier - provider
来源: http://www.suo.im/3Ipn8V