InfluxDB 因为其特有的双 LSM 引擎而显得内部结构更加复杂, 写入流程相比其他数据库来说更加繁琐. 但只要理解了它的数据文件内部组织格式以及倒排索引文件内部组织格式, 相信对于整体的把握也并不是很难.
InfluxDB 写入总体框架
InfluxDB 提供了多种接口协议供外部应用写入, 比如可以使用 collected 采集数据上传, 可以使用 opentsdb 作为输入, 也可以使用 http 协议以及 udp 协议批量写入数据. 批量数据进入到 InfluxDB 之后总体会经过三个步骤的处理, 如下图所示:
1. 批量时序数据 shard 路由: InfluxDB 首先会将这些数据根据 shard 的不同分成不同的分组, 每个分组的时序数据会发送到对应的 shard. 每个 shard 相当于 HBase 中 region 的概念, 是 InfluxDB 中处理用户读写请求的单机引擎.
2. 倒排索引引擎构建倒排索引: InfluxDB 中 shard 由两个 LSM 引擎构成 - 倒排索引引擎和 TSM 引擎. 时序数据首先会经过倒排索引引擎构建倒排索引, 倒排索引用来实现 InfluxDB 的多维查询.
3. SM 引擎持久化时序数据: 倒排索引构建成功之后时序数据会进入 TSM Engine 处理. TMS Engine 处理流程和通用 LSM Engine 基本一样, 先将写入请求追加写入 WAL 日志, 再写入 cache, 一旦满足特定条件会将 cache 中的时序数据执行 flush 操作落盘形成 TSM File.
批量时序数据 Shard 路由
通常来说时序数据都会以批量的形式写入数据库, 很少会像关系型数据库那样一条一条写入, 这对于追求高吞吐的时序系统来说至关重要. 批量数据写入 InfluxDB 之后做的第一件事情是分组, 将时序数据点按照所属 shard 划分为多组(称为 Shard Map), 每组时序数据点将会发送给对应的 shard 引擎并发处理.
这里我们简单回顾下 InfluxDB 的 Sharding 策略(详见文章时序数据库技术体系 - 初识 InfluxDB中 Sharding 策略一节).InfluxDB 虽说是单机数据库, 但是每个表依然会被分为多个 shard. 简单来说, InfluxDB 中 sharding 属于两层 sharding: 首先按照时间进行 Range Sharding, 即按时间分片, 比如 7 天一个分片的话, 最近 7 天的数据会分到一个 shard, 一周前到两周前的数据会被分到上一个 shard, 以此类推; 在时间分片的基础上还可以再执行 Hash Sharding, 按照 SeriesKey 执行 Hash(保证同一个 SeriesKey 对应的所有数据都落到同一个 shard), 再将数据分散到指定的多个 shard 中.
当然, 经过笔者深进一步了解, 发现单机 InfluxDB 只有第一层 sharding, 即只有根据时间进行 Range Sharding, 并没有执行 Hash Sharding.Hash Sharding 只会在分布式 InfluxDB 中才会用到.
倒排索引引擎构建倒排索引
InfluxDB 中倒排索引引擎使用 LSM 引擎构建, 上篇文章时序数据库技术体系 - InfluxDB 多维查询之倒排索引其实已经对引擎的工作原理进行了深入的介绍. 这里重点将整个流程做一个串联梳理, 其中细节部分不会展开来讲, 有兴趣的话可以参考上一篇文章.
这里首先思考一个问题: 为什么 InfluxDB 倒排索引需要构建成 LSM 引擎? 其实很简单, LSM 引擎天生对写友好, 写多读少的系统第一选择就是 LSM 引擎, 所以大数据时代的各种数据存储系统就是 LSM 引擎的天下, HBase,Kudu,Druid,TiKV 这些系统无一不是这样. InfluxDB 作为一个时序数据库更是写多读少的典型, 无论倒排索引引擎还是时序数据处理引擎选用 LSM 引擎更是无可厚非.
既然是 LSM 引擎, 工作机制必然是这样的: 首先将数据追加写入 WAL 再写入 Cache 就可以返回给用户写入成功, WAL 可以保证即使发生异常宕机也可以恢复出来 Cache 中丢失的数据. 一旦满足特定条件系统会将 Cache 中的时序数据执行 flush 操作落盘形成文件. 文件数量超过一定阈值系统会将这些文件合并形成一个大文件. 那具体到倒排索引引擎整个流程是什么样的, 简单来看一下:
1. WAL 追加写入: Inverted Index WAL 格式很简单, 由一个一个 LogEntry 构成, 如下图所示:
每个 LogEntry 由 Flag,Measurement, 一系列 Key\Value 以及 Checksum 组成. 其中 Flag 表示更新类型, 包括写入, 删除等, Measurement 表示数据表, Key\Value 表示写入的 Tag Set 以及 Checksum, 其中 Checksum 用于根据 WAL 回放数据时验证 LogEntry 的完整性. 注意, LogEntry 中并没有时序数据列, 只有维度列(Tag Set).
2. Inverted Index 在内存中构建
(1)拼 SeriesKey: 时序数据写入到系统之后先将 measurement 和所有的维度值拼成一个 seriesKey;
(2)确认 SeriesKey 是否已经构建过索引: 在文件中确认该 seriesKey 是否已经存在, 如果已经存在就忽略, 不需要再将其加入到内存倒排索引. 那问题转化为如何在文件中查找某个 seriesKey 是否已经存在? 这就是 Series Block 中 Bloom Filter 的核心作用, 首先使用 Bloom Filter 进行判断, 如果不存在, 肯定不存在. 如果存在, 不一定存在, 需要进一步判断. 再进一步使用 B + 树以及 HashIndex 进一步查找判断;
(3)如果 seriesKey 在文件中不存在, 需要将其写入内存. 倒排索引内存结构主要包含两个 Map:<measurement, List<tagKey>> 和 <tagKey, <tagValue, List<SeriesKey>>>, 前者表示时序表与对应维度集合的映射, 即这个表中有多少维度列. 后者表示每个维度列都有哪些可枚举的值, 以及这些值都对应哪些 SeriesKey.InfluxDB 中 SeriesKey 就是一把钥匙, 只有拿到这把钥匙才能找到这个 SeriesKey 对应的数据. 而倒排索引就是根据一些线索去找这把钥匙.
3. Inverted Index Cache Flush 流程
(1)触发时机: 当 Inverted Index WAL 日志的大小超过阈值(默认 5M), 就会执行 flush 操作将缓存中的两个 Map 写成文件;
(2)基本流程:
缓存 Map 排序:<measurement, List<tagKey>>以及 < tagKey, <tagValue, List<SeriesKey>>都需要经过排序处理, 排序的意义在于有序数据可以结合 Hash Index 实现范围查询, 另外 Series Block 中 B + 树的构建也需要 SeriesKey 排序;
构建并持久化 Series Block: 在排序的基础上首先持久化 < tagKey, tagValue, List<SeriesKey>>结构中所有的 SeriesKey, 也就是先构建 Series Block. 依次持久化 SeriesKey 到 SeriesKeyChunk, 当 Chunk 满了之后, 根据 Chunk 中最小的 SeriesKey 构建 B + 树中的 Index Entry 节点. 当然, Hash Index 以及 Bloom Filter 是需要实时构建的. 需要注意的是, Series Block 在构建的同时需要记录下 SeriesKey 与该 Key 在文件中偏移量的对应关系, 即 < SeriesKey, SeriesKeyOffset>, 这一点至关重要;
内存中将 SeriesKey 映射为 SeriesId: 将 < tagKey, <tagValue, List<SeriesKey>>结构中所有的 SeriesKey 由上一步中得到的 < SeriesKey, SeriesKeyOffset > 中的 SeriesKeyOffset 代替. 形成新的结构:<tagKey, <tagValue, List<SeriesKeyOffset>>, 即 < tagKey, <tagValue, List<SeriesKeyId>>>, 其中 SeriesKeyId 就是 SeriesKeyOffset;
构建并持久化 Tag Block: 在新结构 < tagKey, <tagValue, List<SeriesKeyId>>>的基础上首先持久化 tagValue, 将同一个 tagKey 下的所有 tagValue 持久化在一起并生成对应 Hash Index 写入文件, 接着持久化下一个 tagKey 的所有 tagValue. 所有 tagValue 都持久话完成之后再依次持久化所有的 tagKey, 形成 Tag Block;
构建并持久化 Measurement Block: 最后持久化 measurement 形成 Measurement Block.
时序数据写入流程
时序数据的维度信息经过倒排索引引擎构建完成之后, 接着就需要将数据写入系统. 和倒排索引引擎一样, 数据写入引擎也是一个 LSM 引擎, 基本流程也是先写 WAL, 再写 Cache, 最后满足一定阈值条件之后将 Cache 中的数据 flush 到文件.
1. WAL 追加写入: 时间线数据数据会经过两重处理, 首先格式化为 WriteWALEntry 对象, 该对象字段元素如下图所示. 然后经过 snappy 压缩后写入 WAL 并持久话到文件.
2. 时序数据写入内存结构
(1)时序数据点格式化: 将所有时间序列数据点按时间线组织形成一个 Map:<SeriesKey+FieldKey, List<Value>>, 即将相同 Key(SeriesKey+FieldKey)的时序数据集中放在一个 List 中;
(2)时序数据点写入 Cache:InfluxDB 中 Cache 是一个 crude hash ring, 这个 ring 由 256 个 partition 构成, 每个 partition 负责存储一部分时序数据 Key 对应的值. 就相当于数据写入 Cache 的时候又根据 Key Hash 了一次, 根据 Hash 结果映射到不同的 partition. 为什么要这么处理? 个人认为有点像 Java 中 ConcurrentHashMap 的思路, 将一个大 HashMap 切分成多个小 HashMap, 每个 HashMap 内部在写的时候需要加锁. 这样处理可以减小锁粒度, 提高写性能.
3. Data Cache Flush 流程(参考 engine.compactCache)
(1)触发时机: Cache 执行 flush 操作有两个基本触发条件, 其一是当 cache 大小超过一定阈值, 可以通过参数'cache-snapshot-memory-size'配置, 默认是 25M 大小; 其二是超过一定时间阈值没有时序数据写入 WAL 也会触发 flush, 默认时间阈值为 10 分钟, 可以通过参数'cache-snapshot-write-cold-duration'配置;
(2)基本流程: 在了解了 TSM 文件的基本结构之后, 我们再简单看看时序数据是如何从内存中的 Map 持久化成 TSM 文件的, 整个过程可以表述为:
内存中构建 Series Data Block: 顺序遍历内存 Map 中的时序数据, 分别对时序数据的时间列和数值列进行相应的编码, 按照 Series Data Block 的格式进行组织, 当 Block 大小超过一定阈值就构建成功. 并记录这个 Block 内时间列的最小时间 MinTime 以及最大时间 MaxTime.
将构建好的 Series Data Block 写入文件: 使用输出流将内存中数据输出到文件, 并返回该 Block 在文件中的偏移量 Offset 以及总大小 Size.
构建文件级别 B + 索引: 在内存中为该 Series Data Block 构建一个索引节点 Index Entry, 使用数据 Block 在文件中的偏移量 Offset, 总大小 Size 以及 MinTime,MaxTime 构建一个 Index Entry 对象, 写入到内存 Series Index Block 对象.
这样, 每构建一个 Series Data Block 并写入文件之后都会在内存中顺序构建一个 Index Entry, 写入内存 Series Index Block 对象. 一旦一个 Key 对应的所有时序数据都持久化完成, 一个 Series Index Block 就构建完成, 构建完成之后填充 Index Block Meta 信息. 接着新建一个新的 Series Index Block 开始构建下一个 Key 对应的数据索引信息.
InfluxDB 数据删除操作(DropMeasurement,DropTagKey)
一般 LSM 引擎处理删除通常都采用 Tag 标记的方式, 即删除操作和写入操作流程基本一致, 只是数据上会多一个 Tag 标记 - deleted, 表示该值已经被 deleted. 这种处理方案可以最小化删除代价, 但万物有得必有失, 减小了写入代价必然会增加读取代价, Tag 标签方案在读取的时候需要对标记有 deleted 的数值进行特殊处理, 这个代价还是很大的. HBase 中删除操作就是采用 Tag 标记方案.
InfluxDB 比较奇葩, 对于删除操作处理的比较异类, 通常 InfluxDB 不会删除一条记录, 而是会删除某段时间内或者某个维度下的所有记录, 甚至一张表的所有记录, 这和通常的数据库有所不同. 比如:
DROP SERIES FROM h2o_feet WHERE location = 'santa_monica' DELETE FROM "cpu" DELETE FROM "cpu" WHERE time < '2000-01-01T00:00:00Z' DELETE WHERE time < '2000-01-01T00:00:00Z'
上文我们知道 InfluxDB 中一个 shard 有两个 LSM 引擎, 一个是倒排索引引擎(存储维度列到 SeriesKey 的映射关系, 方便多维查找), 一个是 TSM Engine, 用来存储实际的时序数据. 如果是删除一条记录, 通常只需要 TSM Engine 执行删除就可以, 倒排索引引擎是不需要执行删除的. 而如果是 Drop Measurement 这样的操作, 那么两个 LSM 引擎都需要执行相应的删除. 问题是, 这两个引擎的删除策略完全不同, TSM Engine 采用了一种同步删除策略, Inverted Index Engine 采用了标记删除策略. 如下图所示:
1. TSM Engine 同步删除策略, 整个删除流程可以分为如下四步:
(1)删除所有 TSM File 中满足条件的 series, 系统会遍历当前 shard 中所有 TSM File, 检查该 File 中是否存在满足删除条件的 File, 如果有会执行如下两个操作:
TSM File Index 相关处理: 在内存中删除满足条件的 Index Entry, 通常删除会带有 Time Range 以及 Key Range, 而且 TSM File Index 会在引擎启动之后加载到内存. 因此删除操作会将满足条件的 Index Entry 从内存中删除.
生成 tombstoner 文件: tombstoner 文件会记录当前 TSM File 中所有被删除的时序数据, 时序数据用 [key, min, max] 三个字段表示, 其中 key 即 SeriesKey+FieldKey,[min, max]表示要删除的时间段. 如下图所示:
(2)删除 Cache 中满足条件的 series;
(3)在 WAL 中生成一条删除 series 的记录并持久化到硬盘.
2. Inverted Index Engine 标记 Tag 删除策略, 标记 Tag 删除非常简单, 和一次写入流程基本相同:
(1)在 WAL 中生成一条 flag 为 deleted 的 LogEntry 并持久化到硬盘;
(2)将要删除的维度信息写入 Cache, 需要标记 deleted(设置 type=deleted);
(3)当 WAL 大小超过阈值之后标记为 deleted 的维度信息会随 Cache Flush 到倒排索引文件;
(4)和 HBase 一样, Inverted Index Engine 中索引信息真正被删除发生在 compact 阶段.
总结
InfluxDB 因为其特有的双 LSM 引擎而显得内部结构更加复杂, 写入流程相比其他数据库来说更加繁琐. 但只要理解了它的数据文件内部组织格式以及倒排索引文件内部组织格式, 相信对于整体的把握也并不是很难.
来源: http://stor.51cto.com/art/201804/570628.htm