第二篇:Spark SQL Catalyst源码分析之SqlParser

/** Spark SQL源码分析系列文章*/

Spark SQL的核心执行流程我们已经分析完毕,可以参见Spark SQL核心执行流程,下面我们来分析执行流程中各个核心组件的工作职责。

本文先从入口开始分析,即如何解析SQL文本生成逻辑计划的,主要设计的核心组件式SqlParser是一个SQL语言的解析器,用scala实现的Parser将解析的结果封装为Catalyst TreeNode ,关于Catalyst这个框架后续文章会介绍。

一、SQL Parser入口

Sql Parser 其实是封装了scala.util.parsing.combinator下的诸多Parser,并结合Parser下的一些解析方法,构成了Catalyst的组件UnResolved Logical Plan。

先来看流程图:

一段SQL会经过SQL Parser解析生成UnResolved Logical Plan(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)。

在源代码里是:

[java] view plain copy

  1. def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))//sql("select name,value from temp_shengli") 实例化一个SchemaRDD
  2. protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql) //实例化SqlParser
  3. class SqlParser extends StandardTokenParsers with PackratParsers {
  4. def apply(input: String): LogicalPlan = {  //传入sql语句调用apply方法,input参数即sql语句
  5. // Special-case out set commands since the value fields can be
  6. // complex to handle without RegexParsers. Also this approach
  7. // is clearer for the several possible cases of set commands.
  8. if (input.trim.toLowerCase.startsWith("set")) {
  9. input.trim.drop(3).split("=", 2).map(_.trim) match {
  10. case Array("") => // "set"
  11. SetCommand(None, None)
  12. case Array(key) => // "set key"
  13. SetCommand(Some(key), None)
  14. case Array(key, value) => // "set key=value"
  15. SetCommand(Some(key), Some(value))
  16. }
  17. } else {
  18. phrase(query)(new lexical.Scanner(input)) match {
  19. case Success(r, x) => r
  20. case x => sys.error(x.toString)
  21. }
  22. }
  23. }

1.  当我们调用sql("select name,value from temp_shengli")时,实际上是new了一个SchemaRDD

2. new SchemaRDD时,构造方法调用parseSql方法,parseSql方法实例化了一个SqlParser,这个Parser初始化调用其apply方法。

3. apply方法分支:

3.1 如果sql命令是set开头的就调用SetCommand,这个类似Hive里的参数设定,SetCommand其实是一个Catalyst里TreeNode之LeafNode,也是继承自LogicalPlan,关于Catalyst的TreeNode库这个暂不详细介绍,后面会有文章来详细讲解。

3.2 关键是else语句块里,才是SqlParser解析SQL的核心代码:

[java] view plain copy

  1. phrase(query)(new lexical.Scanner(input)) match {
  2. case Success(r, x) => r
  3. case x => sys.error(x.toString)
  4. }

可能 phrase方法大家很陌生,不知道是干什么的,那么我们首先看一下SqlParser的类图:

SqlParser类继承了scala内置集合Parsers,这个Parsers。我们可以看到SqlParser现在是具有了分词的功能,也能解析combiner的语句(类似p ~> q,后面会介绍)。

  Phrase方法:

[java] view plain copy

  1. /** A parser generator delimiting whole phrases (i.e. programs).
  2. *
  3. *  `phrase(p)` succeeds if `p` succeeds and no input is left over after `p`.
  4. *
  5. *  @param p the parser that must consume all input for the resulting parser
  6. *           to succeed.
  7. *  @return  a parser that has the same result as `p`, but that only succeeds
  8. *           if `p` consumed all the input.
  9. */
  10. def phrase[T](p: Parser[T]) = new Parser[T] {
  11. def apply(in: Input) = lastNoSuccessVar.withValue(None) {
  12. p(in) match {
  13. case s @ Success(out, in1) =>
  14. if (in1.atEnd)
  15. s
  16. else
  17. lastNoSuccessVar.value filterNot { _.next.pos < in1.pos } getOrElse Failure("end of input expected", in1)
  18. case ns => lastNoSuccessVar.value.getOrElse(ns)
  19. }
  20. }
  21. }

Phrase是一个循环读取输入字符的方法,如果输入in没有到达最后一个字符,就继续对parser进行解析,直到最后一个输入字符。

我们注意到Success这个类,出现在Parser里, 在else块里最终返回的也有Success:

[java] view plain copy

  1. /** The success case of `ParseResult`: contains the result and the remaining input.
  2. *
  3. *  @param result The parser‘s output
  4. *  @param next   The parser‘s remaining input
  5. */
  6. case class Success[+T](result: T, override val next: Input) extends ParseResult[T] {

通过源码可知,Success封装了当前解析器的解析结果result, 和还没有解析的语句。

所以上面判断了Success的解析结果中in1.atEnd? 如果输入流结束了,就返回s,即Success对象,这个Success包含了SqlParser解析的输出。

二、Sql Parser核心

在SqlParser里phrase接受2个参数:

第一个是query,一种带模式的解析规则,返回的是LogicalPlan。

第二个是lexical词汇扫描输入。

SqlParser parse的流程是,用lexical词汇扫描接受SQL关键字,使用query模式来解析符合规则的SQL。

2.1 lexical keyword

在SqlParser里定义了KeyWord这个类:

[java] view plain copy

  1. protected case class Keyword(str: String)

在我使用的spark1.0.0版本里目前只支持了一下SQL保留字:

[java] view plain copy

  1. protected val ALL = Keyword("ALL")
  2. protected val AND = Keyword("AND")
  3. protected val AS = Keyword("AS")
  4. protected val ASC = Keyword("ASC")
  5. protected val APPROXIMATE = Keyword("APPROXIMATE")
  6. protected val AVG = Keyword("AVG")
  7. protected val BY = Keyword("BY")
  8. protected val CACHE = Keyword("CACHE")
  9. protected val CAST = Keyword("CAST")
  10. protected val COUNT = Keyword("COUNT")
  11. protected val DESC = Keyword("DESC")
  12. protected val DISTINCT = Keyword("DISTINCT")
  13. protected val FALSE = Keyword("FALSE")
  14. protected val FIRST = Keyword("FIRST")
  15. protected val FROM = Keyword("FROM")
  16. protected val FULL = Keyword("FULL")
  17. protected val GROUP = Keyword("GROUP")
  18. protected val HAVING = Keyword("HAVING")
  19. protected val IF = Keyword("IF")
  20. protected val IN = Keyword("IN")
  21. protected val INNER = Keyword("INNER")
  22. protected val INSERT = Keyword("INSERT")
  23. protected val INTO = Keyword("INTO")
  24. protected val IS = Keyword("IS")
  25. protected val JOIN = Keyword("JOIN")
  26. protected val LEFT = Keyword("LEFT")
  27. protected val LIMIT = Keyword("LIMIT")
  28. protected val MAX = Keyword("MAX")
  29. protected val MIN = Keyword("MIN")
  30. protected val NOT = Keyword("NOT")
  31. protected val NULL = Keyword("NULL")
  32. protected val ON = Keyword("ON")
  33. protected val OR = Keyword("OR")
  34. protected val OVERWRITE = Keyword("OVERWRITE")
  35. protected val LIKE = Keyword("LIKE")
  36. protected val RLIKE = Keyword("RLIKE")
  37. protected val UPPER = Keyword("UPPER")
  38. protected val LOWER = Keyword("LOWER")
  39. protected val REGEXP = Keyword("REGEXP")
  40. protected val ORDER = Keyword("ORDER")
  41. protected val OUTER = Keyword("OUTER")
  42. protected val RIGHT = Keyword("RIGHT")
  43. protected val SELECT = Keyword("SELECT")
  44. protected val SEMI = Keyword("SEMI")
  45. protected val STRING = Keyword("STRING")
  46. protected val SUM = Keyword("SUM")
  47. protected val TABLE = Keyword("TABLE")
  48. protected val TRUE = Keyword("TRUE")
  49. protected val UNCACHE = Keyword("UNCACHE")
  50. protected val UNION = Keyword("UNION")
  51. protected val WHERE = Keyword("WHERE")

这里根据这些保留字,反射,生成了一个SqlLexical

[java] view plain copy

  1. override val lexical = new SqlLexical(reservedWords)

SqlLexical利用它的Scanner这个Parser来读取输入,传递给query。

2.2 query

query的定义是Parser[LogicalPlan]  和 一堆奇怪的连接符(其实都是Parser的方法啦,看上图),*,~,^^^,看起来很让人费解。通过查阅读源码,以下列出几个常用的:

|  is the alternation combinator. It says “succeed if either the left or right operand parse successfully” 
左边算子和右边的算子只要有一个成功了,就返回succeed,类似or

~ is the sequential combinator. It says “succeed if the left operand parses successfully, and then the right parses successfully on the remaining input”
左边的算子成功后,右边的算子对后续的输入也计算成功,就返回succeed

opt  `opt(p)` is a parser that returns `Some(x)` if `p` returns `x` and `None` if `p` fails.
如果p算子成功则返回则返回Some(x) 如果p算子失败,返回fails

^^^ `p ^^^ v` succeeds if `p` succeeds; discards its result, and returns `v` instead.
如果左边的算子成功,取消左边算子的结果,返回右边算子。

~> says “succeed if the left operand parses successfully followed by the right, but do not include the left content in the result”
如果左边的算子和右边的算子都成功了,返回的结果中不包含左边的返回值。
  protected lazy val limit: Parser[Expression] =
    LIMIT ~> expression

<~ is the reverse, “succeed if the left operand is parsed successfully followed by the right, but do not include the right content in the result”
这个和~>操作符的意思相反,如果左边的算子和右边的算子都成功了,返回的结果中不包含右边的
    termExpression <~ IS ~ NOT ~ NULL ^^ { case e => IsNotNull(e) } |

^^{} 或者 ^^=> is the transformation combinator. It says “if the left operand parses successfully, transform the result using the function on the right”
rep => simply says “expect N-many repetitions of parser X” where X is the parser passed as an argument to rep
变形连接符,意思是如果左边的算子成功了,用^^右边的算子函数作用于返回的结果

接下来看query的定义:

[java] view plain copy

  1. protected lazy val query: Parser[LogicalPlan] = (
  2. select * (
  3. UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
  4. UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
  5. )
  6. | insert | cache
  7. )

没错,返回的是一个Parser,里面的类型是LogicalPlan。

query的定义其实是一种模式,用到了上述的诸多操作符,如|, ^^, ~> 等等

给定一种sql模式,如select,select xxx from yyy where ccc =ddd  如果匹配这种写法,则返回Success,否则返回Failure.

这里的模式是select 模式后面可以接union all 或者 union distinct。

即如下书写式合法的,否则出错。

[java] view plain copy

  1. select a,b from c
  2. union all
  3. select e,f from g

这个 *号是一个repeat符号,即可以支持多个union all 子句。

看来目前spark1.0.0只支持这三种模式,即select, insert, cache。

那到底是怎么生成LogicalPlan的呢? 我们再看一个详细的:

[java] view plain copy

  1. protected lazy val select: Parser[LogicalPlan] =
  2. SELECT ~> opt(DISTINCT) ~ projections ~
  3. opt(from) ~ opt(filter) ~
  4. opt(grouping) ~
  5. opt(having) ~
  6. opt(orderBy) ~
  7. opt(limit) <~ opt(";") ^^ {
  8. case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
  9. val base = r.getOrElse(NoRelation)
  10. val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
  11. val withProjection =
  12. g.map {g =>
  13. Aggregate(assignAliases(g), assignAliases(p), withFilter)
  14. }.getOrElse(Project(assignAliases(p), withFilter))
  15. val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
  16. val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
  17. val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
  18. val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
  19. withLimit
  20. }

这里我给称它为select模式。

看这个select语句支持什么模式的写法:

select  distinct  projections from filter grouping having orderBy limit. 

给出一个符合的该select 模式的sql, 注意到 带opt连接符的是可选的,可以写distinct也可以不写。

[java] view plain copy

  1. select  game_id, user_name from game_log where date<=‘2014-07-19‘ and user_name=‘shengli‘ group by game_id having game_id > 1 orderBy game_id limit 50.

projections是什么呢?

其实是一个表达式,是一个Seq类型,一连串的表达式可以使 game_id也可以是 game_id AS gmid 。

返回的确实是一个Expression,是Catalyst里TreeNode。

[java] view plain copy

  1. protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
  2. protected lazy val projection: Parser[Expression] =
  3. expression ~ (opt(AS) ~> opt(ident)) ^^ {
  4. case e ~ None => e
  5. case e ~ Some(a) => Alias(e, a)()
  6. }

模式里from是什么的?

其实是一个relations,就是一个关系,在SQL里可以是表,表join表

[java] view plain copy

  1. protected lazy val from: Parser[LogicalPlan] = FROM ~> relations

[java] view plain copy

  1. protected lazy val relation: Parser[LogicalPlan] =
  2. joinedRelation |
  3. relationFactor
  4. protected lazy val relationFactor: Parser[LogicalPlan] =
  5. ident ~ (opt(AS) ~> opt(ident)) ^^ {
  6. case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
  7. } |
  8. "(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }
  9. protected lazy val joinedRelation: Parser[LogicalPlan] =
  10. relationFactor ~ opt(joinType) ~ JOIN ~ relationFactor ~ opt(joinConditions) ^^ {
  11. case r1 ~ jt ~ _ ~ r2 ~ cond =>
  12. Join(r1, r2, joinType = jt.getOrElse(Inner), cond)
  13. }

这里看出来,其实就是table之间的操作,但是返回的Subquery确实是一个LogicalPlan

[java] view plain copy

  1. case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  2. override def output = child.output.map(_.withQualifiers(alias :: Nil))
  3. override def references = Set.empty
  4. }

scala里的语法糖很多,这样写的确比较方便,但是对初学者可能有点晦涩了。

至此我们知道,SqlParser是怎么生成LogicalPlan的了。

三、总结

本文从源代码剖析了Spark Catalyst 是如何将Sql解析成Unresolved逻辑计划(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)的。

sql文本作为输入,实例化了SqlParser,SqlParser的apply方法被调用,分别处理2种输入,一种是命令参数,一种是sql。对应命令参数的会生成一个叶子节点,SetCommand,对于sql语句,会调用Parser的phrase方法,由lexical的Scanner来扫描输入,分词,最后由query这个由我们定义好的sql模式利用parser的连接符来验证是否符合sql标准,如果符合则随即生成LogicalPlan语法树,不符合则会提示解析失败。

通过对spark catalyst sql parser的解析,使我理解了,sql语言的语法标准是如何实现的和如何解析sql生成逻辑计划语法树。

——EOF——

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/37943507

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

转自:http://blog.csdn.net/oopsoom/article/details/37943507

时间: 2024-12-29 01:58:16

第二篇:Spark SQL Catalyst源码分析之SqlParser的相关文章

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark SQL Catalyst源码分析之Optimizer

前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以

Spark SQL Catalyst源码分析之Physical Plan

前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized L

第六篇:Spark SQL Catalyst源码分析之Physical Plan

/** Spark SQL源码分析系列文章*/ 前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有S

第三篇:Spark SQL Catalyst源码分析之Analyzer

/** Spark SQL源码分析系列文章*/ 前面几篇文章讲解了Spark SQL的核心执行流程和Spark SQL的Catalyst框架的Sql Parser是怎样接受用户输入sql,经过解析生成Unresolved Logical Plan的.我们记得Spark SQL的执行流程中另一个核心的组件式Analyzer,本文将会介绍Analyzer在Spark SQL里起到了什么作用. Analyzer位于Catalyst的analysis package下,主要职责是将Sql Parser

第四篇:Spark SQL Catalyst源码分析之TreeNode Library

/** Spark SQL源码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Lib

第五篇:Spark SQL Catalyst源码分析之Optimizer

/** Spark SQL源码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化

第八篇:Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp