精简版
1
0
0
云栖社区>博客>正文
黛砚 2019-09-23 14:51:57 浏览 298
云栖社区
数据存储与数据库
系统研发与运维
算法
监控
函数
系统监控
数据库
索引
index
存储
时序数据库
时序
展开阅读全文
简介
时间序列数据库是目前技术发展最快的数据库类型之一. 作为业界最为流行的时序数据库 InfluxDB, 其部署运行十分简洁方便, 支持高性能时序数据的读写, 在应用程序监控, 物联网 (IoT) 领域有着广泛的应用.
阿里云 InfluxDB® 是基于开源版 InfluxDB 优化的版本, 内部的数据组织保持与开源版本一致, 可以简单概括为索引与数据两部分. 用户使用 Influxql 语句进行数据查询, InfluxDB 会解析这条语句并生成 AST 语法树, 通过语法树的遍历, 找到需要查询的 measurement,tag kv 等关键元数据信息, 通过 InfluxDB 的索引找到对应的时序数据所在的文件(TSM), 获得数据点, 返回给用户. 而索引的实现有两种, 一种是基于内存的倒排索引(inmem), 使用上受限于内存, 如果出现宕机的情况, 需要扫描解析所有 TSM 文件并在内存中重新构建, 恢复时间很长; 另一种则是基于文件的倒排索引(TSI), 内存占用较小, 可以支持百万甚至千万级别的时间线, 宕机恢复影响也很小.
本文将基于文件倒排索引 (TSI) 的索引类型, 深入介绍用户一次数据查询, 在 InfluxDB 内部的流程细节.
流程概述
Query 入口
InfluxDB 内部注册了许多服务, 其中 httpd service 是负责处理外部请求的服务, 通常情况下, 读写请求都会到达 services/httpd/handler.go 这里. 对于 select 语句来说, 调用的是 serveQuery.
查询准备
查询的准备工作, 包括计数器的更新, 参数解析, 解析生成 AST 树, 认证 4 个方面.
1, 计数统计
计数统计方面, 查询计数器 h.stats.QueryRequests 自增 1. 其次定义一个 defer, 用于在结束时统计当前查询的耗时.
- defer func(start time.Time) {
- atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
- }(time.Now())
2, 参数解析
Influxdb 需要对 query 提交的表单解析若干关键配置参数, 才能确定访问的数据库, 存储策略, 输出格式等详细信息:
参数名 | 作用 |
---|---|
q | 查询语句 |
node_id | 集群版节点 id,单机版无效 |
epoch | 查询输出格式,可选值有 epoch=[h,m,s,ms,u,ns] |
db | 待查询数据库 |
rp | 待查询存储策略 |
params | 附加参数 |
chunked | 用于控制返回流式批处理中的点而不是单个响应中的点。如果设置为 true,InfluxDB 将按系列或每 chunk_size 个点(默认 10000)分组响应。 |
chunk_size | 分组相应点数大小 |
async | 是否同步查询返回 |
3, 生成 AST 树
对于从表单提取的查询语句, InfluxDB 有自己的一套 Influxql 解析框架, 可生成类似传统关系型数据库内的 AST 树. 由于 Influxql 部分代码庞大, 与查询逻辑核心关系不大, 本文不做展开.
4, 认证
出于安全考虑, 用户可能开启了 InfluxDB 的认证配置. 只有对应权限的用户以及匹配正确密码之后, 才能访问相关的数据库.
执行查询
准备工作完成后, 最终会执行 coordinator/statement_executor.go 的 executeSelectStatement 函数, 这个函数是处理查询语句的. 从 services/httpd/handler.go 的 serveQuery 到 coordinator/statement_executor.go 的 executeSelectStatement, 经过了多层的函数调用. 为了方便读者阅读代码, 下图展示了调用的堆栈(如若遇到 go interface 部分的函数调用, 下图已替换为真正实现 interface 的结构体的函数).
在 executeSelectStatement 函数内部, 有几个关键操作:
(1)创建迭代器 createIterators, 创建过程中内部通过 TSI 访问 TSM 并 decode 部分数据内容("细节探究: 迭代器与 TSI,TSM" 小节深入分析);
(2)创建基于迭代器的 Emitter, 通过 Emitter 函数中循环调用 cursor 的 Scan()函数从 cursor 中一条一条数据地进行读取.
以下是 Emitter 读取数据时可能遇到的几种情况:
(a)如果是多个 series, 则一般是第 1 个 series 拿 1 个数据, 然后第 2 个拿 1 个数据...... 拿完后, 第 1 个 series 再拿第 2 个数据, 以此类推;
(b)当返回数据达到了查询设置的 chunkSize(查询参数中没有, 则使用默认的 10000)时, 提前先返回, 以此来做到分批返回结果;
(c)如果是查询类似 count 这种聚合操作, 那可能从 cur 中一共也就返回 1 条结果数据, 聚合工作放在了相关 iterator 的 reducer 操作中;
(d)如果已经 decode 的数据被读取完, 还需要继续读取, 则会迫使 cur 再次做底层 TSM 文件的 decode. 直到完成需要的数据读取.
查询返回
当查询结束后, 同步情况下从管道依次取出结果. 并处理 chunked 协议等返回协议, 最终返回给用户.
细节探究: 迭代器与 TSI,TSM
CreateIterator
我们提到执行 select 的主函数 executeSelectStatement 一个关键操作是创建迭代器(或者说是游标)cursor, 这一思想与传统关系型数据库执行计划树的执行过程有相似之处. cursor 的创建依赖于查询准备工作中由 InfluxQL 解析生成的 AST 树(SelectStatement). 创建过程也类似于传统关系型数据库, 分为 Prepare,Select 两部, 其中 Prepare 过程还可以细分为 Compile 与 Compile 之后的 Prepare;Select 则是基于 Prepare 后的 SelectStatement 来构建迭代器.
Prepare 过程
Prepare 过程先执行了 Compile,Compile 主要进行以下操作:
1, 预处理, 主要是解析, 校验和记录当前查询状态的全局属性, 例如解析查询的时间范围, 校验查询条件合法性, 校验聚合条件的合法性等等.
2,fields 预处理, 例如查询带 time 字段的会自动替换为 timstamp.
3, 重写 distinct 查询条件.
4, 重写正则表达式查询条件.
Compile 之后的 Prepare 主要进行以下操作:
1, 如果查询带聚合, 且配置了 max-select-buckets 限制, 且查询时间范围下界未指定时, 需要根据限制重写查询时间范围下界.
2, 如果配置了额外的查询间隔配置, 修正查询时间范围上下界.
3, 根据时间上下界获取需要查询的 shards map(LocalShardMapping 结构体对象).
4, 如果是模糊查询则替换 * 为所有可能的 tag key 与 field key(获取 tag key 时, 实际上已经访问了 TSI 索引).
5, 校验查询类型是否合法.
6, 确定与查询间隔 (group by time()) 匹配的开始和结束时间并再次校验查询 buckets 是否超过了 max-select-buckets 配置项.
Select 过程
在 InfluxDB 内部, 由 5 种 Iterator, 它们分别是 buildFieldIterator,buildAuxIterator,buildExprIterator,buildCallIterator 和 buildVarRefIterator. 它们根据不同的查询产生不同的创建过程, 彼此组成互相调用的关系, 并组成了最终的 cursor.
在 Select 函数构建 cursor 的过程中, 调用不断向下, 我们最终会来到 Engine.CreateIterator 函数的调用. Engine 与一个 shard 对应, 如果查询跨多个 shard, 在外层会遍历所有涉及本次查询的 shards(LocalShardMapping), 对每个 shard 对应的 Engine 执行 CreateIterator, 负责查询落在本 shard 的数据. 根据查询究竟是查数据, 还是聚合函数, Engine.CreateIterator 里会调用 createVarRefIterator 或者 createCallIterator.
它们最终都会调用 createTagSetIterators 函数, 调用之前, 会查询出所有的 series 作为调用参数(这里访问了索引 TSI). 接下来, 程序会以 series 和 CPU 核数的较小值为除数平分所有的 series, 然后调用 createTagSetGroupIterators 函数继续处理, 其内部会对分配到的 series 进行遍历, 然后对每一个 seriesKey 调用 createVarRefSeriesIterator 函数. 在 createVarRefSeriesIterator 函数中, 如果 ref 有值, 则直接调用 buildCursor 函数. 如果 ref 为 nil, 则 opt.Aux 参数包含了要查询的 fields, 所以对其进行遍历, 对每个 feild, 再调用 buildCursor 函数.
buildCursor 函数中, 先根据 measurement 查询到对应的 fields, 再根据 field name 从 fields 中查询到对应的 Field 结构 (包含 feild 的 ID, 类型等). 然后, 根据 field 是什么类型进行区别, 比如是 float 类型, 则调用 buildFloatCursor 函数. 各类型的 buildCursor 底层调用的实际是 TSM 文件访问的函数, 新建 cursor 对象时(newKeyCursor) 使用 fs.locations 函数返回所有匹配的 TSM 文件及其中的 block(KeyCursor.seeks), 读取数据时则是根据数据类型调用 peekTSM(),nextTSM()等 TSM 访问函数.
查询倒排索引 TSI
我们先看一张基于 TSI 的 InfluxDB 索引组织图(如下所示). 其中 db(数据库),rp(存储策略),shard,Index 在文件组织下都是以目录形式表现, TSI 使用了分区策略, 所以在 Index 文件夹下是 0~7 共计 8 个 partition 文件夹, partition 文件夹则是 TSI 文件与它的 WAL(TSL):
TSI 采用了类 LSM 机制, 操作都是以 append-only 方式以 LogEntry 格式写入到 WAL(TSL 文件, 其对应的 level 为 0), 修改和删除操作也是如此, 在后台 compaction(Level0 ~ Level 1)的过程中会进行真正的数据删除. 当 TSL 文件执行 compaction 操作时, 实际上是将 WAL 以 TSI 格式转化写入一个新的 TSI 文件中. TSI 文件 (Level1-Level6) 会定期由低层向高层作 compact, compact 的过程本质是将相同 measurement 的 tagbock 作在一起, 相同 measurement 的相同 tagkey 对应的所有 tagvalue 放在一起, 相同 measurement 的相同 tagkey 又相同 tagvalue 的不同 series id 作合并后放在一起.
我们来看一下 TSI 文件的内部结构, 以便了解当执行查询时, InfluxDB 是如何使用它来找到对应 measurement,tag key,tag value 以及相关数据文件 TSM 的.
index_file
首先我们看到 tsdb/index/tsi1/index_file.go 有一个很有意思的常量 IndexFileTrailerSize.
- IndexFileTrailerSize = IndexFileVersionSize +
- 8 + 8 + // measurement block offset + size
- 8 + 8 + // series id set offset + size
- 8 + 8 + // tombstone series id set offset + size
- 8 + 8 + // series sketch offset + size
- 8 + 8 + // tombstone series sketch offset + size
- 0
从它的定义我们很容易得出:
1,IndexFileTrailerSize 在 TSI 文件结尾处占固定的 82bytes, 我们在解析 TSI 时很容易读取到这个 Trailer.
2, 通过它的定义我们基本知道了一个 TSI 包含哪些部分:
这里我们仔细分析 1.7 以后的代码, 发现一个很有趣的问题, series id set 这个区域在查询期间没有起到由 series id 去查 series key 的作用. 实际上在 1.7 的 influxdb, 有一个特殊文件夹_series, 在这个文件夹中才存放了 series id 到 series key 的映射.
measurement block
我们再来看 measurement block, 其定义是在 tsdb/index/tsi1/measurement_block.go, 我们也很容易发现 measurement block 也是由存储类似 meta 信息的 Trailer 以及其他各部分组成.
- MeasurementTrailerSize = 0 +
- 2 + // version
- 8 + 8 + // data offset/size
- 8 + 8 + // hash index offset/size
- 8 + 8 + // measurement sketch offset/size
- 8 + 8 // tombstone measurement sketch offset/size
(1)Trailer 部分是整个 MeasuermentBlock 的索引, 存储着其他部分的 offset 和 size.
(2)data offset/size 部分, 是所有的 MeasurementBlockElement 的集合. MeasurementBlockElement 内包含了 measurement 的名字, 对应 tag 的集合以及在文件中的 offset 和 size, 当前 measurement 下所有的 series id 信息.
(3)hash index 部分以 hash 索引方式存储了 MeasurementBlockElement 在文件中的 offset, 可在不读取整体 tsi 的情况下快速定位某个 MeasurementBlockElement 文件偏移.
(4)measurement sketch 和 tombstone measurement sketch 是使用 HyperLogLog++ 算法来作基数统计用.
Tag block
我们再来看 Tag block, 其定义是在 tsdb/index/tsi1/tag_block.go 中, 同样有类似的 trailer 定义:
- const TagBlockTrailerSize = 0 +
- 8 + 8 + // value data offset/size
- 8 + 8 + // key data offset/size
- 8 + 8 + // hash index offset/size
- 8 + // size
- 2 // version
(1)Trailer 相当于 tag block 的 meta 信息, 保存其它各组成部分的 offset 和大小.
(2)key data 部分是 tag key 数据块部分, 其内部有二级 hash index, 可以通过 tag key 快速定位到指定 tag key block 部分, Data offset,Data size 部分指向了当前 tag key 对应的所有的 tag value block 文件区域.
(3)value data 的设计与 key data 部分类似. 在 tag value block 的内部, 有我们最关注的 series id set.
(4)hash index 部分, 可以通过 tag key 快速定位到 tag key block 的 offsset.
_series 文件夹
原 series id set 区域用于存储整个数据库中所有的 SeriesKey, 这可能是历史遗留的问题. 1.7.7 版本的 influxdb, 有一个特殊文件夹_series, 在这个文件夹中才存放了 series id 到 series key 的映射.
查看_series 文件夹目录结构, 也是和 tsi 类似, 分为 8 个 partition. 最新版本的 influxdb 通过_series 文件夹的 series 文件检索 series id 到 series key 的映射.
分组并发
根据 TSI 文件倒排索引查询得到所有的 SeriesKey 之后, 要根据 groupby 条件对 SeriesKey 进行分组, 分组算法为 hash. 分组后不同 group 的 SeriesKey 允许并行独立执行查询并最终执行聚合的, 借此大幅提升整个查询的性能.
小结
最后我们来总结一下, TSI 文件格式的设计, 是一种多级索引的方式, 每一层级都设计了 Trailer, 方便快速找到不同区域的偏移; 每个分区内又有各自的 Trailer,measurement block,Tag block,Tag block 下的 key data 都设计了 hash 索引加快查询文件偏移.
对于一次查询, 我们根据 measurement 在 measurement block 中找到对应 MeasurementBlockElement 下的 tag set, 根据查询条件中 tag key 进行过滤, 再在对应的 tag block 中找到关联的全部 tag value block. 在 tag value block 中取得 series id set, 根据 series id 到_serise 文件夹找到本次查询涉及到的全部 SeriesKey.
TSM 数据检索
我们先来看一下 TSM 的设计. 一个 TSM 被设计为 4 个区域: Header,Blocks,Index 和 Footer.
其中 Header 是 4 位的 magic number(用于定义文件类型)以及 1 个 1 位的版本号.
在 Blocks 区域, 是一系列独立的数据 Block 组成, 每个 Block 包含一个 CRC32 算法生成的 checksum 保证 block 的完整性. data 内部, 时间戳与数据分开存储, 按不同的压缩算法进行压缩. 拆解图如下:
在 Index 区域, 存放的是 Blocks 区域的索引. Index 区由一系列的 index entries 组成, 它们先按 key 后按时间戳, 以字典顺序排序. 一条 index entry 内部组成包括: key 长度, key, 数据类型, 当前 blocks 数量, 本 block 的最大最小时间, block 文件偏移, block 长度.
footer 区域存储索引区的文件偏移.
TSM 文件的索引层, 在 InfluxDB 启动之后就会全部加载到内存之中, 数据部分则因消耗内存过大不加载. 数据检索一般安装以下几个步骤执行:
1, 首先根据 TSI 查询获得的所有 SeriesKey 找到所有对应的 Index 中的 Index Entry, 由于 Key 是有序的, 因此可以使用二分查找来具体实现.
2, 找到所有 Index Entry 之后, 再根据查找的时间范围, 使用 [MinTime, MaxTime] 过滤剩余需要的 Index Entries.
3, 通过 Index Entries 定位到可能的 Data Blocks 列表.
4, 将满足条件的 Data Blocks 加载到内存中, 用对应数据类型的解压算法解压数据, 进一步使用二分查找算法查找即可找到.
总结
InfluxDB 的查询流程是一个较为复杂的过程, 源码实现的逻辑精妙, 模块划分清晰, 很适合时序数据库领域的开发者深入学习其中思想. 由于篇幅有限, 本文涉及到的部分并不完整, 很多细节需要读者在阅读过程中参照社区开源代码对比研究, 由于笔者学识有限也可能有描述失误的地方, 欢迎大家斧正.
阿里云 InfluxDB® 现已正式商业化, 欢迎访问购买页面 (https://common-buy.aliyun.com/?commodityCode=hitsdb_influxdb_pre#/buy) 与文档(https://help.aliyun.com/document_detail/113093.html?spm=a2c4e.11153940.0.0.57b04a02biWzGa).
来源: https://yq.aliyun.com/articles/719105