作者: 王道远, 花名健身, 阿里巴巴计算平台 EMR 技术专家.
背景
EMR Spark 提供的 Relational Cache 功能, 可以通过对数据模型进行预计算和高效地存储, 加速 Spark SQL, 为客户实现利用 Spark SQL 对海量数据进行即时查询的目的. Relational Cache 的工作原理类似物化视图, 在用户提交 SQL 语句时对语句进行分析, 并选出可用的预计算结果来加速查询. 为了实现高效地预计算结果复用, 我们构建的预计算缓存一般都较为通用, 因此对于用户 query, 还需进行进一步的计算方能获得最终结果. 因此, 如何快速地找出匹配的缓存, 并构建出准确的新执行计划, 就显得尤为重要.
在 Hive 3.x 中支持的 Materialized View, 利用了 Apache Calcite 对执行计划进行重写. 考虑到 Spark SQL 使用 Catalyst 进行执行计划优化, 引入 Calcite 太重, 因此 EMR Spark 中的 Relational Cache 实现了自己的 Catalyst 规则, 用于重写执行计划. 本文将介绍执行计划重写的相关内容.
执行计划重写
准备工作
Spark 会把用户查询语句进行解析, 依次转化为 Unresolved Logical Plan(未绑定的逻辑计划),Resolved Logical Plan(绑定的逻辑计划),Optimized Logical Plan(优化的逻辑计划),Physical Plan(物理计划). 其中, 未优化的逻辑计划根据用户查询语句不同, 会有较大区别, 而 Relational Cache 作为优化的一部分, 放在逻辑计划优化过程中也较为合适, 因此我们拿到的用户查询计划会是优化中的逻辑计划. 要与优化中的逻辑计划匹配, 我们选择把这个重写过程放在 Spark 优化器比较靠后的步骤中, 同时, 预先将 Relational Cache 的逻辑计划进行解析, 获得优化后的 Cache 计划, 减小匹配时的复杂程度. 这样, 我们只需匹配做完了谓词下推, 谓词合并等等优化之后的两个逻辑计划.
基本过程
在匹配时, 我们希望能尽可能多得匹配计算和 IO 操作, 因此, 我们对目标计划进行前序遍历, 依次进行匹配, 尝试找到最多的匹配节点. 而在判断两个节点是否匹配时, 我们采用后序遍历的方式, 希望尽快发现不匹配的情况, 减少计划匹配的执行时间. 然后我们会根据匹配结果, 对计划进行重写, 包括对于 Cache 数据进行进一步的 Filter,Project,Sort 甚至 Aggregate 等操作, 使其与匹配节点完全等价, 然后更新逻辑计划节点的引用绑定, 无缝替换到逻辑计划中, 这样就能轻松获得最终的重写后的计划.
Join 匹配
Spark 中的 Join 都是二元操作, 而实际的 Join 顺序可能根据一些策略会有很大区别, 因此对于 Join 节点, 必须进行特殊处理. 我们会首先将逻辑计划进行处理, 根据缓存计划的 Join 顺序进行 Join 重排. 这一步在树状匹配之前就进行了, 避免不断重复 Join 重排带来的时间浪费. 重排后的 Join 可以更大概率地被我们匹配到.
为了实现 Cache 的通用性, 根据星型数据模型的特点, 我们引入了 Record Preserve 的概念. 这和传统数据库中的 Primary Key/Foreign Key 的关系较为类似, 当有主键的表与非空外键指向的表在外键上进行 Join 时, 记录的条数不会变化, 不会膨胀某条记录, 也不会丢失某条记录. PK/FK 的语意在大数据处理框架中经常缺失, 我们引入了新的 DDL 让用户自定义 Record Preserve Join 的关系. 当用户定义 A Inner Join B 是对于 A 表 Record Preserve 时, 我们也会把 A Inner Join B 和 A 的关系匹配起来. 有了 PK/FK 的帮助, 我们能匹配上的情况大大增加了, 一个 Relational Cache 可以被更多看似区别巨大的查询共享, 这可以很好的为用户节约额外的存储开销和预计算开销.
Aggregate 匹配
一般的 Aggregate 匹配较为简单, 而 Spark 支持的 Grouping Set 操作, 会构建出 Expand 逻辑计划节点, 相当于把一条记录转为多条, 使用 Grouping ID 进行标记. 由于 Expand 的子节点是所有 Grouping 的情况共用的, 这里我们只对子节点进行一次匹配, 再分别进行上面的 Grouping 属性和 Aggregate 属性的匹配. 主要是验证目标聚合所需的属性或者聚合函数都能从某个 Grouping ID 对应的聚合结果中计算出来, 比如粗粒度的 Sum 可以对细粒度的 Sum 进行二次 Sum 求和, 而粗粒度的 Count 对细粒度的 Count 也应通过二次 Sum 求和, 粗粒度的 Average 无法仅从细粒度的 Average 中还原出来等等.
计划重写
找出匹配的逻辑计划之后, 就是重写逻辑计划的过程. 对于无需二次聚合的逻辑计划, 直接根据缓存数据的 schema, 从缓存数据的 Relation 中选择所需列, 根据条件过滤后, 进行后续操作. 如果还需二次聚合, 选择所需列时需保留外部要用的所有列, 以及聚合时需要的列, 还有聚合函数需要的数据. 二次聚合的聚合函数需要根据实际情况进行重写, 确保能使用 Relational Cache 中已经初步聚合的结果. 这里面需要根据聚合的语意判断是否能够二次聚合. 如果时 Grouping Set 的聚合, 二次聚合之前还需选择正确的 Grouping ID 进行过滤. 经过二次聚合后, 步骤大体和普通的重写一致, 只需替换到目标计划中即可.
结果
我们以一个例子来具体说明逻辑计划的重写结果. Star Schema Benchmark(论文链接) 是星型模型数据分析的一个标准 Benchmark, 其结构定义如图所示:
我们构建 Relational Cache 的 SQL 语句如下:
- SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUM
- FROM supplier, p_lineorder, dates, customer, part
- WHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkey
- GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
我们从中选出一条查询作为示例. 具体查询语句:
- select c_city, s_city, d_year, sum(lo_revenue) as revenue
- from customer, lineorder, supplier, dates
- where lo_custkey = c_custkey
- and lo_suppkey = s_suppkey
- and lo_orderdate = d_datekey
- and c_nation = 'UNITED KINGDOM'
- and (c_city='UNITED KI1' or c_city='UNITED KI5')
- and (s_city='UNITED KI1' or s_city='UNITED KI5')
- and s_nation = 'UNITED KINGDOM'
- and d_yearmonth = 'Dec1997'
- group by c_city, s_city, d_year
- order by d_year asc, revenue desc
原始逻辑计划如下所示:
- Sort [d_year#39 ASC NULLS FIRST, revenue#0L DESC NULLS LAST], true
- +- Aggregate [c_city#6, s_city#31, d_year#39], [c_city#6, s_city#31, d_year#39, sum(lo_revenue#23L) AS revenue#0L]
- +- Project [c_city#6, lo_revenue#23L, s_city#31, d_year#39]
- +- Join Inner, (lo_orderdate#16 = d_datekey#35)
- :- Project [c_city#6, lo_orderdate#16, lo_revenue#23L, s_city#31]
- : +- Join Inner, (lo_suppkey#15 = s_suppkey#28)
- : :- Project [c_city#6, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
- : : +- Join Inner, (lo_custkey#13 = c_custkey#3)
- : : :- Project [c_custkey#3, c_city#6]
- : : : +- Filter (((isnotnull(c_nation#7) && (c_nation#7 = UNITED KINGDOM)) && ((c_city#6 = UNITED KI1) || (c_city#6 = UNITED KI5))) && isnotnull(c_custkey#3))
- : : : +- HiveTableRelation `ssb`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#3, c_name#4, c_address#5, c_city#6, c_nation#7, c_region#8, c_phone#9, c_mktsegment#10]
- : : +- Project [lo_custkey#13, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
- : : +- Filter ((isnotnull(lo_custkey#13) && isnotnull(lo_suppkey#15)) && isnotnull(lo_orderdate#16))
- : : +- HiveTableRelation `ssb`.`lineorder`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [lo_orderkey#11L, lo_linenumber#12L, lo_custkey#13, lo_partkey#14, lo_suppkey#15, lo_orderdate#16, lo_orderpriotity#17, lo_shippriotity#18, lo_quantity#19L, lo_extendedprice#20L, lo_ordtotalprice#21L, lo_discount#22L, lo_revenue#23L, lo_supplycost#24L, lo_tax#25L, lo_commitdate#26, lo_shipmode#27]
- : +- Project [s_suppkey#28, s_city#31]
- : +- Filter (((isnotnull(s_nation#32) && ((s_city#31 = UNITED KI1) || (s_city#31 = UNITED KI5))) && (s_nation#32 = UNITED KINGDOM)) && isnotnull(s_suppkey#28))
- : +- HiveTableRelation `ssb`.`supplier`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [s_suppkey#28, s_name#29, s_address#30, s_city#31, s_nation#32, s_region#33, s_phone#34]
- +- Project [d_datekey#35, d_year#39]
- +- Filter ((isnotnull(d_yearmonth#41) && (d_yearmonth#41 = Dec1997)) && isnotnull(d_datekey#35))
- +- HiveTableRelation `ssb`.`dates`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [d_datekey#35, d_date#36, d_dayofweek#37, d_month#38, d_year#39, d_yearmonthnum#40, d_yearmonth#41, d_daynuminweek#42, d_daynuminmonth#43, d_daynuminyear#44, d_monthnuminyear#45, d_weeknuminyear#46, d_sellingseason#47, d_lastdayinweekfl#48, d_lastdayinmonthfl#49, d_holidayfl#50, d_weekdayfl#51]
重写后的一个逻辑计划如下:
- Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
- +- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
- +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
- +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
由此可见, 执行计划大大简化, 我们可以做到亚秒级响应用户的命中查询.
进一步优化
在实际测试过程中, 我们发现当多个 Relational Cache 存在时, 匹配时间线性增长明显. 由于我们在 metastore 中存储的是 Cache 的 SQL 语句, 取 SQL 语句和再次解析的时间都不容小觑, 这就使得匹配过程明显增长, 背离了我们追求亚秒级响应的初衷. 因此我们在 Spark 中构建了逻辑计划缓存, 将解析过的 Relational Cache 的计划缓存在内存中, 每个 Relational Cache 只缓存一份, 计划本身占用空间有限, 因此我们可以缓存住几乎所有的 Relational Cache 的优化后的逻辑计划, 从而在第一次查询之后, 所有查询都不再收到取 SQL 语句和再次解析的延迟困扰. 经过这样的优化, 匹配时间大幅减少到 100ms 的量级.
总结与思考
Relational Cache 实现了一种基于 Cache 的优化方案, 让 Spark SQL 能够用于即时查询的场景下, 满足用户对海量数据秒级查询的需求. 通过对用户查询的动态改写, 可以大大提高缓存的利用率, 扩展缓存的命中场景, 有效提高查询性能. 现有方案也有很多可优化的地方, 比如重复的回溯遍历时间复杂度较高, 不如在逻辑计划节点内部更新维护可匹配的信息. 考虑到对 Spark 的侵入性, 我们暂时选择了现有方案, 后续根据实际的使用情况, 还会进一步优化我们的逻辑计划重写过程. 而重写的逻辑计划还涉及到基于不同的 Relational Cache Plan 会有不同的重写方式, 在这些重写结果中如何根据执行代价选择最优的重写方案, 将会在后续文章中进行揭秘, 敬请期待!
来源: https://yq.aliyun.com/articles/705655