本文主要基于 SkyWalking 3.2.6 正式版
1. 概述
- 2. apm-collector-core
- 2.1 Table
- 2.2 TableDefine
- 2.3 Data
- 3. collector-storage-define
- 3.1 StorageModule
3.2 table 包
3.3 StorageInstaller
3.4 dao 包
- 4. collector-storage-h2-provider
- 5. collector-storage-es-provider
- 5.1 StorageModuleEsProvider
5.2 define 包
5.3 dao 包
5.4 DataTTLKeeperTimer
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 SkyWalking Collector Storage 存储组件顾名思义, 负责将调用链路应用应用实例等等信息存储到存储器, 例如, ES H2
友情提示: 建议先阅读 SkyWalking 源码分析 Collector 初始化 , 以了解 Collector 组件体系
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构, 如下图所示 :
apm-collector-core 的 data 和 define 包 : 数据的抽象
collector-storage-define
: 定义存储组件接口
collector-storage-h2-provider
: 基于 H2 的 存储组件实现该实现是单机版, 建议仅用于 SkyWalking 快速上手, 生产环境不建议使用
collector-storage-es-provider
: 基于 Elasticsearch 的集群管理实现生产环境推荐使用
下面, 我们从接口到实现的顺序进行分享
2. apm-collector-core
apm-collector-core 的 data 和 define 包, 如下图所示:
我们对类进行梳理分类, 如下图:
Table :Data 和 TableDefine 之间的桥梁, 每个 Table 定义了该表的表名, 字段名们
TableDefine :Table 的详细定义, 包括表名, 字段定义 ( ColumnDefine ) 们在下文中, StorageInstaller 会基于 TableDefine 初始化表的相关信息
Data : 数据, 包括一条数据的数据值们和数据字段 ( Column ) 们在下文中, Dao 会存储 Data 到存储器中另外, 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(一) 中, 我们也会看到对 Data 的流式处理通用封装
- 2.1 Table
- org.skywalking.apm.collector.core.data.CommonTable
, 通用表
TABLE_TYPE 静态属性, 表类型目前只有 ES 存储组件使用到, 下文详细解析
COLUMN_ 前缀的静态属性, 通用的字段名
在
collector-storage-define
的 table 包下, 我们可以看到所有 Table 类, 以 "Table" 结尾每个 Table 的表名, 在每个实现类里, 例如 ApplicationTable
- 2.2 TableDefine
- org.skywalking.apm.collector.core.data.TableDefine
, 表定义抽象类
name 属性, 表名
columnDefines 属性, ColumnDefine 数组
#initialize() 抽象方法, 初始化表定义例如: ApplicationEsTableDefine
不同的存储组件实现, 有不同的 TableDefine 实现类, 如下图:
ElasticSearchTableDefine : 基于 Elasticsearch 的表定义抽象类, 在
collector-storage-es-provider
的 define 包下, 我们可以看到所有 ES 的 TableDefine 类
H2TableDefine : 基于 H2 的表定义抽象类, 在
collector-storage-h2-provider
的 define 包下, 我们可以看到所有 H2 的 TableDefine 类
- 2.2.1 ColumnDefine
- org.skywalking.apm.collector.core.data.ColumnDefine
, 字段定义抽象类
name 属性, 字段名
type 属性, 字段类型
在
collector-storage-xxx-provider
模块中, H2ColumnDefine ElasticSearchColumnDefine 实现 ColumnDefine
2.2.2 Loader
涉及到的类如下图所示:
org.skywalking.apm.collector.core.data.StorageDefineLoader
, 调用
org.skywalking.apm.collector.core.define.DefinitionLoader
, 从
org.skywalking.apm.collector.core.data.StorageDefinitionFile
中, 加载 TableDefine 实现类数组
另外, 在
collector-storage-es-provider
和
collector-storage-h2-provider
里都有 storage.define 文件, 如下图:
StorageDefinitionFile 声明了读取该文件
注意, DefinitionLoader 在加载时, 两个文件都会被读取, 最终在
StorageInstaller#defineFilter(List<TableDefine>)
方法, 进行过滤
代码比较简单, 中文注释已加, 胖友自己阅读理解下
- 2.3 Data
- org.skywalking.apm.collector.core.data.Data
, 数据抽象类
dataXXX 前缀的属性, 字段值们
dataStrings 属性的第一位, 是 ID 属性参见 构造方法的第 51 行 或者 #setId(id) 方法
xxxColumns 后缀的属性, 字段 ( Column ) 们
通过上述两种属性 + 自身类, 可以确定一条数据记录的表字段类型字段名字段值
继承
org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage
, 带是否消息批处理的最后一条标记的消息抽象类, endOfBatch 属性, 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(二)3. AggregationWorker 详细解析
继承
org.skywalking.apm.collector.core.data.AbstractHashMessage
, 带哈希码的消息抽象类, hashCode 属性, 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(二)3. AggregationWorker 详细解析
- #mergeData(Data) 方法, 合并传入的数据到自身该方法被
- AggregationWorker#aggregate(message)
调用, 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(二)3. AggregationWorker 详细解析
在
collector-storage-define
的 table 包下, 我们可以看到所有 Data 类, 非 "Table" 结尾, 例如 Application
- 2.3.1 Column
- org.skywalking.apm.collector.core.data.Column
, 字段
name 属性, 字段名
operation 属性, 操作( Operation )
- 2.3.2 Operation
- org.skywalking.apm.collector.core.data.Operation
, 操作接口用于两个值之间的操作, 例如, 相加等等目前实现类有:
AddOperation : 值相加操作
CoverOperation : 值覆盖操作, 即以新值为返回
NonOperation : 空操作, 即以老值为返回
- 3. collector-storage-define
- collector-cluster-define
: 定义存储组件接口项目结构如下 :
- 3.1 StorageModule
- org.skywalking.apm.collector.storage.StorageModule
, 实现 Module 抽象类, 集群管理 Module
- #name() 实现方法, 返回模块名为 "storage"
- #services() 实现方法, 返回 Service 类名: 在 org.skywalking.apm.collector.storage.dao 包下的所有类 和 IBatchDAO
3.2 table 包
在
org.skywalking.apm.collector.storage.table
包下, 定义了存储模块所有的 Table 和 Data 实现类
- 3.3 StorageInstaller
- org.skywalking.apm.collector.storage.StorageInstaller
, 存储安装器抽象类, 基于 TableDefine , 初始化存储组件的表
#defineFilter(List<TableDefine>)
抽象方法, 过滤 TableDefine 数组中, 非自身需要的例如说, ElasticSearchStorageInstaller 过滤后, 只保留 ElasticSearchTableDefine 对象
#isExists(Client, TableDefine)
抽象方法, 判断表是否存在
#deleteTable(Client, TableDefine)
抽象方法, 删除表
#createTable(Client, TableDefine)
抽象方法, 创建表
#install(Client) 方法, 基于 TableDefine , 初始化存储组件的表
该方法会被 StorageModuleH2Provider 或 StorageModuleEsProvider 启动时调用
3.4 dao 包
在
collector-storage-define
项目结构图, 我们看到一共有两个 bao 包:
org.skywalking.apm.collector.storage.base.dao
, 系统的 DAO 接口
org.skywalking.apm.collector.storage.dao
, 业务的 DAO 接口
继承系统的 DAO 接口
被
collector-storage-xxx-provider
的 dao 包实现
3.4.1 系统 DAO
org.skywalking.apm.collector.storage.base.dao.DAO
, 继承 Service 接口, DAO 接口
无任何方法
- 3.4.1.1 AbstractDAO
- org.skywalking.apm.collector.storage.base.dao.AbstractDAO
, 实现 DAO 接口, DAO 抽象基类
client 属性, 数据操作客户端例如, H2Client ElasticSearchClient
在
collector-storage-xxx-provider
模块中, H2DAO EsDAO 实现 AbstractDAO
- 3.4.1.2 IPersistenceDAO
- org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO
, 实现 DAO 接口, 持久化 DAO 接口, 定义了 Data 的增删改查操作
- #get(id) 接口方法, 根据 ID 查询一条 Data
- #deleteHistory(startTimestamp, endTimestamp)
接口方法, 删除时间范围内的 Data 们
#prepareBatchInsert(data)
接口方法, 准备批量插入操作对象例如:
CpuMetricEsPersistenceDAO#prepareBatchInsert(CpuMetric)
方法, 返回的是
org.elasticsearch.action.index.IndexRequestBuilder
对象注意:
该方法不会发起具体的 DAO 操作, 仅仅是创建插入操作对象, 最终的执行在
IBatchDAO#batchPersistence(List<?>)
该方法创建的是批量插入操作对象们中的一个
#prepareBatchUpdate(data)
接口方法, 准备批量更新操作对象类似
#prepareBatchInsert(data)
方法
- 3.4.1.3 IBatchDAO
- org.skywalking.apm.collector.storage.base.dao.IBatchDAO
, 实现 DAO 接口, 批量操作 DAO 接口
#batchPersistence(List<?> batchCollection)
接口方法, 通过执行批量操作对象数组, 实现批量持久化数据
batchCollection 方法参数, 通过
IPersistenceDAO#prepareBatchInsert
或
IPersistenceDAO#prepareBatchUpdate
方法, 生成每个操作数组元素
该方法会被
PersistenceTimer#extractDataAndSave(...)
或
PersistenceWorker#onWork(...)
方法调用, 在 SkyWalking 源码分析 Collector Streaming Computing 流式处理(二)4. PersistenceWorker 详细解析
在
collector-storage-xxx-provider
模块中, BatchH2DAO BatchEsDAO 实现 IBatchDAO
3.4.2 业务 DAO
在
StorageModule#services()
方法里, 我们可以看到, 业务 DAO 按照用途可以拆分成四种:
Cache : 缓存应用应用实例服务名
Register : 注册应用应用实例服务名
Persistence : 持久化, 实际可以理解成批量持久化
UI :SkyWaling UI 查询使用
那么整理如下:
Package | Data | Cache / Register | Persistence | UI | 关联文章 |
---|---|---|---|---|---|
register | Application | √ | |||
register | Instance | √ | √ | √ | |
register | ServiceName | √ | |||
jvm | CpuMetric | √ | √ | ||
jvm | CMetric | √ | √ | ||
jvm | MemoryMetric | √ | √ | ||
jvm | MemoryPoolMetric | √ | √ | ||
global | GlobalTrace | √ | √ | ||
instance | InstPerformance | √ | √ | ||
node | NodeComponent | √ | √ | ||
node | NodeMapping | √ | √ | ||
noderef | NodeReference | √ | √ | ||
segment | SegmentCost | √ | √ | ||
segment | Segment | √ | √ | ||
service | ServiceEntry | √ | √ | ||
serviceref | ServiceReference | √ | √ |
- 4. collector-storage-h2-provider
- collector-storage-h2-provider
, 基于 H2 的存储组件实现项目结构如下 :
该实现是单机版, 建议仅用于 SkyWalking 快速上手, 生产环境不建议使用
由于生产环境主要使用 ES 的存储组件实现, 所以本文暂不解析相关实现, 感兴趣的胖友自己嗨起来
- 5. collector-storage-es-provider
- collector-storage-es-provider
, 基于 ES 的存储组件实现项目结构如下 :
实际使用时, 通过 application.yml 配置如下:
- storage:
- elasticsearch:
- cluster_name: elasticsearch
- cluster_transport_sniffer: true
- cluster_nodes: 127.0.0.1:9300
- index_shards_number: 2
- index_replicas_number: 0
- ttl: 7
生产环境下, 推荐 Elasticsearch 配置成集群
- cluster_name
- cluster_transport_sniffer
- cluster_nodes index_shards_number
- index_replicas_number
参数, Elasticsearch 相关参数
ttl : 保留 N 天内的数据超过 N 天的数据, 将被自动滚动删除
该功能目前版本暂未发布, 需要等到 5.0 版本后
部署集群 collector
- 5.1 StorageModuleEsProvider
- org.skywalking.apm.collector.storage.es.StorageModuleEsProvider
, 实现 ModuleProvider 抽象类, 基于 ES 的存储组件服务提供者
#name() 实现方法, 返回组件服务提供者名为 "elasticsearch"
module() 实现方法, 返回组件类为 StorageModule
- #requiredModules() 实现方法, 返回依赖组件为 "cluster"
- #prepare(Properties)
实现方法, 执行准备阶段逻辑
第 71 至 75 行 : 创建
org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient
对象
第 77 至 82 行 : 创建 DAO 对象们, 并调用
#registerServiceImplementation()
父类方法, 注册到 services
#start() 实现方法, 执行启动阶段逻辑
第 90 行 : 调用
ElasticSearchClient#initialize()
方法, 初始化 ZookeeperClient
第 93 至 94 行 : 创建 ElasticSearchStorageInstaller 对象, 初始化存储组件的表在 5.2.4 ElasticSearchStorageInstaller 详细解析
第 100 至 102 行 : 创建
org.skywalking.apm.collector.storage.es.StorageModuleEsRegistration
对象, 并注册信息到集群管理在 SkyWalking 源码分析 Collector Cluster 集群管理 有详细解析
第 105 至 107 行 : 创建
org.skywalking.apm.collector.storage.es.StorageModuleEsNamingListener
对象, 并注册信息到集群管理在 SkyWalking 源码分析 Collector Cluster 集群管理 有详细解析
第 110 至 111 行 : 创建 DataTTLKeeperTimer 对象在 5.4 DataTTLKeeperTimer 详细解析
#notifyAfterCompleted()
实现方法, 执行启动完成逻辑
第 115 行 : 调用
DataTTLKeeperTimer#start()
方法, 启动 DataTTLKeeperTimer 在本文 5.4 DataTTLKeeperTimer 详细解析
5.2 define 包
在
collector-storage-es-provider
项目结构图, 我们看到一共有两个 define 包:
org.skywalking.apm.collector.storage.es.base.define
, 系统的 TableDefine 抽象类
org.skywalking.apm.collector.storage.es.define
, 业务的 TableDefine 实现类
继承系统的 TableDefine 抽象类
- 5.2.1 ElasticSearchTableDefine
- org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine
, 实现 TableDefine 接口, 基于 Elasticsearch 的表定义抽象类
- #type() 方法, 文档元数据 _type 字段, 参见 Elasticsearch 学习笔记_type
- #refreshInterval() 抽象方法, 文档索引刷新频率, 参见 Elasticsearch: 权威指南 » 基础入门 » 分片内部原理 » 近实时搜索 refresh API
- 5.2.2 ElasticSearchColumnDefine
- org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine
, 实现 ColumnDefine 抽象类, 基于 ES 的字段定义
Type 枚举类: 枚举 ES 字段类型
5.2.3 业务 TableDefine 实现类
在
org.apache.skywalking.apm.collector.storage.es.define
包里, 我们可以看到, 所有基于 ES 的业务 TableDefine 实现类例如: ApplicationEsTableDefine
整体 #refreshInterval() 方法返回的结果如下:
- 1 s
- CpuMetricEsTableDefine
- GCMetricEsTableDefine
- MemoryMetricEsTableDefine
- MemoryPoolMetricEsTableDefine
- 2 s
- InstPerformanceEsTableDefine
- NodeComponentEsTableDefine
- NodeMappingEsTableDefine
- NodeReferenceEsTableDefine
- ServiceEntryEsTableDefine
- ServiceReferenceEsTableDefine
- 2 s && WriteRequest.RefreshPolicy.IMMEDIATE
WriteRequest.RefreshPolicy.IMMEDIATE 参见
ApplicationEsRegisterDAO#save(Application)
方法
- ApplicationEsTableDefine
- InstanceEsTableDefine
- ServiceNameEsTableDefine
- 5 s
- GlobalTraceEsTableDefine
- SegmentCostEsTableDefine
- 10 s
- SegmentEsTableDefine
- 5.2.4 ElasticSearchStorageInstaller
友情提示: ElasticSearchStorageInstaller 主要是对 Elasticsearch Java API 的使用, 所以不熟悉的胖友, 可以 Google 下
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller
, 实现 StorageInstaller 抽象类, 基于 ES 存储安装器实现类
#defineFilter(List<TableDefine>)
实现方法, 过滤数组中, 非 ElasticSearchTableDefine 的元素
#createTable(Client, TableDefine)
实现方法, 创建 Elasticsearch 索引
文档数据结构如下:
_id : 数据编号, String 类型
_type :"type"
_index :TableDefine 定义的表名
source :Data 数据
了解 Elasticsearch 的胖友可能有和笔者一样的疑惑, 网络上很多文章把 _index 类比成关系数据库的 DB ,_type 类比成关系数据库的 Table , 和 SkyWalking 目前使用的方式不一致?
SkyWalking 彭勇升 :_index 和 _type 是 ES 特有的, 考虑其他数据库接入, 所以没有用他这个特性
SkyWalking QQ 交流群( 392443393 ) , 小心 群友 :_type 本来就没做物理隔离, Lucene 层面也不存在, ES 6.x 已经废弃了
Elasticsearch 6.0 将移除 Type
#deleteTable(Client, TableDefine)
实现方法, 删除 Elasticsearch 索引
#isExists(Client, TableDefine)
实现方法, 判断 Elasticsearch 索引是否存在
在方法里, 笔者添加了一些 API 的说明, 不熟悉的胖友, 可以仔细阅读理解
5.3 dao 包
在
collector-storage-es-provider
项目结构图, 我们看到一共有两个 dao 包:
org.skywalking.apm.collector.storage.es.base.dao
, 系统的 DAO 抽象类
org.skywalking.apm.collector.storage.es.dao
, 业务的 DAO 实现类
继承系统的 DAO 抽象类
- 5.3.1 EsDAO
- org.skywalking.apm.collector.storage.es.base.dao.EsDAO
, 实现 AbstractDAO 抽象类, 基于 ES 的 DAO 抽象类
#getMaxId(indexName, columnName)
方法, 获得索引名的指定字段的最大值
#getMinId(indexName, columnName)
方法, 获得索引名的指定字段的最小值
- 5.3.2 BatchEsDAO
- org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO
, 实现 IBatchDAO 接口, 继承 EsDAO 抽象类, 基于 ES 批量操作 DAO 实现类
#batchPersistence(List<?>)
实现方法, 将
org.elasticsearch.action.index.IndexRequestBuilder
和
org.elasticsearch.action.index.UpdateRequestBuilder
数组, 创建成
org.elasticsearch.action.bulk.BulkRequestBuilder
对象, 批量持久化
IndexRequestBuilder 和 UpdateRequestBuilder 的创建, 在 5.3.3 业务 DAO 实现类 会看到
5.3.3 业务 DAO 实现类
在
org.apache.skywalking.apm.collector.storage.es.dao
包里, 我们可以看到, 所有基于 ES 的业务 DAO 实现类
实现代码易懂, 胖友可以自己阅读良心如我们, 按照 DAO 的业务用途, 推荐例子如下:
- Cache :ApplicationEsCacheDAO
- Register :ApplicationEsRegisterDAO
- Persistence :SegmentEsPersistenceDAO
此处可见 IndexRequestBuilder 和 UpdateRequestBuilder 的创建
- UI :SegmentEsUIDAO
- 5.4 DataTTLKeeperTimer
- org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer
, 过期数据删除定时器通过该定时器, 只保留 N 天内的数据
#start() 方法, 启动定时任务
第 49 行: 创建延迟 1 小时, 每 8 小时执行一次 #delete() 方法的定时任务目前该行代码被注释, 胖友可以等待 SkyWallking 5.0 版本的发布
#delete() 方法, 删除过期数据
第 54 至 66 行: 计算删除的开始与结束时间, 即指定时间的前一天例如, 2017-12-23 执行时, 删除 2017-12-16 那天的数据
第 69 行: 调用
#deleteJVMRelatedData(startTimestamp, endTimestamp)
方法, 删除 JVM 相关的数据
第 70 行: 调用
#deleteTraceRelatedData(startTimestamp, endTimestamp)
方法, 删除 Trace 相关的数据
如下是不会删除的数据的表:
- Application
- Instance
- ServiceName
- ServiceEntry
来源: http://www.suo.im/156p2R