- RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
本文主要分享 SkyWalking Collector Storage 存储组件。顾名思义,负责将调用链路、应用、应用实例等等信息存储到存储器,例如,ES 、H2 。
友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。
下面我们来看看整体的项目结构,如下图所示 :
:定义存储组件接口。
- collector - storage - define
:基于 H2 的 存储组件实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用。
- collector - storage - h2 - provider
:基于 Elasticsearch 的集群管理实现。生产环境推荐使用。
- collector - storage - es - provider
下面,我们从接口到实现的顺序进行分享。
apm-collector-core 的 data 和 define 包,如下图所示:
我们对类进行梳理分类,如下图:
org.skywalking.apm.collector.core.data.CommonTable ,通用表。
在
的 table 包下,我们可以看到所有 Table 类,以 "Table" 结尾。每个 Table 的表名,在每个实现类里,例如 ApplicationTable 。
- collector - storage - define
,表定义抽象类。
- org.skywalking.apm.collector.core.data.TableDefine
不同的存储组件实现,有不同的 TableDefine 实现类,如下图:
的 define 包下,我们可以看到所有 ES 的 TableDefine 类。
- collector - storage - es - provider
的 define 包下,我们可以看到所有 H2 的 TableDefine 类。
- collector - storage - h2 - provider
org.skywalking.apm.collector.core.data.ColumnDefine ,字段定义抽象类。
在
模块中,H2ColumnDefine 、ElasticSearchColumnDefine 实现 ColumnDefine 。
- collector - storage - xxx - provider
涉及到的类如下图所示:
org.skywalking.apm.collector.core.data.StorageDefineLoader ,调用 org.skywalking.apm.collector.core.define.DefinitionLoader ,从 org.skywalking.apm.collector.core.data.StorageDefinitionFile 中,加载 TableDefine 实现类数组。
另外,在
和
- collector - storage - es - provider
里都有 storage.define 文件,如下图:
- collector - storage - h2 - provider
方法,进行过滤。
- StorageInstaller#defineFilter(List < TableDefine > )
代码比较简单,中文注释已加,胖友自己阅读理解下。
,数据抽象类。
- org.skywalking.apm.collector.core.data.Data
在
的 table 包下,我们可以看到所有 Data 类,非 "Table" 结尾,例如 Application 。
- collector - storage - define
org.skywalking.apm.collector.core.data.Column ,字段。
org.skywalking.apm.collector.core.data.Operation ,操作接口。用于两个值之间的操作,例如,相加等等。目前实现类有:
:定义存储组件接口。项目结构如下 :
- collector - cluster - define
,实现 Module 抽象类,集群管理 Module 。
- org.skywalking.apm.collector.storage.StorageModule
#name() 实现方法,返回模块名为 "storage" 。
#services() 实现方法,返回 Service 类名:在 org.skywalking.apm.collector.storage.dao 包下的所有类 和 IBatchDAO。
在 org.skywalking.apm.collector.storage.table 包下,定义了存储模块所有的 Table 和 Data 实现类。
,存储安装器抽象类,基于 TableDefine ,初始化存储组件的表。
- org.skywalking.apm.collector.storage.StorageInstaller
在
项目结构图,我们看到一共有两个 bao 包:
- collector - storage - define
,系统的 DAO 接口。
- org.skywalking.apm.collector.storage.base.dao
,业务的 DAO 接口。
- org.skywalking.apm.collector.storage.dao
的 dao 包实现。
- collector - storage - xxx - provider
org.skywalking.apm.collector.storage.base.dao.DAO ,继承 Service 接口,DAO 接口。
无任何方法。
org.skywalking.apm.collector.storage.base.dao.AbstractDAO ,实现 DAO 接口,DAO 抽象基类。
在
模块中,H2DAO 、EsDAO 实现 AbstractDAO 。
- collector - storage - xxx - provider
,实现 DAO 接口,持久化 DAO 接口,定义了 Data 的增删改查操作。
- org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO
对象。注意:
- org.elasticsearch.action.index.IndexRequestBuilder
。
- IBatchDAO#batchPersistence(List < ?>)
方法。
- #prepareBatchInsert(data)
,实现 DAO 接口,批量操作 DAO 接口。
- org.skywalking.apm.collector.storage.base.dao.IBatchDAO
或
- IPersistenceDAO#prepareBatchInsert
方法,生成每个操作数组元素。
- IPersistenceDAO#prepareBatchUpdate
或
- PersistenceTimer#extractDataAndSave(...)
方法调用,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4. PersistenceWorker」 详细解析。
- PersistenceWorker#onWork(...)
在
模块中,BatchH2DAO 、BatchEsDAO 实现 IBatchDAO 。
- collector - storage - xxx - provider
在 StorageModule#services() 方法里,我们可以看到,业务 DAO 按照用途可以拆分成四种:
那么整理如下:
Package | Data | Cache / Register | Persistence | UI | 关联文章 |
---|---|---|---|---|---|
register | Application | √ | |||
register | Instance | √ | √ | √ | |
register | ServiceName | √ | |||
jvm | CpuMetric | √ | √ | ||
jvm | CMetric | √ | √ | ||
jvm | MemoryMetric | √ | √ | ||
jvm | MemoryPoolMetric | √ | √ | ||
global | GlobalTrace | √ | √ | ||
instance | InstPerformance | √ | √ | ||
node | NodeComponent | √ | √ | ||
node | NodeMapping | √ | √ | ||
noderef | NodeReference | √ | √ | ||
segment | SegmentCost | √ | √ | ||
segment | Segment | √ | √ | ||
service | ServiceEntry | √ | √ | ||
serviceref | ServiceReference | √ | √ |
,基于 H2 的存储组件实现。项目结构如下 :
- collector - storage - h2 - provider
该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用。
由于生产环境主要使用 ES 的存储组件实现,所以本文暂不解析相关实现,感兴趣的胖友自己嗨起来。
,基于 ES 的存储组件实现。项目结构如下 :
- collector - storage - es - provider
实际使用时,通过 application.yml 配置如下:
、cluster_nodes 、index_shards_number 、
- cluster_transport_sniffer
参数,Elasticsearch 相关参数。
- index_replicas_number
,实现 ModuleProvider 抽象类,基于 ES 的存储组件服务提供者。
- org.skywalking.apm.collector.storage.es.StorageModuleEsProvider
#name() 实现方法,返回组件服务提供者名为 "elasticsearch" 。
module() 实现方法,返回组件类为 StorageModule 。
#requiredModules() 实现方法,返回依赖组件为 "cluster" 。
#prepare(Properties) 实现方法,执行准备阶段逻辑。
父类方法,注册到 services 。
- #registerServiceImplementation()
#start() 实现方法,执行启动阶段逻辑。
方法,初始化 ZookeeperClient 。
- ElasticSearchClient#initialize()
#notifyAfterCompleted() 实现方法,执行启动完成逻辑。
方法,启动 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 详细解析。
- DataTTLKeeperTimer#start()
在
项目结构图,我们看到一共有两个 define 包:
- collector - storage - es - provider
,系统的 TableDefine 抽象类。
- org.skywalking.apm.collector.storage.es.base.define
,业务的 TableDefine 实现类。
- org.skywalking.apm.collector.storage.es.define
,实现 TableDefine 接口,基于 Elasticsearch 的表定义抽象类。
- org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine
,实现 ColumnDefine 抽象类,基于 ES 的字段定义。
- org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine
在 org.apache.skywalking.apm.collector.storage.es.define 包里,我们可以看到,所有基于 ES 的业务 TableDefine 实现类。例如: ApplicationEsTableDefine 。
整体 #refreshInterval() 方法返回的结果如下:
友情提示:ElasticSearchStorageInstaller 主要是对 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。
,实现 StorageInstaller 抽象类, 基于 ES 存储安装器实现类。
- org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller
在
项目结构图,我们看到一共有两个 dao 包:
- collector - storage - es - provider
,系统的 DAO 抽象类。
- org.skywalking.apm.collector.storage.es.base.dao
,业务的 DAO 实现类。
- org.skywalking.apm.collector.storage.es.dao
,实现 AbstractDAO 抽象类,基于 ES 的 DAO 抽象类。
- org.skywalking.apm.collector.storage.es.base.dao.EsDAO
,实现 IBatchDAO 接口,继承 EsDAO 抽象类,基于 ES 批量操作 DAO 实现类。
- org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO
和
- org.elasticsearch.action.index.IndexRequestBuilder
数组,创建成
- org.elasticsearch.action.index.UpdateRequestBuilder
对象,批量持久化。
- org.elasticsearch.action.bulk.BulkRequestBuilder
在 org.apache.skywalking.apm.collector.storage.es.dao 包里,我们可以看到,所有基于 ES 的业务 DAO 实现类。
实现代码易懂,胖友可以自己阅读。良心如我们,按照 DAO 的业务用途,推荐例子如下:
,过期数据删除定时器。通过该定时器,只保留 N 天内的数据。
- org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer
如下是不会删除的数据的表:
来源: http://www.suo.im/9c5ww