使用 Relational Cache 加速 EMR Spark 数据分析
使用 EMR Spark Relational Cache 跨集群同步数据
EMR Spark Relational Cache 的执行计划重写
EMR Spark Relational Cache 如何支持雪花模型中的关联匹配
背景
在利用 Relational Cache 进行查询优化时, 我们需要通过预计算, 存储大量数据. 而在查询时, 我们真正需要读取的数据量也许并不大. 为了能让查询实现秒级响应, 这就涉及到优化从大量数据中快速定位所需数据的场景. 本文介绍在 EMR Spark Relational Cache 中, 我们如何针对这种场景进行了优化.
存储格式
在数据存储格式上, 我们默认选择 Spark 社区支持最好的 Parquet 格式. Parquet 是一种列式存储格式, 我们可以很方便地利用列式存储格式进行字段裁剪. 另外, Parquet 的每个数据文件由多个 Row Group 组成, 同时在每个数据文件的 footer 中记录了各个 Row Group 的统计信息, 如最大值, 最小值等. 这些统计信息可以在读取数据时减少实际的 IO 开销. 事实上, 在现在的 Spark 版本中, 我们可以看到 Catalyst 优化器已经把可以下推的一些过滤条件下推到了 Parquet reader, 利用 Parquet 文件的统计信息过滤真正需要读取的 Row Group, 从而实现减少 IO 量, 加速查询时间的效果. 这也是列存格式基本都支持的功能.
对于 Relational Cache 而言, 有很多过滤条件时确定已知的. 我们直接利用这一特性, 将确定的查询条件下推到 Parquet reader 里, 由 Parquet reader 完成对 Row Group 的选择. 由于实际要读取的数据量占总数据量的比重往往很小, 这种过滤的实际效果还是比较好的.
数据分区
对于 Spark Relational Cache 来说, 由于构建 Cube 时会使用到 Expand 算子, 我们需要引入 Grouping ID 来区分不同的 grouping set. 在大部分后续的查询中, 我们往往只需要其中一个 Grouping ID 所对应的数据. 因此, Grouping ID 成了一个天然的数据分区选择. 在 Hive/Spark 等大数据分析引擎中, 数据分区是对于结构化数据, 将其中一个或多个字段的具体值作为目录, 分目录存放文件的一种常见做法. 当我们确定要选择某 Grouping ID 对应的数据时, 我们只需读取对应目录中的数据即可. 这种做法可以直接忽略 Grouping ID 不匹配的文件, 从而大大减少启动的总 task 数量, 减少 Spark 的任务调度开销.
文件索引
当总数据量较大时, 存储的文件数也会比较多. 此时即使我们通过 Parquet 的 footer 可以获得较好的过滤效果, 我们还是要启动一些 task 去读取这些 footer. 在 Spark 实际的实现中, 往往需要与文件数量的量级相当的 task 去进行 footer 读取. 在集群计算资源有限时, 调度这些任务就显得比较浪费时间. 为了能进一步减少 Spark 作业的调度开销, 进一步提高执行效率, 我们实现了文件索引来优化这种场景.
文件索引就类似于独立的 footer. 我们提前收集每个文件中各字段数据的最大最小值, 并存储在一个隐藏的数据表中. 这样, 我们只需要读取一个单独的表就可以从文件层面对需要处理的文件做一个初步的过滤. 这是一个单独的 stage, 由于一个文件只对应这个隐藏表中的一条记录, 因此读取隐藏表所需的 task 数量要远远小于读取所有数据文件 footer 的开销. 后续 stage 的任务数量也因此可以大大减少. 在 Relational Cache 的访问场景下, 整体加速效果非常明显.
数据排序
为了能实现高效的数据准备过程, 不论是在 Parquet 文件的 footer 还是在我们实现的文件索引中, 都是主要依靠最大值和最小值的信息来过滤数据. 那么在极端场景下, 光靠这些统计信息可能会完全没有过滤的效果. 举个例子, 如果某个 key 的所有数据文件, 所有 Row Group 的最大值和最小值都等于全局最大值和最小值时, 对这个 key 的过滤就完全无效了. 这样, 我们会自然而然的想到对数据进行排序.
但是, 传统的数据排序还有一个问题. 在数据库中, 当我们对多个字段进行排序时, 往往字段之间具有主次关系, 这就导致排序字段序列中, 排在最前面的字段有很好的过滤效果, 而排得靠后的字段因为数据分散, 往往过滤效果越来越差. 这就需要我们找到更好的排序方法, 能够兼顾到多个字段的数据过滤效果.
这里涉及到一个空间填充曲线的概念. 我们可以把数据想像成一个有限空间, 如何将数据进行排序和分块, 能够使得每一块的最值都只是在一个不大的范围内, 从而让文件索引获得较好的过滤效果呢? 我们选择了 Z-Order 曲线对多维数据空间进行排序, 这样可以保证每列都有较为均衡的过滤效果. 下图是二维空间中 Z-Order 曲线的示意图.
不过我们也要注意到, 随着排序列的增加, 单列的过滤效果将会越来越差. 因此在实际运用中, 我们也要对排序列进行取舍, 才能获得最佳的整体效果.
小结
本文介绍了 EMR Spark 的 Relational Cache 如何从数据量较大的 Cube 中快速提取出所需数据加速查询的原理. 通过列式存储, 文件索引, Z-Order 等技术, 我们可以快速过滤数据, 大大减少实际发生的 IO 数据量, 避免 IO 瓶颈的出现, 从而优化整体查询性能.
参考
- Apache Parquet
- Processing Petabytes of Data in Seconds with Databricks Delta
- Z-Order Curve
来源: http://www.jianshu.com/p/5baaa4d832a4