本文主要基于 SkyWalking 3.2.6 正式版
1. 概述
- 2. collector-remote-define
- 2.1 RemoteModule
- 2.2 RemoteSenderService
- 2.3 RemoteClientService
- 2.4 RemoteClient
- 2.5 CommonRemoteDataRegisterService
- 2.6 RemoteSerializeService
- 2.7 RemoteSerializeService
- 3. collector-remote-grpc-provider
- 3.1 RemoteModuleGRPCProvider
- 3.2 GRPCRemoteSenderService
- 3.3 GRPCRemoteClientService
- 3.4 GRPCRemoteClient
- 3.5 RemoteCommonServiceHandler
- 3.6 GRPCRemoteSerializeService
- 3.7 GRPCRemoteDeserializeService
- 4. collector-remote-grpc-provider
666. 彩蛋
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 SkyWalking Collector Remote 远程通信服务该服务用于 Collector 集群内部通信
目前集群内部通信的目的, 跨节点的流式处理 Remote Module 应用在 SkyWalking 架构图如下位置( 红框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构, 如下图所示 :
collector-remote-define
: 定义远程通信接口
collector-remote-kafka-provider
: 基于 Kafka 的远程通信实现目前暂未完成
collector-remote-grpc-provider
: 基于 Google gRPC 的远程通信实现生产环境目前使用
下面, 我们从接口到实现的顺序进行分享
- 2. collector-remote-define
- collector-remote-define
: 定义远程通信接口项目结构如下 :
整体流程如下图:
我们按照整个流程的处理顺序, 逐个解析涉及到的类与接口
- 2.1 RemoteModule
- org.skywalking.apm.collector.remote.RemoteModule
, 实现 Module 抽象类, 远程通信 Module
- #name() 实现方法, 返回模块名为 "remote"
- #services() 实现方法, 返回 Service 类名: RemoteSenderService RemoteDataRegisterService
- 2.2 RemoteSenderService
- org.skywalking.apm.collector.remote.service.RemoteSenderService
, 继承 Service 接口, 远程发送服务接口, 定义了
#send(graphId, nodeId, data, selector)
接口方法, 调用 RemoteClient , 发送数据
graphId 方法参数, Graph 编号通过 graphId , 可以查找到对应的 Graph 对象
Graph 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(一)2. apm-collector-core/graph 有详细解析
nodeId 方法参数, Worker 编号通过 workerId , 可以查找在 Graph 对象中的 Worker 对象, 从而 Graph 中的流式处理
Worker 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(一)3. apm-collector-stream 有详细解析
data 方法参数, Data 数据对象例如, 流式处理的具体数据对象
Data 在 SkyWalking 源码分析 Collector Storage 存储组件 2. apm-collector-core 有详细解析
selector 方法参数,
org.skywalking.apm.collector.remote.service.Selector
选择器对象根据 Selector 对象, 使用对应的负载均衡策略, 选择集群内的 Collector 节点, 发送数据
RemoteSenderService.Mode 返回值, 发送模式分成 Remote 和 Local 两种方式前者, 发送数据到远程的 Collector 节点; 后者, 发送数据到本地, 即本地处理, 参见
RemoteWorkerRef#in(message)
方法
- 2.3 RemoteClientService
- org.skywalking.apm.collector.remote.service.RemoteClientService
, 继承 Service 接口, 远程客户端服务接口, 定义了
#create(host, port, channelSize, bufferSize)
接口方法, 创建 RemoteClient 对象
- 2.4 RemoteClient
- org.skywalking.apm.collector.remote.service.RemoteClient
, 继承
java.lang.Comparable
接口, 远程客户端接口定义了如下接口方法:
#push(graphId, nodeId, data, selector)
接口方法, 发送数据
- #getAddress() 接口方法, 返回客户端连接的远程 Collector 地址
- #equals(address) 接口方法, 判断 RemoteClient 是否连接了指定的地址
- 2.5 CommonRemoteDataRegisterService
在说 CommonRemoteDataRegisterService 之前, 首先来说下 CommonRemoteDataRegisterService 的意图
在上文中, 我们可以看到发送给 Collector 是 Data 对象, 而 Data 是数据的抽象类, 在具体反序列化 Data 对象之前, 程序是无法得知它是 Data 的哪个实现对象这个时候, 我们可以给 Data 对象的每个实现类, 生成一个对应的数据协议编号
在发送数据之前, 序列化 Data 对象时, 增加该 Data 对应的协议编号, 一起发送
在接收数据之后, 反序列化数据时, 根据协议编号, 创建 Data 对应的实现类对象
org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService
, 通用远程数据注册服务
id 属性, 数据协议自增编号
dataClassMapping 属性, 数据类型 ( Class<? extends Data> ) 与数据协议编号的映射
dataInstanceCreatorMapping
属性, 数据协议编号与数据对象创建器 ( RemoteDataInstanceCreator ) 的映射
- 2.5.1 RemoteDataRegisterService
- org.skywalking.apm.collector.remote.service.RemoteDataRegisterService
, 继承 Service 接口, 远程客户端服务接口, 定义了
#register(Class<? extends Data>, RemoteDataInstanceCreator)
接口方法, 注册数据类型对应的远程数据创建器(
RemoteDataRegisterService.RemoteDataInstanceCreator
)对象
CommonRemoteDataRegisterService 实现了 RemoteDataRegisterService 接口,
#register(Class<? extends Data>, RemoteDataInstanceCreator)
实现方法
另外, AgentStreamRemoteDataRegister 会调用
RemoteDataRegisterService#register(Class<? extends Data>, RemoteDataInstanceCreator)
方法, 注册每个数据类型的 RemoteDataInstanceCreator 对象注意, 例如 Application::new 是 RemoteDataInstanceCreator 的匿名实现类
- 2.5.2 RemoteDataIDGetter
- org.skywalking.apm.collector.remote.service.RemoteDataIDGetter
, 继承 Service 接口, 远程数据协议编号获取器接口, 定义了
#getRemoteDataId(Class<? extends Data>)
接口方法, 根据数据类型获取数据协议编号
CommonRemoteDataRegisterService 实现了 RemoteDataIDGetter 接口,
#getRemoteDataId(Class<? extends Data>)
实现方法
- 2.5.3 RemoteDataInstanceCreatorGetter
- org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter
, 继承 Service 接口, 远程数据创建器的获取器接口, 定义了
#getInstanceCreator(remoteDataId
接口方法, 根据数据协议编号获得远程数据创建器( RemoteDataInstanceCreator )
CommonRemoteDataRegisterService 实现了 RemoteDataInstanceCreatorGetter 接口,
#getInstanceCreator(remoteDataId)
实现方法
- 2.6 RemoteSerializeService
- org.skywalking.apm.collector.remote.service.RemoteSerializeService
, 远程通信序列化服务接口, 定义了 #serialize(Data) 接口方法, 序列化数据, 生成 Builder 对象
- 2.7 RemoteSerializeService
- org.skywalking.apm.collector.remote.service.RemoteDeserializeService
, 远程通信序反列化服务接口, 定义了
#deserialize(RemoteData, Data)
接口方法, 反序列化传输数据
- 3. collector-remote-grpc-provider
- collector-remote-grpc-provider
, 基于 Google gRPC 的远程通信实现
项目结构如下 :
默认配置, 在
application-default.yml
已经配置如下:
- remote:
- gRPC:
- host: localhost
- port: 11800
- 3.1 RemoteModuleGRPCProvider
- org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider
, 实现 ModuleProvider 抽象类, 基于 gRPC 的组件服务提供者实现类
#name() 实现方法, 返回组件服务提供者名为 "gRPC"
module() 实现方法, 返回组件类为 RemoteModule
- #requiredModules() 实现方法, 返回依赖组件为 cluster gRPC_manager
- #prepare(Properties)
实现方法, 执行准备阶段逻辑
第 53 至 56 行 : 创建 CommonRemoteDataRegisterService GRPCRemoteSenderService 对象, 并调用
#registerServiceImplementation()
父类方法, 注册到 services
#start() 实现方法, 执行启动阶段逻辑
Server 相关
第 65 行: 创建 gRPC Server 对象
第 67 行: 注册 RemoteCommonServiceHandler 对象到 gRPC Server 上, 用于接收 gRPC 请求后的处理
SkyWalking 源码分析 Collector Server Component 服务器组件 3. gRPC 实现
SkyWalking 源码分析 Collector gRPC Server Manager
注册发现相关
第 70 至 71 行: 创建
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration
对象, 将自己注册到集群管理这样, 自己可以被 Collector 集群节点发现, 从而被调用
第 73 至 74 行: 注册 GRPCRemoteSenderService 对象到集群管理这样, 自己可以监听到 Collector 集群节点的加入或离开, 从而调用
SkyWalking 源码分析 Collector Cluster 集群管理
#notifyAfterCompleted()
实现方法, 方法为空
- 3.2 GRPCRemoteSenderService
- org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService
, 继承 ClusterModuleListener 抽象类, 实现 RemoteSenderService 接口, 基于 gPRC 的远程发送服务实现类
3.2.1 注册发现
通过继承 ClusterModuleListener 抽象类, 实现了监听 Collector 集群节点的加入或离开
remoteClients 属性, 连接 Collector 集群节点的客户端数组每个 Collector 集群节点, 对应一个客户端
- #path() 实现方法, 返回监听的目录
- "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME
Collector 集群中, 每个节点的 Remote Server 都会注册到该目录下
#serverJoinNotify(serverAddress)
实现方法, 当新的节点加入, 创建新的客户端连接
#serverQuitNotify(serverAddress)
实现方法, 当老的节点离开, 移除对应的客户端连接
3.2.2 负载均衡
RemoteModuleGRPCProvider 基于不同的选择器 ( Selector ) , 提供不同的客户端选择(
org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector
)实现 :
hashCodeSelector 属性, HashCodeSelector , 基于数据的哈希码
foreverFirstSelector
属性, ForeverFirstSelector , 基于客户端数组的顺序, 选择第一个
rollingSelector 属性, RollingSelector , 基于客户端数组的顺序, 顺序向下选择
#send(graphId, nodeId, data, selector)
方法, 代码如下:
第 63 66 69 行: 根据选择器, 调用
RemoteClientSelector#select(clients, data)
方法, 选择客户端
第 64 67 70 行: 调用
#sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data)
方法, 发送请求数据
第 76 至 77 行: 当选择的客户端连接的是本地时, 不发送数据, 交给本地处理, 参见
RemoteWorkerRef#in(message)
方法
第 78 至 81 行: 当选择的客户端连接的是远程时, 调用
RemoteClient#push(graphId, nodeId, data)
方法, 发送数据
- 3.3 GRPCRemoteClientService
- org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService
, 实现 RemoteClientService 接口, 基于 gRPC 的远程客户端服务实现类
#create(host, port, channelSize, bufferSize)
实现方法, 创建 GRPCRemoteClient 对象
3.4 GRPCRemoteClient
友情提示: 本小节会涉及较多 gRPC 相关的知识, 建议不熟悉的胖友自己 Google , 补充下姿势
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient
, 实现 RemoteClient 接口, 基于 gRPC 的远程客户端实现类
client 属性, GRPCClient 对象相比来说, GRPCRemoteClient 偏业务的封装, 内部调用 GRPCClient 对象
carrier 属性, DataCarrier 对象, 本地消息队列 GRPCRemoteClient 在被调用发送数据时, 先提交到本地队列, 异步消费进行发送到远程 Collector 节点 DataCarrier 在 SkyWalking 源码分析 DataCarrier 异步处理库 详细解析
第 63 行: 调用
DataCarrier#consume(IConsumer, num)
方法, 设置消费者为 RemoteMessageConsumer 对象
#push(graphId, nodeId, data)
实现方法, 异步发送消息到远程 Collector
第 73 行: 调用
RemoteDataIDGetter#getRemoteDataId(Class<? extends Data>)
方法, 获得数据协议编号
第 76 至 80 行: 创建传输数据( RemoteMessage.Builder ) 对象 RemoteMessage 通过 Protobuf 创建定义, 如下图所示:
第 83 行: 调用
DataCarrier#produce(data)
方法, 发送数据到本地队列
RemoteMessageConsumer , 批量消费本地队列的数据, 逐条发送数据到远程 Collector 节点
#consume(List<RemoteMessage>)
实现方法, 代码如下:
第 100 行: 创建 StreamObserver 对象 StreamObserver 主要是 gPRC 相关的 API 的调用
第 101 至 103 行: 调用
io.grpc.stub.StreamObserver#onNext(RemoteMessage)
方法, 逐条发送数据
第 106 行: 调用
io.grpc.stub.StreamObserver#onCompleted()
方法, 全部请求数据发送完成
- 3.5 RemoteCommonServiceHandler
- org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler
, 实现
org.skywalking.apm.collector.server.grpc.GRPCHandler
接口, 继承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象类, 远程通信通用逻辑处理器
其中, RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在
RemoteCommonService.proto
文件的定义如下图:
#call(StreamObserver<Empty>)
实现方法, 代码如下:
#onNext(RemoteMessage)
方法, 处理每一条消息, 代码如下:
第 65 行: 调用
RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId)
方法, 获得数据协议编号对应的 RemoteDataInstanceCreator 对象然后, 调用
RemoteDataInstanceCreator#createInstance(id)
方法, 创建数据协议编号对应的 Data 实现类对应的对象
第 70 行: 调用
GraphManager#findGraph(graphId)
方法, 获得 graphId 对应的 Graph 对象然后, 调动
GraphNodeFinder#findNext(nodeId)
方法, 获得 Next 对象
第 71 行: 调用 Next#execute(Data) 方法, 继续流式处理
- 3.6 GRPCRemoteSerializeService
- org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService
, 实现 RemoteSerializeService 接口, 基于 gRPC 的远程通信序列化服务实现类
- 3.7 GRPCRemoteDeserializeService
- org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService
, 实现 GRPCRemoteDeserializeService 接口, 基于 gRPC 的远程通信反序列化服务实现类
- 4. collector-remote-grpc-provider
- collector-remote-kafka-provider
: 基于 Kafka 的远程通信实现
目前暂未完成
TODO 4005collector-remote-grpc-provider
666. 彩蛋
写的有丢丢烦躁, 不清晰或者错误的地方, 胖友望见谅
欢迎微信我一起交流
胖友, 分享一波朋友圈可好
来源: http://www.suo.im/4rKuaw