背景
HiTSDB 时序数据库引擎在服务于阿里巴巴集团内的客户时, 根据集团业务特性做了很多针对性的优化. 然而在 HiTSDB 云产品的打磨过程中逐渐发现, 很多针对性的优化很难在公有云上针对特定用户去实施.
于此同时, 在公有云客户使用 HiTSDB 的过程中, 发现了越来越多由于聚合查询导致的问题, 比如: 返回数据点过多会出现栈溢出等错误, 聚合点过多导致 OOM, 或者无法完成聚合, 实例完全卡死等等问题. 这些问题主要由于原始的聚合引擎架构上的缺陷导致.
因此 HiTSDB 开发团队评估后决定围绕新的聚合引擎架构对 HiTSDB 引擎进行升级, 包含: 存储模型的改造, 索引方式的升级, 实现全新的流式聚合, 数据迁移, 性能评测. 本文主要围绕这 5 个方面进行梳理, 重点在 "全新的流式聚合部分".
1. 时序数据存储模型:
1.1 时序的数据存储格式.
一个典型的时序数据由两个维度来表示, 一个维度表示时间轴, 随着时间的不断流入, 数据会不断地追加. 另外一个维度是时间线, 由指标和数据源组成, 数据源就是由一系列的标签标示的唯一数据采集点. 例如指标 cpu.usage 的数据来自于机房, 应用, 实例等维度组合成的采集点. 这样大家逻辑上就可以抽象出来一个 id+{timestamp, value}的时序数据模型. 这种数据模型的存储是如何呢. 一般有两种典型的数据存储思路:
一种按照时间窗口维度划分数据块, 同一段自然时间窗口内的连续数据放到相邻的位置, 比如{1:00, 2:00}->(id1, id2, id3, ... ... ,idN). 采用这种方式的典型时序数据库包含 InfluxDB, Promethues 等等 TSMT 结构的数据库. OpenTSDB 有些特殊, 因为 OpenTSDB 是单值模型, 指标这个维度在查询的时候是必带的. 所以可以先按照指标做了一级划分, 再根据时间窗口做二级的划分, 本质上还是同一时间窗口内的连续数据. 按照时间窗口切分的方式, 优势是写入的时候可以很天然的按照窗口去落盘, 对于高纬度的标签查询基本上是一些连续 Scan. 这种方式有个比较难解的问题就是 "out of order" 乱序问题, 对于时间窗口过期后再来的时间点, Promethues 直接采用丢弃的方式, InfluxDB 在这种情况下性能会有损耗.
另外一种按照时间线维度划分数据块, 同一时间线的数据放到相邻的位置, 比如(id1)->(1:00, 2:00, 3:00, ... ... , 23:00). HiTSDB 采用时间线维度划分的方式: 目前落盘数据存储于 HBASE, 底层 Rowkey 由指标 + 标签 + 自然窗口的方式组合而成. Rowkey 按照大小顺序合并某个时间线的数据点是连续相邻的. 因此对于一些低维的查询效率是非常高效的. 根据目前接触的一些物联网服务, 更多的是一些低维的访问. 对于中等维度的查询采用流式 scan. 对于极高纬度标签的查询 HiTSDB 采用预聚合的服务(不在本文讨论范围内).
1.2 时序模型的热点问题处理
生产环境中业务方采集的指标类型多种多样, 对指标的采集周期各不相同. 比如 cpu.usage 这个指标的变化频率比较快, 业务方关注度高, 采集周期通常很短, 1 秒, 5 秒, 10 秒等等. 然而指标 disk.usage 这个指标变化趋势相对平滑, 采集周期通常为 1 分钟, 5 分钟, 10 分钟等. 这种情况下, 数据的存储如果针对同一个指标不做特殊处理, 容易形成热点问题. 假设按照指标类型进行存储资源的分片, 想象一下如果有 20 个业务, 每个业务 10 个集群, 每个集群 500 台主机, 采集周期是 1 秒的话, 每秒就会有 10 万个 cpu.usage 的指标数据点落到同一个存储资源实例中, 而 disk.usage 采集周期为 1 分钟, 所以大约只有 1666 个指标数据点落到另外一个存储资源上, 这样数据倾斜的现象非常严重.
1.2.1 分桶
这类问题的经典解法就是分桶. 比如除了指标类型外, 同时将业务名和主机名作为维度标识 tags, 把指标 cpu.usage 划分到不同的桶里面. 写入时根据时间线哈希值分散写入到不同的桶里面. OpenTSDB 在处理热点问题也是采用了分桶模式, 但是需要广播读取, 根本原因在于查询方式需要在某个时间窗口内的全局扫描. 所以设置 OpenTSDB 的分桶数量需要一个平衡策略, 如果数量太少, 热点还是有局部性的问题, 如果太多, 查询时广播读带来的开销会非常大.
与其相比较, HiTSDB 避免了广播读, 提高了查询效率. 由于 HiTSDB 在查询时, 下发到底层存储扫描数据之前, 首先会根据查询语句得到精确命中的时间线. 有了具体的时间线就可以确定桶的位置, 然后到相应的块区域取数据, 不存在广播读. 关于 HiTSDB 如何在查询数据的时候获取命中的时间线, 相信读者这个疑问会在读取完倒排这一节的时候消释.
1.2.2 Region Pre-Split
当一个表刚被创建的时候, HBase 默认分配一个 Region 给新表. 所有的读写请求都会访问到同一个 regionServer 的同一个 region 中. 此时集群中的其他 regionServer 会处于比较空闲的状态, 这个时候就达不到负载均衡的效果了. 解决这个问题使用 pre-split, 在创建新表的时候根据分桶个数采用自定义的 pre-split 的算法, 生成多个 region. byte[][] splitKeys =new byte[bucketNumber-1][]; splitKeys[bucketIndex-1] = (bucketIndex&0xFF);
2. 倒排索引:
2.1 时序数据中的多维时间线
多维支持对于任何新一代时序数据库都是极其重要的. 时序数据的类型多种多样, 来源更是非常复杂, 不止有单一维度上基于时间的有序数值, 还有多维时间线相关的大量组合. 举个简单例子, cpu 的 load 可以有三个维度描述 cpu core, host, app 应用, 每个维度可以有百级别甚至万级别的标签值. sys.cpu.load cpu=1 host=ipA app=hitsdb, 各个维度组合后时间线可以轻松达到百万级别. 如何管理这些时间线, 建立索引并且提供高效的查询是时序数据库里面需要解决的重要问题. 目前时序领域比较主流的做法是采用倒排索引的方式.
2.2 倒排索引基本组合
基本的时间线在倒排中的组合思路如下:
时间线的原始输入值:
倒排构建后:
查询时间线 cpu=3 and host=ipB:
取交集后查询结果为 7:
2.3 倒排面临的问题以及优化思路
倒排主要面临的是内存膨胀的问题:
posting list 过长, 对于高纬度的 tag, 比如 "机房 = 杭州", 杭州可能会有千级别甚至万级别的机器, 这就意味着 posting list 需要存储成千上万个 64-bit 的 id. 解决这个问题的思路是采用压缩 posting list 的方式, 在构建 posting list 的时候对数组里面的 id 进行排序, 然后采用 delta 编码的方式压缩.
如果 Tag 键值对直接作为 term 使用, 内存占用取决于字符串的大小, 采用字符串字典化, 也可大大减少内存开销.
3. 流式聚合引擎
3.1 HiTSDB 聚合引擎的技术痛点
HiTSDB 现有聚合引擎公有云公测以及集体内部业务运行中, 暴露发现了以下问题:
3.1.1 Materialization 执行模式造成 Heap 内存易打爆
下图显示了原查询引擎的架构图. HiTSDB 以 HBase 作为存储, 原引擎通过 Async HBase client 从 HBase 获取时序数据. 由于 HBase 的数据读取是一个耗时的过程, 通常的解法是采用异步 HBase client 的 API, 从而有效提高系统的并行性. 但原聚合引擎采用了一种典型的 materialization 的执行方式: 1)启动多个异步 HBase API 启 HBase 读, 2)只有当查询所涉及的全部时序数据读入到内存中后, 聚合运算才开始启动. 这种把 HBase Scan 结果先在内存中 materialized 再聚合的方式使得 HiTSDB 容易发生 Heap 内存打爆的现象. 尤其当用户进行大时间范围查询, 或者查询的时间线的数据非常多的时候, 因为涉及的时序数据多, HiTSDB 会发生 Heap OOM 而导致查询失败.
3.1.2 大查询打爆 HBase 的问题
两个原因造成 HiTSDB 处理聚合查询的时候, 容易发生将底层 HBase 打爆.
HBase 可能读取多余时间线数据. HiTSDB 的时间线采用指标 + 时间窗口 + 标签的编码方式存储在 HBase. 典型的查询是用户指定一个指标, 时间范围, 以及空间维度上标签要寻找的匹配值. 空间维度的标签查询条件并不都是在标签编码前缀. 当这种情况发生时, HiTSDB 倒排索引不能根据空间维度的查询条件, 精确定位到具体的 HBase 的查询条件, 而是采用先读取再过滤的方式. 这意味着 HBase 有可能读取很多冗余数据, 从而加重 HBase 的负载.
HiTSDB 有可能在短时间内下发太多 HBase 读请求. 一方面, HiTSDB 在 HBase 采用分片存储方式, 对每一个分片, 都至少启动一个读请求, 另一方面, 因为上面提到的 materialization 的执行方式, 一个查询涉及到的 HBase 读请求同时异步提交, 有可能在很短时间内向 HBase 下发大量的读请求. 这样, 一个大查询就有可能把底层的 HBase 打爆.
当这种情况发生时, 更糟糕的场景是 HiTSDB 无法处理时序数据的写入请求, 造成后续新数据的丢失.
3.1.3 执行架构高度耦合, 修改或增加功能困难
聚合引擎主要针对应用场景是性能监控, 查询模式固定, 所以引擎架构采用单一模式, 把查询, 过滤, 填值 / 插值, 和聚合运算的逻辑高度耦合在一起. 这种引擎架构对于监控应用的固定查询没有太多问题, 但 HiTSDB 目标不仅仅是监控场景下的简单查询, 而是着眼于更多应用场景下的复杂查询.
我们发现采用原有引擎的架构, 很难在原有基础上进行增加功能, 或修改原来的实现. 本质上的原因在于原有聚合引擎没有采用传统数据库所通常采用的执行架构, 执行层由可定制的多个执行算子组成, 查询语义可以由不同的执行算子组合而完成. 这个问题在产品开发开始阶段并不感受很深, 但确是严重影响 HiTSDB 拓宽应用场景, 增加新功能的一个重要因素.
3.1.4 聚合运算效率有待提高
原有引擎在执行聚合运算的时候, 也和传统数据库所通常采用的 iterative 执行模式一样, 迭代执行聚合运算. 问题在于每次 iteration 执行, 返回的是一个时间点. Iterative 执行每次返回一条时间点, 或者一条记录, 常见于 OLTP 这样的场景, 因为 OLTP 的查询所需要访问的记录数很小. 但对 HiTSDB 查询有可能需要访问大量时间线数据, 这样的执行方式效率上并不可取.
原因 1)每次处理一个时间点, 都需要一系列的函数调用, 性能上有影响, 2)iterative 循环迭代所涉及到的函数调用, 无法利用新硬件所支持的 SIMD 并行执行优化, 也无法将函数代码通过 inline 等 JVM 常用的 hotspot 的优化方式. 在大数据量的场景下, 目前流行的通用做法是引入 Vectorization processing, 也就是每次 iteration 返回的不再是一条记录, 而是一个记录集(batch of rows), 比如 Google Spanner 用 batch-at-a-time 代替了 row-at-a-time, Spark SQL 同样也在其执行层采用了 Vectorization 的执行模式.
3.2 流式聚合引擎设计思路
针对 HiTSDB 原有聚合运算引擎上的问题, 为了优化 HiTSDB, 支持 HiTSDB 商业化运营, 我们决定改造 HiTSDB 聚合运算引擎. 下图给出了新聚合查询引擎的基本架构.
3.2.1 pipeline 执行模式
借鉴传统数据库执行模式, 引入 pipeline 的执行模式 (aka Volcano / Iterator 执行模式).Pipeline 包含不同的执行计算算子(operator), 一个查询被物理计划生成器解析分解成一个 DAG 或者 operator tree, 由不同的执行算子组成, DAG 上的 root operator 负责驱动查询的执行, 并将查询结果返回调用者. 在执行层面, 采用的是 top-down 需求驱动 (demand-driven) 的方式, 从 root operator 驱动下面 operator 的执行. 这样的执行引擎架构具有优点:
这种架构方式被很多数据库系统采用并证明是有效;
接口定义清晰, 不同的执行计算算子可以独立优化, 而不影响其他算子;
易于扩展: 通过增加新的计算算子, 很容易实现扩展功能. 比如目前查询协议里只定义了 tag 上的查询条件. 如果要支持指标值上的查询条件(cpu.usage>= 70% and cpu.usage <=90%), 可以通过增加一个新的 FieldFilterOp 来实现.
每个 operator, 实现如下接口:
Open : 初始化并设置资源
Next : 调用输入 operator 的 next()获得一个 batch of time series, 处理输入, 输出 batch of time series
Close : 关闭并释放资源
我们在 HiTSDB 中实现了以下算子:
ScanOp: 用于从 HBase 异步读取时间线数据
DsAggOp: 用于进行降采样计算, 并处理填值
AggOp: 用于进行分组聚合运算, 分成 PipeAggOp, MTAggOp
RateOp: 用于计算时间线值的变化率
3.2.2 执行计算算子一个 batch 的时间线数据为运算单位
在计算算子之间以一个 batch 的时间线数据为单位, 提高计算引擎的执行性能. 其思想借鉴于 OLAP 系统所采用的 Vectorization 的处理模式. 这样 Operator 在处理一个 batch 的多条时间线, 以及每条时间线的多个时间点, 能够减少函数调用的代价, 提高 loop 的执行效率.
每个 Operator 以流式线的方式, 从输入获得时间线 batch, 经过处理再输出时间线 batch, 不用存储输入的时间线 batch, 从而降低对内存的要求. 只有当 Operator 的语义要求必须将输入 materialize, 才进行这样的操作(参见下面提到的聚合算子的不同实现).
3.2.3. 区分不同查询场景, 采用不同聚合算子分别优化
HiTSDB 原来的聚合引擎采用 materialization 的执行模式, 很重要的一个原因在于处理时序数据的插值运算, 这主要是因为时序数据的一个典型特点是时间线上不对齐: 不同的时间线在不同的时间戳上有数据. HiTSDB 兼容 OpenTSDB 的协议, 引入了插值 (interpolation) 的概念, 目的在于聚合运算时通过指定的插值方式, 在不对齐的时间戳上插入计算出来的值, 从而将不对齐的时间线数据转换成对齐的时间线. 插值是在同一个 group 的所有时间线之间比较, 来决定在哪个时间戳上需要进行插值 (参见 OpenTSDB 文档).
为了优化聚合查询的性能, 我们引入了不同的聚合运算算子. 目的在于针对不同的查询的语义, 进行不同的优化. 有些聚合查询需要插值, 而有些查询并不要求插值; 即使需要插值, 只需要把同一聚合组的时间线数据读入内存, 就可以进行插值运算.
PipeAggOp: 当聚合查询满足以下条件时,
1)不需要插值: 查询使用了降采样(downsample), 并且降采样的填值采用了非 null/NaN 的策略. 这样的查询, 经过降采样后, 时间线的数据都是对齐补齐的, 也就是聚合函数所用到的插值不再需要.
2)聚合函数可以支持渐进式迭代计算模式 (Incremental iterative aggregation), 比如 sum, count ,avg, min, max, zerosum, mimmim, mimmax, 我们可以采用 incremental 聚合的方式, 而不需要把全部输入数据读入内存. 这个执行算子采用了流水线的方式, 每次从输入的 operator 获得一系列时间线, 计算分组并更新聚合函数的部分值, 完成后可以清理输入的时间线, 其自身只用保留每个分组的聚合函数的值.
MTAgOp: 需要插值, 并且输入算子无法帮助将时间线 ID 预先分组, 这种方式回退到原来聚合引擎所采用的执行模式.
对于 MTAggOp, 我们可以引入分组聚合的方法进行优化:
GroupedAggOp: 需要插值, 但是输入算子能够保证已经将时间线的 ID 根据标识 (tags) 进行排序分组, 这样在流水线处理中, 只要 materialize 最多一个组的数据, 这样的算子比起内存保留所有分组时间线, 内存要求要低, 同时支持不同组之间的并行聚合运算.
3.2.4 查询优化器和执行器
引入执行算子和 pipeline 执行模式后, 我们可以在 HiTSDB 分成两大模块, 查询优化器和执行器. 优化器根据查询语义和执行算子的不同特点, 产生不同的执行计划, 优化查询处理. 例如 HiTSDB 可以利用上面讨论的三个聚合运算算子, 在不同的场景下, 使用不同的执行算子, 以降低查询执行时的内存开销和提高执行效率为目的. 这样的处理方式相比于原来聚合引擎单一的执行模式, 更加优化.
4. 数据迁移
HiTSDB 新的聚合引擎采用的底层存储格式与以前的版本并不兼容. 公有云公测期间运行在旧版本实例的数据, 需要迁移至新的聚合引擎. 同时热升级出现了问题, 数据迁移还应回滚功能, 将新版本的数据点转换成旧的数据结构, 实现版本回滚. 整体方案对于用户的影响做到: 写入无感知, 升级过程中, 历史数据不可读.
4.1 数据迁移架构
并发转换和迁移数据: 原有的 HiTSDB 数据点已经在写入的时候进行了分片. 默认有 20 个 Salts. 数据迁移工具会对每个 Salt 的数据点进行并发处理. 每个 "Salt" 都有一个 Producer 和一个 Consumer.Producer 负责开启 HBase Scanner 获取数据点. 每个 Scanner 异步对 HBase 进行扫描, 每次获取 HBASE_MAX_SCAN_SIZE 行数的数据点. 然后将 HBase 的 Row Key 转换成新的结构.
最后将该 Row 放到所有的一个 Queue 上等待 Consumer 消费. Consumer 每次会处理 HBASE_PUT_BATCHSIZE 或者 HBASE_PUT_MIN_DATAPOINTS 的数据量. 每次 Consumer 顺利写入该 Batch 的时候, 我们会在 UID 表中记录对应 "Salt" 的数据处理位置. 这样便于故障重启时 Producer 从最后一次成功的地方重新开始获取数据点进行转换. 数据迁移工具对 HBase 的操作都采用异步的读写. 当扫描数据或者写入数据失败的时候, 我们会进行有限制的尝试. 如果超出尝试次数, 我们就终止该 "Salt" 的数据迁移工作, 其他 "Salt" 的工作不受到任何影响. 当下次工具自动重启时, 我们会出现问题的 "Salt" 数据继续进行迁移, 直到所有数据全部顺利转换完成.
流控限制: 大部分情况下, Producer 对 HBase 的扫描数据要快于 Consumer 对 HBase 的写入. 为了防止 Queue 的数据积压对内存造成压力同时为了减少 Producer 扫描数据时对 HBase 的压力, 我们设置了流控. 当 Queue 的大小达到 HBASE_MAX_REQUEST_QUEUE_SIZE 时候, Producer 会暂时停止对 HBase 的数据扫描等待 Consumer 消费. 当 Queue 的大小减少到 HBASE_RESUME_SCANNING_REQUEST_QUEUE_SIZE 时候, Producer 会重新恢复.
Producer 和 Consumer 进程的退出
顺利完成时候如何退出: 当一切进展顺利时候, 当 Producer 完成数据扫描之后, 会在 Queue 上放一个 EOS(End of Scan), 然后退出. Consumer 遇到 EOS 就会知道该 Batch 为最后一批, 成功处理完该 Batch 之后就会自动退出.
失败后如何关闭: Consumer 遇到问题时: 当 Consumer 写入 HBase 失败之后, consumer 会设置一个 Flag, 然后退出线程. 每当 Producer 准备进行下一个 HBASE_MAX_SCAN_SIZE 的扫描时候, 他会先检查该 Flag. 如果被设置, 他会知道对应的 Consumer 线程已经失败并且退出. Producer 也会停止扫描并且退出. Producer 遇到问题时: 当 Producer 扫描数据失败时, 处理方式和顺利完成时候类似. 都是通过往 Queue 上 EOS 来完成通知. 下次重启时, Producer 会从上次记录的数据处理位置开始重新扫描.
4.2 数据迁移的一致性
由于目前云上版本 HiTSDB 为双节点, 在结点升级结束后会自动重启 HiTSDB. 自动启动脚本会自动运行数据迁移工具. 如果没有任何预防措施, 此时两个 HiTSDB 节点会同时进行数据迁移. 虽然数据上不会造成任何丢失或者损坏, 但是会对 HBase 造成大量的写入和读取压力从而严重影响用户的正常的写入和查询性能.
为了防止这样的事情发生, 我们通过 HBase 的 Zoo Keeper 实现了类似 FileLock 锁, 我们称为 DataLock, 的机制保证只有一个结点启动数据迁移进程. 在数据迁移进程启动时, 他会通过类似非阻塞的 tryLock()的形式在 Zoo Keeper 的特定路径创建一个暂时的节点. 如果成功创建节点则代表成果获得 DataLock. 如果该节点已经存在, 即被另一个 HiTSDB 创建, 我们会收到 KeeperException. 这样代表未获得锁, 马上返回失败. 如果未成功获得 DataLock, 该节点上的数据迁移进程就会自动退出. 成果获得 DataLock 的节点则开始进行数据迁移.
4.3 数据迁移中的 "执行一次"
当所有 "Salt" 的数据点全部顺利完成迁移之后, 我们会在 HBase 的旧表中插入一行新数据, data_conversion_completed. 此行代表了数据迁移工程全部顺利完成. 同时自动脚本会每隔 12 个小时启动数据迁移工具, 这样是为了防止上次数据迁移没有全部完成. 每次启动时, 我们都会先检查 "data_conversion_completed" 标志. 如果标志存在, 工具就会马上退出. 此项操作只会进行一次 HBase 的查询, 比正常的健康检查成本还要低. 所以周期性的启动数据迁移工具并不会对 HiTSDB 或者 HBase 产生影响.
4.4. 数据迁移的评测
效果: 上线后无故障完成 100 + 实例数据的迁移, 热升级.
5. 查询性能评测
Case 1: 数据采集频率 5s, 查询命中 1000 条, 时间窗口 3600s
总结: 新的查询聚合引擎将查询速度提高了 10 倍以上.
其他
本文介绍了高性能时间序列数据库 HiTSDB 引擎在商业化运营之前进行的优化升级, 目的是提高 HiTSDB 引擎的稳定性, 数据写入和查询性能以及新功能的扩展性. HiTSDB 已经在阿里云正式商业化运营, 我们将根据用户反馈, 进一步提高 HiTSDB 引擎, 更好服务于 HiTSDB 的客户.
来源: http://zhuanlan.51cto.com/art/201804/571021.htm