Spark SQL 原理解析前言:
Spark SQL 源码剖析 (一)SQL 解析框架 Catalyst 流程概述
这一次要开始真正介绍 Spark 解析 SQL 的流程, 首先是从 Sql Parse 阶段开始, 简单点说, 这个阶段就是使用 Antlr4, 将一条 Sql 语句解析成语法树.
可能有童鞋没接触过 antlr4 这个内容, 推荐看看《antlr4 权威指南》前四章, 看完起码知道 antlr4 能干嘛. 我这里就不多介绍了.
这篇首先先介绍调用 spark.sql() 时候的流程, 再看看 antlr4 在这个其中的主要功能, 最后再将探究 Logical Plan 究竟是什么东西.
初始流程
当你调用 spark.sql 的时候, 会调用下面的方法:
- def sql(sqlText: String): DataFrame = {
- Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
- }
parse sql 阶段主要是 parsePlan(sqlText) 这一部分. 而这里又会辗转去 org.apache.spark.sql.catalyst.parser.AbstractSqlParser 调用 parse 方法. 这里贴下关键代码.
- protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
- logDebug(s"Parsing command: $command")
- val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
- lexer.removeErrorListeners()
- lexer.addErrorListener(ParseErrorListener)
- lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
- val tokenStream = new CommonTokenStream(lexer)
- val parser = new SqlBaseParser(tokenStream)
- parser.addParseListener(PostProcessor)
- parser.removeErrorListeners()
- parser.addErrorListener(ParseErrorListener)
- parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
- try {
- try {
- // first, try parsing with potentially faster SLL mode
- parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
- toResult(parser)
- }
- catch {
- case e: ParseCancellationException =>
- // if we fail, parse with LL mode
- tokenStream.seek(0) // rewind input stream
- parser.reset()
- // Try Again.
- parser.getInterpreter.setPredictionMode(PredictionMode.LL)
- toResult(parser)
- }
- }
- catch {
- case e: ParseException if e.command.isDefined =>
- throw e
- case e: ParseException =>
- throw e.withCommand(command)
- case e: AnalysisException =>
- val position = Origin(e.line, e.startPosition)
- throw new ParseException(Option(command), e.message, position, position)
- }
- }
可以发现, 这里面的处理逻辑, 无论是 SqlBaseLexer 还是 SqlBaseParser 都是 Antlr4 的东西, 包括最后的 toResult(parser) 也是调用访问者模式的类去遍历语法树来生成 Logical Plan. 如果对 antlr4 有一定了解, 那么对这里这些东西一定不会陌生. 那我们接下来看看 Antlr4 在这其中的角色.
Antlr4 生成语法树
Spark 提供了一个. g4 文件, 编译的时候会使用 Antlr 根据这个. g4 生成对应的词法分析类和语法分析类, 同时还使用了访问者模式, 用以构建 Logical Plan(语法树).
访问者模式简单说就是会去遍历生成的语法树 (针对语法树中每个节点生成一个 visit 方法), 以及返回相应的值. 我们接下来看看一条简单的 select 语句生成的树是什么样子.
这个 sqlBase.g4 文件我们也可以直接拿出来玩, 直接复制出来, 用 antlr 相关工具就可以生成一个生成一个解析 SQL 的图了.
这里 antlr4 和 grun 都已经存储成 bat 文件, 所以可以直接调用, 实际命令在《antlr4 权威指南》说得很详细了就不介绍了. 调用完后就会生成这样的语法树.
这里, 将 SELECT TABLE_A.B FROM TABLE_A, 转换成一棵语法树. 我们可以看到这颗语法树非常复杂, 这是因为 SQL 解析中, 要适配这种 SELECT 语句之外, 还有很多其他类型的语句, 比如 INSERT,ALERT 等等. Spark SQL 这个模块的最终目标, 就是将这样的一棵语法树转换成一个可执行的 Dataframe(RDD).
我们现阶段的目标则是要先生成 Logical Plan,Spark 使用 Antlr4 的访问者模式, 生成 Logical Plan. 这里顺便说下怎么实现访问者模式吧, 在使用 antlr4 命令的时候, 加上 - visit 参数就会生成 SqlBaseBaseVisitor, 里面提供了默认的访问各个节点的触发方法. 我们可以通过继承这个类, 重写对应节点的 visit 方法, 实现自己的访问逻辑, 而这个继承的类就是 org.apache.spark.sql.catalyst.parser.AstBuilder.
通过观察这棵树, 我们可以发现针对我们的 SELECT 语句, 比较重要的一个节点, 是 querySpecification 节点, 实际上, 在 AstBuilder 类中, visitQuerySpecification 也是比较重要的一个方法 (访问对应节点时触发), 正是在这个方法中生成主要的 Logical Plan 的.
接下来重点看这个方法, 以及探究 Logical Plan.
生成 Logical Plan
我们先看看 AstBuilder 中的代码:
class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging {
...... 其他代码
- override def visitQuerySpecification(
- ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
- val from = OneRowRelation().optional(ctx.fromClause) { // 如果有 FROM 语句, 生成对应的 Logical Plan
- visitFromClause(ctx.fromClause)
- }
- withQuerySpecification(ctx, from)
- }
...... 其他代码
代码中会先判断是否有 FROM 子语句, 有的话会去生成对应的 Logical Plan, 再调用 withQuerySpecification() 方法, 而 withQuerySpecification() 方法是比较核心的一个方法. 它会处理包括 SELECT,FILTER,GROUP BY,HAVING 等子语句的逻辑.
代码比较长就不贴了, 有兴趣的童鞋可以去看看, 大意就是使用 scala 的模式匹配, 匹配不同的子语句生成不同的 Logical Plan.
然后再来说说最终生成的 LogicalPlan,LogicalPlan 其实是继承自 TreeNode, 所以本质上 LogicalPlan 就是一棵树.
而实际上, LogicalPlan 还有多个子类, 分别表示不同的 SQL 子语句.
LeafNode, 叶子节点, 一般用来表示用户命令
UnaryNode, 一元节点, 表示 FILTER 等操作
BinaryNode, 二元节点, 表示 JOIN,GROUP BY 等操作
这里一元二元这些都是对应关系代数方面的知识, 在学数据库理论的时候肯定有接触过, 不过估计都还给老师了吧 (/ 偷笑). 不过一元二元基本上也就是用来区分具体的操作, 如上面说的 FILTER, 或是 JOIN 等, 也不是很复杂. 这三个类都位于 org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 中, 有兴趣的童鞋可以看看. 而后, 这三个类又会有多个子类, 用以表示不同的情况, 这里就不再赘述.
最后看看用一个测试案例, 看看会生成什么吧. 示例中简单生成一个临时的 view, 然后直接 select 查询这个 view. 代码如下:
- val df = Seq((1, 1)).toDF("key", "value")
- df.createOrReplaceTempView("src")
- val queryCaseWhen = sql("select key from src")
最终经过 parse SQL 后会变成如下的内容:
- 'Project ['key]
- +- 'UnresolvedRelation `src`
这个 Project 是 UnaryNode 的一个子类 (SELECT 自然是一元节点), 表明我们要查询的字段是 key.
UnresolvedRelation 是一个新的概念, 这里顺便说下, 我们通过 SQL parse 生成的这棵树, 其实叫 Unresolved LogicalPlan, 这里的 Unresolved 的意思说, 还不知道 src 是否存在, 或它的元数据是什么样, 只有通过 Analysis 阶段后, 才会把 Unresolved 变成 Resolved LogicalPlan. 这里的意思可以理解为, 读取名为 src 的表, 但这张表的情况未知, 有待验证.
总的来说, 我们的示例足够简单直接, 所以内容会比较少, 不过拿来学习是足够了.
下一个阶段是要使用这棵树进行分析验证了, 也就是 Analysis 阶段, 这一块留到下篇介绍吧.
以上~
来源: https://www.cnblogs.com/listenfwind/p/12735833.html