本文主要基于 SkyWalking 3.2.6 正式版
1. 概述
2. TraceSegmentServiceClient
2.1 实现 BootService 接口
2.2 实现 GRPCChannelListener 接口
2.3 实现 TracingContextListener 接口
2.4 实现 IConsumer 接口
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
分布式链路追踪系统, 链路的追踪大体流程如下:
Agent 收集 Trace 数据
Agent 发送 Trace 数据给 Collector
Collector 接收 Trace 数据
Collector 存储 Trace 数据到存储器, 例如, 数据库
本文主要分享第二部分 SkyWalking Agent 发送 Trace 数据
考虑到减少外部组件的依赖, Agent 收集到 Trace 数据后, 不是写入外部消息队列 ( 例如, Kafka ) 或者日志文件, 而是 Agent 写入内存消息队列, 后台线程异步发送给 Collector
本文涉及的类非常少, 如下图所示:
2. TraceSegmentServiceClient
org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 发送服务客户端它是一个服务, 也是一个客户端, 负责将 TraceSegment 异步发送到 Collector
我们先来看看 TraceSegmentServiceClient 的属性:
TIMEOUT 静态属性, 发送等待超时时长, 单位: 毫秒
lastLogTime 属性, 最后打印日志时间该属性主要用于开发调试
segmentUplinkedCounter
属性, TraceSegment 发送数量
segmentAbandonedCounter
属性, TraceSegment 被丢弃数量在 Agent 未连接上 Collector 时, 产生的 TraceSegment 将被丢弃
carrier 属性, 内存队列在 SkyWalking 源码分析 DataCarrier 异步处理库 有对 DataCarrier 的详细解析
serviceStub 属性, 非阻塞 Stub
status 属性, 连接状态
下面, 我们来介绍 TraceSegmentServiceClient 实现的接口以及对应的方法
2.1 实现 BootService 接口
#beforeBoot() 方法, 代码如下:
第 86 行: 调用
GRPCChannelManager#addChannelListener(this)
方法, 将自己添加到 GRPCChannelManager 中, 作为一个监听器, 从而调用
#statusChanged(GRPCChannelStatus)
方法, 实现对连接状态 ( status ) 的监听处理
#boot() 方法, 代码如下:
第 95 至 97 行: 创建 DataCarrier 对象, 作为内存队列, 并设置自己作为消费者, 从而调用
#consume(List < TraceSegment > )
方法, 实现异步发送 TraceSegment 到 Collector
#afterBoot() 方法, 代码如下:
第 102 行: 调用
TracingContext.ListenerManager#add(this)
方法, 将自己添加到 ListenerManager 中, 作为一个监听器, 从而调用
#afterFinished(TraceSegment)
方法, 实现收集到新的 TraceSegment , 添加到内存队列
#shutdown() 方法, 代码如下:
第 107 行: 调用
DataCarrier#shutdownConsumers()
方法, 停止消费
2.2 实现 GRPCChannelListener 接口
#statusChanged(GRPCChannelStatus)
方法, 代码如下:
第 211 至 214 行: 连接成功, 创建 Stub 对象
第 215 行: 记录连接状态
2.3 实现 TracingContextListener 接口
#afterFinished(TraceSegment)
方法, 代码如下:
第 197 至 199 行: 当
TraceSegment.ignore = true
时, 忽略该 TraceSegment
第 201 行: 提交 TraceSegment 到内存队列
2.4 实现 IConsumer 接口
#consume(List < TraceSegment > )
方法, 代码如下:
------ 连接中 ------
第 119 行: 创建 org.skywalking.apm.agent.core.remoteGRPCStreamServiceStatus 对象
第 122 至 141 行: 创建 StreamObserver 对象在下面, 我们可以看到 Agent 发送 TraceSegment 给 Collector 是非阻塞的方式, 通过该对象, 观察执行结果
第 130 行 || 第 139 行: 当发生错误或者完成时, 调用
GRPCStreamServiceStatus#finished()
方法, 标记完成为什么呢? 下面会看到
第 134 行: 调用
GRPCChannelManager#reportError(Throwable)
方法, 汇报错误如果是连接错误, GRPCChannelManager 会负责断开重连
第 144 至 151 行: 逐条非阻塞发送 TraceSegment 请求
第 146 行: 调用
TraceSegment#transform()
方法, 将 TraceSegment 转换成
org.skywalking.apm.network.proto.UpstreamSegment
对象, 用于 gRPC 传输, 参见 TraceSegmentService.proto 的数据结构定义
- DistributedTraceId#toUniqueId()
- ID#transform()
- AbstractTracingSpan#transform()
- ExitSpan#transform()
- LogDataEntity#transform()
- TraceSegmentRef#transform()
- KeyValuePair#transform()
第 154 行: 调用
StreamObserver#onCompleted()
方法, 标记全部请求发送完成
第 157 至 159 行: 调用
GRPCStreamServiceStatus#wait4Finish(maxTimeout)
方法, 等待 Collector 处理完成这就是为什么上面需要调用
GRPCStreamServiceStatus#finished()
方法完成后, 记录数量到
segmentUplinkedCounter
注意, 此处若等待完成超时, TraceSegment 依然在发送, 或者被 Collector 处理中, 直到最终的成功或失败
------ 未连接 ------
第 161 行: 记录数量到
- segmentAbandonedCounter
- ------ ALL ------
调用
#printUplinkStatus()
方法, 每三十秒, 打印一次 segmentUplinkedCounter 和 segmentAbandonedCounter 数据主要用于开发调试另外, 该方法会重置
segmentUplinkedCounter
和
segmentAbandonedCounter
计数
ps: 目前 DataCarrier 最长每 20 秒消费一次
#onError(List < TraceSegment > , Throwable)
方法, 当消费发生异常时, 打印日志
来源: https://juejin.im/entry/5a8360a46fb9a0633b211149