Spark SQL 原理解析前言:
Spark SQL 源码剖析(一)SQL 解析框架 Catalyst 流程概述
Spark SQL 源码解析(二)Antlr4 解析 Sql 并生成树
Spark SQL 源码解析(三)Analysis 阶段分析
前面已经介绍了 SQL parse, 将一条 SQL 语句使用 antlr4 解析成语法树并使用访问者模式生成 Unresolved LogicalPlan, 然后是 Analysis 阶段将 Unresolved LogicalPlan 转换成 Resolved LogicalPlan. 这一篇我们介绍 Optimization 阶段, 和生成 Physical Planning 阶段.
经过这两个阶段后, 就差不多要到最后转换成 Spark 的 RDD 任务了.
Spark SQL Optimization 阶段概述
先来看看 Logical Optimization 阶段.
上一篇我们讨论了 Analysis 阶段如何生成一个真正的 Logical Plan 树. 这一阶段听名字就知道是优化阶段, Spark SQL 中有两个部分的优化, 第一部分就是这里, 是 rule-base 阶段的优化, 就是根据各种关系代数的优化规则, 对生成的 Logical Plan 适配, 匹配到就进行相应的优化逻辑. 这些规则大概有: 投影消除, constant folding, 替换 null 值, 布尔表达式简化等等. 当然大部分规则细节我也不是很清楚, 仅仅能从名字推断一二. 这
同时还可以添加自己的优化 rule, 也比较容易实现, 论文中就给出了一段自定义优化 rule 的代码:
- object DecimalAggregates extends Rule[LogicalPlan] {
- /** Maximum number of decimal digits in a Long */
- val MAX_LONG_DIGITS = 18
- def apply(plan: LogicalPlan): LogicalPlan = {
- plan transformAllExpressions {
- case Sum(e @ DecimalType.Expression(prec , scale))
- if prec + 10 <= MAX_LONG_DIGITS =>
- MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale)
- }
- }
这段代码的大意是自定义了一个 rule, 如果匹配到 SUM 的表达式, 那就执行相应的逻辑, 论文里描述这里是找到对应的小数并将其转换为未缩放的 64 位 LONG. 具体逻辑看不是很明白不过不重要, 重要的是编写自己的优化 rule 很方便就是.
顺便点一下另一种优化, 名字叫做 cost-base 优化(CBO), 是发生在 Physical Planning 阶段的, 这里就先卖个关子, 后面说到的时候再讨论吧.
然后看到源码的时候, 会发现 Optimizer 这个类也是继承自 RuleExecutor, 继承这个类之后的流程基本都是一样的. 前面分析 Analysis 阶段的时候已经有详细介绍过这个流程, 这里就不展开说了.
其实这优化器的重点应该是各种优化规则, 这里我觉得更多的是设计到关系代数表达式优化理论方面的知识, 这部分我也不甚精通, 所以也就不说了. 对这块感兴趣的童鞋可以看看网上别人的文章, 这里顺便列几个可能有帮助的博客,
sparksql 中外连接查询中的谓词下推处理
数据库查询优化入门: 代数与物理优化基础 https://www.jianshu.com/p/edf503a2a1e7
「 数据库原理 」查询优化(关系代数表达式优化)
下面还是来看看最开始的例子进行 Optimization 阶段后会变成什么样吧, 先看看之前的示例代码:
- val df = Seq((1, 1)).toDF("key", "value")
- df.createOrReplaceTempView("src")
- val queryCaseWhen = sql("select key from src")
然后在 Optimization 优化阶段后, 变成了:
- Project [_1#2 AS key#5]
- +- LocalRelation [_1#2, _2#3]
好吧, 看起来没什么变化, 与 Analysis 阶段相比, 也就少了个 SubqueryAlias , 符合预期. 不过也对, 就一条 SELECT 语句能优化到哪去啊.
Physical Planning 生成阶段概述
相比较于 Logical Plan,Physical plan 算是 Spark 可以去执行的东西了, 当然本质上它也是一棵树.
前面说到, Spark 有一种 cost-based 的优化. 主要就在这一阶段, 在这一阶段, 会生成一个或多个 Physical Plan, 然后使用 cost model 预估各个 Physical Plan 的处理性能, 最后选择一个最优的 Physical Plan. 这里最主要优化的是 join 操作, 当触发 join 操作的时候, 会根据左右两边的数据集判断, 然后决定使用 Broadcast join, 还是传统的 Hash join, 抑或是 MergeSort join, 有关这几种 join 的区别这里就不详细解释了, 有兴趣童鞋可以百度看看.
除了 cost-based 优化, 这一阶段也依旧会有 rule-based 优化, 所以说 RuleExecutor 这个类是很重要的, 前面提到的 Analysis 阶段也好, Optimization 阶段也好, 包括这里的 Physical Plan 阶段, 只要是涉及到 rule-based 优化, 都会跟 RuleExecutor 这个类扯上关系. 当然这样无疑是极大使用了面向对象的特性, 不同的阶段编写不同的 rule 就行, 一次编写, 到处复用.
Physical Planning 源码分析
首先是在 QueryExecution 中调度,
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
...... 其他代码
- lazy val sparkPlan: SparkPlan = {
- SparkSession.setActiveSession(sparkSession)
- // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
- // but we will implement to choose the best plan.
- planner.plan(ReturnAnswer(optimizedPlan)).next()
- }
...... 其他代码
}
这里的 planner 是 org.apache.spark.sql.execution.SparkPlanner 这个类, 而这个类继承自 org.apache.spark.sql.catalyst.planning.QueryPlanner,plan()方法也是在父类 QueryPlanner 中实现的. 和 RuleExecution 类似, QueryPlanner 中有一个返回 Seq[GenericStrategy[PhysicalPlan]]的方法: def strategies: Seq[GenericStrategy[PhysicalPlan]], 这个方法会在子类 (也就是 SparkPlanner) 重写, 然后被 QueryPlanner 的 plan()方法调用.
我们来看看 SparkPlanner 中 strategies 方法的重写, 再来看 QueryPlanner 的 plan()方法吧.
- class SparkPlanner(
- val sparkContext: SparkContext,
- val conf: SQLConf,
- val experimentalMethods: ExperimentalMethods)
- extends SparkStrategies {
...... 其他代码
- override def strategies: Seq[Strategy] =
- experimentalMethods.extraStrategies ++
- extraPlanningStrategies ++ (
- PythonEvals ::
- DataSourceV2Strategy ::
- FileSourceStrategy ::
- DataSourceStrategy(conf) ::
- SpecialLimits ::
- Aggregation ::
- Windows ::
- JoinSelection ::
- InMemoryScans ::
- BasicOperators :: Nil)
...... 其他代码
strategies()返回策略列表, 是生成策略 GenericStrategy, 这是个具体的抽象类, 位于 org.apache.spark.sql.catalyst.planning 包. 所谓生成策略, 就是决定如果根据 Logical Plan 生成 Physical Plan 的策略. 比如上面介绍的 join 操作可以生成 Broadcast join,Hash join, 抑或是 MergeSort join, 就是一种生成策略, 具体的类就是上面代码中的 JoinSelection. 每个生成策略 GenericStrategy 都是 object, 其 apply()方法返回的是 Seq[SparkPlan], 这里的 SparkPlan 就是 PhysicalPlan(注意: 下文会将 SparkPlan 和 PhysicalPlan 混着用).
明白了生成策略后, 就可以来看看 QueryPlanner 的 plan()方法了.
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
...... 其他代码
- def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
- // Obviously a lot to do here still...
- // Collect physical plan candidates.
- val candidates = strategies.iterator.flatMap(_(plan)) // 迭代调用并平铺, 变成 Iterator[SparkPlan]
- // The candidates may contain placeholders marked as [[planLater]],
- // so try to replace them by their child plans.
- val plans = candidates.flatMap { candidate =>
- val placeholders = collectPlaceholders(candidate)
- if (placeholders.isEmpty) {
- // Take the candidate as is because it does not contain placeholders.
- Iterator(candidate)
- } else {
- // Plan the logical plan marked as [[planLater]] and replace the placeholders.
- placeholders.iterator.foldLeft(Iterator(candidate)) {
- case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
- // Plan the logical plan for the placeholder.
- val childPlans = this.plan(logicalPlan)
- candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
- childPlans.map { childPlan =>
- // Replace the placeholder by the child plan
- candidateWithPlaceholders.transformUp {
- case p if p.eq(placeholder) => childPlan
- }
- }
- }
- }
- }
- }
- val pruned = prunePlans(plans)
- assert(pruned.hasNext, s"No plan for $plan")
- pruned
- }
...... 其他代码
}
这里的流程其实不难, 主要工作其实就是调用各个生成策略 GenericStrategy 的 apply()方法, 生成 Iterator[SparkPlan]. 后面很大部分代码是处理占位符, 按我的理解, 在生成 Logical Plan 的时候, 可能有些无意义的占位符, 这种需要使用子节点替换调它. 倒数第三行 prunePlans()方法按注释说是用来去掉 bad plan 的, 但看实际代码只是原封不动返回.
这样最终就得到一个 Iterator[SparkPlan], 每个 SparkPlan 就是可执行的物理操作了.
大致流程就是如此, 当然具体到一些生成策略没有细说, 包括输入源策略, 聚合策略等等, 每一个都蛮复杂的, 这里就不细说, 有兴趣可以自行查阅.
对了, 最后还要看看示例代码到这一步变成什么样了, 先上示例代码:
- // 生成 DataFrame
- val df = Seq((1, 1)).toDF("key", "value")
- df.createOrReplaceTempView("src")
- // 调用 spark.sql
- val queryCaseWhen = sql("select key from src")
经过 Physical Planning 阶段后, 变成如下:
- Project [_1#2 AS key#5]
- +- LocalTableScan [_1#2, _2#3]
对比上面的 optimized 阶段, 直观看就是 LocalRelation 变成 LocalTableScan. 变得更加具体了, 但实际上, Project 也变了, 虽然打印名字相同, 但一个的类型是 Project, 本质上是 LogicalPlan. 而一个是 ProjectExec, 本质上是 SparkPlan(也就是 PhysicalPlan). 这一点通过断点看的更清楚.
到这一步已经很解决终点了, 后面再经过一个 Preparations 阶段就能生成 RDD 了, 剩下的部分留待下篇介绍吧.
以上~
来源: https://www.cnblogs.com/listenfwind/p/12886205.html