sparkSQL1.1入门之三:sparkSQL组件之解析

上篇在总体上介绍了sparkSQL的运行架构及其基本实现方法(Tree和Rule的配合),也大致介绍了sparkSQL中涉及到的各个概念和组件。本篇将详细地介绍一下关键的一些概念和组件,由于hiveContext继承自sqlContext,关键的概念和组件类似,只不过后者针对hive的特性做了一些修正和重写,所以本篇就只介绍sqlContext的关键的概念和组件。

  • 概念:

    • LogicalPlan
  • 组件:
    • SqlParser
    • Analyzer
    • Optimizer
    • Planner

1:LogicalPlan

在sparkSQL的运行架构中,LogicalPlan贯穿了大部分的过程,其中catalyst中的SqlParser、Analyzer、Optimizer都要对LogicalPlan进行操作。LogicalPlan的定义如下:


abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  self: Product =>
  case class Statistics(
    sizeInBytes: BigInt
  )
  lazy val statistics: Statistics = {
    if (children.size == 0) {
      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
    }

    Statistics(
      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
  }

  /**
   * Returns the set of attributes that this node takes as
   * input from its children.
   */
  lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))

  /**
   * Returns true if this expression and all its children have been resolved to a specific schema
   * and false if it is still contains any unresolved placeholders. Implementations of LogicalPlan
   * can override this (e.g.
   * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
   * should return `false`).
   */
  lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved

  /**
   * Returns true if all its children of this query plan have been resolved.
   */
  def childrenResolved: Boolean = !children.exists(!_.resolved)

  /**
   * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
   * nodes of this LogicalPlan. The attribute is expressed as
   * as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
   */
  def resolveChildren(name: String): Option[NamedExpression] =
    resolve(name, children.flatMap(_.output))

  /**
   * Optionally resolves the given string to a [[NamedExpression]] based on the output of this
   * LogicalPlan. The attribute is expressed as string in the following form:
   * `[scope].AttributeName.[nested].[fields]...`.
   */
  def resolve(name: String): Option[NamedExpression] =
    resolve(name, output)

  /** Performs attribute resolution given a name and a sequence of possible attributes. */
  protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = {
    val parts = name.split("\\.")
    val options = input.flatMap { option =>
      val remainingParts =
        if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
    }

    options.distinct match {
      case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
      // One match, but we also need to extract the requested nested field.
      case Seq((a, nestedFields)) =>
        a.dataType match {
          case StructType(fields) =>
            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
          case _ => None // Don‘t know how to resolve these field references
        }
      case Seq() => None         // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
    }
  }
}

在LogicalPlan里维护者一套统计数据和属性数据,也提供了解析方法。同时延伸了三种类型的LogicalPlan:

  • LeafNode:对应于trees.LeafNode的LogicalPlan
  • UnaryNode:对应于trees.UnaryNode的LogicalPlan
  • BinaryNode:对应于trees.BinaryNode的LogicalPlan

而对于SQL语句解析时,会调用和SQL匹配的操作方法来进行解析;这些操作分四大类,最终生成LeafNode、UnaryNode、BinaryNode中的一种:

  • basicOperators:一些数据基本操作,如Ioin、Union、Filter、Project、Sort
  • commands:一些命令操作,如SetCommand、CacheCommand
  • partitioning:一些分区操作,如RedistributeData
  • ScriptTransformation:对脚本的处理,如ScriptTransformation
  • LogicalPlan类的总体架构如下所示

2:SqlParser

SqlParser的功能就是将SQL语句解析成Unresolved LogicalPlan。现阶段的SqlParser语法解析功能比较简单,支持的语法比较有限。其解析过程中有两个关键组件和一个关键函数:

  • 词法读入器SqlLexical,其作用就是将输入的SQL语句进行扫描、去空、去注释、校验、分词等动作。
  • SQL语法表达式query,其作用定义SQL语法表达式,同时也定义了SQL语法表达式的具体实现,即将不同的表达式生成不同sparkSQL的Unresolved LogicalPlan。
  • 函数phrase(),上面个两个组件通过调用phrase(query)(new lexical.Scanner(input)),完成对SQL语句的解析;在解析过程中,SqlLexical一边读入,一边解析,如果碰上生成符合SQL语法的表达式时,就调用相应SQL语法表达式的具体实现函数,将SQL语句解析成Unresolved
    LogicalPlan。

下面看看sparkSQL的整个解析过程和相关组件:

A:解析过程

首先,在sqlContext中使用下面代码调用catalyst.SqlParser:


/*源自 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */

protected[sql] val parser = new catalyst.SqlParser

protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)

然后,直接在SqlParser的apply方法中对输入的SQL语句进行解析,解析功能的核心代码就是:

phrase(query)(new lexical.Scanner(input))


/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */

class SqlParser extends StandardTokenParsers with PackratParsers {
  def apply(input: String): LogicalPlan = {
    if (input.trim.toLowerCase.startsWith("set")) {
      //set设置项的处理

......
    } else {
      phrase(query)(new lexical.Scanner(input)) match {
        case Success(r, x) => r
        case x => sys.error(x.toString)
      }
    }
  }

......

可以看得出来,该语句就是调用phrase()函数,使用SQL语法表达式query,对词法读入器lexical读入的SQL语句进行解析,其中词法读入器lexical通过重写语句:override val lexical = new SqlLexical(reservedWords) 调用扩展了功能的SqlLexical。其定义:


/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */

// Use reflection to find the reserved words defined in this class.
  protected val reservedWords =
    this.getClass
      .getMethods
      .filter(_.getReturnType == classOf[Keyword])
      .map(_.invoke(this).asInstanceOf[Keyword].str)

  override val lexical = new SqlLexical(reservedWords)

为了加深对SQL语句解析过程的理解,让我们看看下面这个简单数字表达式解析过程来说明:


import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.combinator.syntactical._

object mylexical extends StandardTokenParsers with PackratParsers {
  //定义分割符
  lexical.delimiters ++= List(".", ";", "+", "-", "*")
  //定义表达式,支持加,减,乘
  lazy val expr: PackratParser[Int] = plus | minus | multi
  //加法表示式的实现
  lazy val plus: PackratParser[Int] = num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt}
  //减法表达式的实现
  lazy val minus: PackratParser[Int] = num ~ "-" ~ num ^^ { case n1 ~ "-" ~ n2 => n1.toInt - n2.toInt}
  //乘法表达式的实现
  lazy val multi: PackratParser[Int] = num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt}
  lazy val num = numericLit

  def parse(input: String) = {

//定义词法读入器myread,并将扫描头放置在input的首位

val myread = new PackratReader(new lexical.Scanner(input))
    print("处理表达式 " + input)
    phrase(expr)(myread) match {
      case Success(result, _) => println(" Success!"); println(result); Some(result)
      case n => println(n); println("Err!"); None
    }
  }

  def main(args: Array[String]) {
    val prg = "6 * 3" :: "24-/*aaa*/4" :: "a+5" :: "21/3" :: Nil
    prg.map(parse)
  }
}

运行结果:


处理表达式 6 * 3 Success!     //lexical对空格进行了处理,得到6*3
18     //6*3符合乘法表达式,调用n1.toInt * n2.toInt,得到结果并返回

处理表达式 24-/*aaa*/4 Success!  //lexical对注释进行了处理,得到20-4
20    //20-4符合减法表达式,调用n1.toInt - n2.toInt,得到结果并返回
处理表达式 a+5[1.1] failure: number expected
      //lexical在解析到a,发现不是整数型,故报错误位置和内容
a+5
^
Err!
处理表达式 21/3[1.3] failure: ``*‘‘ expected but ErrorToken(illegal character) found
      //lexical在解析到/,发现不是分割符,故报错误位置和内容
21/3
  ^
Err!

在运行的时候,首先对表达式 6 * 3 进行解析,词法读入器myread将扫描头置于6的位置;当phrase()函数使用定义好的数字表达式expr处理6 * 3的时候,6 * 3每读入一个词法,就和expr进行匹配,如读入6*和expr进行匹配,先匹配表达式plus,*和+匹配不上;就继续匹配表达式minus,*和-匹配不上;就继续匹配表达式multi,这次匹配上了,等读入3的时候,因为3是num类型,就调用调用n1.toInt * n2.toInt进行计算。

注意,这里的expr、plus、minus、multi、num都是表达式,|、~、^^是复合因子,表达式和复合因子可以组成一个新的表达式,如plus(num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt})就是一个由num、+、num、函数构成的复合表达式;而expr(plus | minus | multi)是由plus、minus、multi构成的复合表达式;复合因子的含义定义在类scala/util/parsing/combinator/Parsers.scala,下面是几个常用的复合因子:

  • p ~ q p成功,才会q;放回p,q的结果
  • p ~> q p成功,才会q,返回q的结果
  • p <~ q p成功,才会q,返回p的结果
  • p | q p失败则q,返回第一个成功的结果
  • p ^^ f 如果p成功,将函数f应用到p的结果上
  • p ^? f 如果p成功,如果函数f可以应用到p的结果上的话,就将p的结果用f进行转换

针对上面的6 * 3使用的是multi表达式(num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt}),其含义就是:num后跟*再跟num,如果满足就将使用函数n1.toInt * n2.toInt。

到这里为止,大家应该明白整个解析过程了吧,。SqlParser的原理和这个表达式解析器使用了一样的原理,只不过是定义的SQL语法表达式query复杂一些,使用的词法读入器更丰富一些而已。下面分别介绍一下相关组件SqlParser、SqlLexical、query。

B:SqlParser

首先,看看SqlParser的UML图:

其次,看看SqlParser的定义,SqlParser继承自类StandardTokenParsers和特质PackratParsers:

其中,PackratParsers:

  • 扩展了scala.util.parsing.combinator.Parsers所提供的parser,做了内存化处理;
  • Packrat解析器实现了回溯解析和递归下降解析,具有无限先行和线性分析时的优势。同时,也支持左递归词法解析。
  • 从Parsers中继承出来的class或trait都可以使用PackratParsers,如:object MyGrammar extends StandardTokenParsers with PackratParsers;
  • PackratParsers将分析结果进行缓存,因此,PackratsParsers需要PackratReader(内存化处理的Reader)作为输入,程序员可以手工创建PackratReader,如production(new PackratReader(new lexical.Scanner(input))),更多的细节参见scala库中/scala/util/parsing/combinator/PackratParsers.scala文件。

StandardTokenParsers是最终继承自Parsers

  • 增加了词法的处理能力(Parsers是字符处理),在StdTokenParsers中定义了四种基本词法:

    • keyword tokens
    • numeric literal tokens
    • string literal tokens
    • identifier tokens
  • 定义了一个词法读入器lexical,可以进行词法读入

SqlParser在进行解析SQL语句的时候是调用了PackratParsers中phrase():


/*源自 scala/util/parsing/combinator/PackratParsers.scala */

/**
   *  A parser generator delimiting whole phrases (i.e. programs).
   *
   *  Overridden to make sure any input passed to the argument parser
   *  is wrapped in a `PackratReader`.
   */
  override def phrase[T](p: Parser[T]) = {
    val q = super.phrase(p)
    new PackratParser[T] {
      def apply(in: Input) = in match {
        case in: PackratReader[_] => q(in)
        case in => q(new PackratReader(in))
      }
    }
  }

在解析过程中,一般会定义多个表达式,如上面例子中的plus | minus | multi,一旦前一个表达式不能解析的话,就会调用下一个表达式进行解析:


/*源自 scala/util/parsing/combinator/Parsers.scala */

def append[U >: T](p0: => Parser[U]): Parser[U] = { lazy val p = p0 // lazy argument
      Parser{ in => this(in) append p(in)}
    }

表达式解析正确后,具体的实现函数是在PackratParsers中完成:


/*源自 scala/util/parsing/combinator/PackratParsers.scala */

def memo[T](p: super.Parser[T]): PackratParser[T] = {
    new PackratParser[T] {
      def apply(in: Input) = {
        val inMem = in.asInstanceOf[PackratReader[Elem]]

        //look in the global cache if in a recursion
        val m = recall(p, inMem)
        m match {
          //nothing has been done due to recall
          case None =>
            val base = LR(Failure("Base Failure",in), p, None)
            inMem.lrStack = base::inMem.lrStack
            //cache base result
            inMem.updateCacheAndGet(p,MemoEntry(Left(base)))
            //parse the input
            val tempRes = p(in)
            //the base variable has passed equality tests with the cache
            inMem.lrStack = inMem.lrStack.tail
            //check whether base has changed, if yes, we will have a head
            base.head match {
              case None =>
                /*simple result*/
                inMem.updateCacheAndGet(p,MemoEntry(Right(tempRes)))
                tempRes
              case [email protected](_) =>
                /*non simple result*/
                base.seed = tempRes
                //the base variable has passed equality tests with the cache
                val res = lrAnswer(p, inMem, base)
                res
            }

          case Some(mEntry) => {
            //entry found in cache
            mEntry match {
              case MemoEntry(Left(recDetect)) => {
                setupLR(p, inMem, recDetect)
                //all setupLR does is change the heads of the recursions, so the seed will stay the same
                recDetect match {case LR(seed, _, _) => seed.asInstanceOf[ParseResult[T]]}
              }
              case MemoEntry(Right(res: ParseResult[_])) => res.asInstanceOf[ParseResult[T]]
            }
          }
        }
      }
    }
  }

StandardTokenParsers增加了词法处理能力,SqlParers定义了大量的关键字,重写了词法读入器,将这些关键字应用于词法读入器。

C:SqlLexical

词法读入器SqlLexical扩展了StdLexical的功能,首先增加了大量的关键字:


/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */

protected val ALL = Keyword("ALL")
  protected val AND = Keyword("AND")
  protected val AS = Keyword("AS")
  protected val ASC = Keyword("ASC")
  ......
  protected val SUBSTR = Keyword("SUBSTR")
  protected val SUBSTRING = Keyword("SUBSTRING")

其次丰富了分隔符、词法处理、空格注释处理:


/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */

delimiters += (
      "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
      ",", ";", "%", "{", "}", ":", "[", "]"
  )

  override lazy val token: Parser[Token] = (
    identChar ~ rep( identChar | digit ) ^^
      { case first ~ rest => processIdent(first :: rest mkString "") }
      | rep1(digit) ~ opt(‘.‘ ~> rep(digit)) ^^ {
      case i ~ None    => NumericLit(i mkString "")
      case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
    }
      | ‘\‘‘ ~ rep( chrExcept(‘\‘‘, ‘\n‘, EofCh) ) ~ ‘\‘‘ ^^
      { case ‘\‘‘ ~ chars ~ ‘\‘‘ => StringLit(chars mkString "") }
      | ‘\"‘ ~ rep( chrExcept(‘\"‘, ‘\n‘, EofCh) ) ~ ‘\"‘ ^^
      { case ‘\"‘ ~ chars ~ ‘\"‘ => StringLit(chars mkString "") }
      | EofCh ^^^ EOF
      | ‘\‘‘ ~> failure("unclosed string literal")
      | ‘\"‘ ~> failure("unclosed string literal")
      | delim
      | failure("illegal character")
    )

  override def identChar = letter | elem(‘_‘) | elem(‘.‘)

  override def whitespace: Parser[Any] = rep(
    whitespaceChar
      | ‘/‘ ~ ‘*‘ ~ comment
      | ‘/‘ ~ ‘/‘ ~ rep( chrExcept(EofCh, ‘\n‘) )
      | ‘#‘ ~ rep( chrExcept(EofCh, ‘\n‘) )
      | ‘-‘ ~ ‘-‘ ~ rep( chrExcept(EofCh, ‘\n‘) )
      | ‘/‘ ~ ‘*‘ ~ failure("unclosed comment")
  )

最后看看SQL语法表达式query。

D:query

SQL语法表达式支持3种操作:select、insert、cache


/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */

protected lazy val query: Parser[LogicalPlan] = (
    select * (
        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
        INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
    | insert | cache
  )

而这些操作还有具体的定义,如select,这里开始定义了具体的函数,将SQL语句转换成构成Unresolved LogicalPlan的一些Node:


/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */

protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> opt(DISTINCT) ~ projections ~
    opt(from) ~ opt(filter) ~
    opt(grouping) ~
    opt(having) ~
    opt(orderBy) ~
    opt(limit) <~ opt(";") ^^ {
      case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
        val base = r.getOrElse(NoRelation)
        val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
        val withProjection =
          g.map {g =>
            Aggregate(assignAliases(g), assignAliases(p), withFilter)
          }.getOrElse(Project(assignAliases(p), withFilter))
        val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
        val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
        val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
        val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
        withLimit
  }

3:Analyzer

Analyzer的功能就是对来自SqlParser的Unresolved LogicalPlan中的UnresolvedAttribute项和UnresolvedRelation项,对照catalog和FunctionRegistry生成Analyzed LogicalPlan。Analyzer定义了5大类14小类的rule:


/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala */

val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      ResolveSortReferences ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      UnresolvedHavingClauseAttributes ::
      typeCoercionRules :_*),
    Batch("Check Analysis", Once,
      CheckResolution),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )
  • MultiInstanceRelations

    • NewRelationInstances
  • CaseInsensitiveAttributeReferences
    • LowercaseAttributeReferences
  • Resolution
    • ResolveReferences
    • ResolveRelations
    • ResolveSortReferences
    • NewRelationInstances
    • ImplicitGenerate
    • StarExpansion
    • ResolveFunctions
    • GlobalAggregates
    • UnresolvedHavingClauseAttributes
    • typeCoercionRules
  • Check Analysis
    • CheckResolution
  • AnalysisOperators
    • EliminateAnalysisOperators

这些rule都是使用transform对Unresolved
LogicalPlan进行操作,其中typeCoercionRules是对HiveQL语义进行处理,在其下面又定义了多个rule:PropagateTypes、ConvertNaNs、WidenTypes、PromoteStrings、BooleanComparisons、BooleanCasts、StringToIntegralCasts、FunctionArgumentConversion、CaseWhenCoercion、Division,同样了这些rule也是使用transform对Unresolved
LogicalPlan进行操作。这些rule操作后,使得LogicalPlan的信息变得丰满和易懂。下面拿其中的两个rule来简单介绍一下:

比如rule之ResolveReferences,最终调用LogicalPlan的resolveChildren对列名给一名字和序号,如name#67之列的,这样保持列的唯一性:


/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala */

object ResolveReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case q: LogicalPlan if q.childrenResolved =>
        logTrace(s"Attempting to resolve ${q.simpleString}")
        q transformExpressions {
          case u @ UnresolvedAttribute(name) =>
            // Leave unchanged if resolution fails.  Hopefully will be resolved next round.
            val result = q.resolveChildren(name).getOrElse(u)
            logDebug(s"Resolving $u to $result")
            result
        }
    }
  }

又比如rule之StarExpansion,其作用就是将Select * Fom tbl中的*展开,赋予列名:


/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala */

object StarExpansion extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      // Wait until children are resolved
      case p: LogicalPlan if !p.childrenResolved => p
      // If the projection list contains Stars, expand it.
      case p @ Project(projectList, child) if containsStar(projectList) =>
        Project(
          projectList.flatMap {
            case s: Star => s.expand(child.output)
            case o => o :: Nil
          },
          child)
      case t: ScriptTransformation if containsStar(t.input) =>
        t.copy(
          input = t.input.flatMap {
            case s: Star => s.expand(t.child.output)
            case o => o :: Nil
          }
        )
      // If the aggregate function argument contains Stars, expand it.
      case a: Aggregate if containsStar(a.aggregateExpressions) =>
        a.copy(
          aggregateExpressions = a.aggregateExpressions.flatMap {
            case s: Star => s.expand(a.child.output)
            case o => o :: Nil
          }
        )
    }

    /**
     * Returns true if `exprs` contains a [[Star]].
     */
    protected def containsStar(exprs: Seq[Expression]): Boolean =
      exprs.collect { case _: Star => true }.nonEmpty
  }
}

4:Optimizer

Optimizer的功能就是将来自Analyzer的Analyzed LogicalPlan进行多种rule优化,生成Optimized LogicalPlan。Optimizer定义了3大类12个小类的优化rule:


/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      LikeSimplification,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
  • Combine Limits 合并Limit

    • CombineLimits:将两个相邻的limit合为一个
  • ConstantFolding 常量叠加
    • NullPropagation 空格处理
    • ConstantFolding:常量叠加
    • LikeSimplification:like表达式简化
    • BooleanSimplification:布尔表达式简化
    • SimplifyFilters:Filter简化
    • SimplifyCasts:Cast简化
    • SimplifyCaseConversionExpressions:CASE大小写转化表达式简化
  • Filter Pushdown Filter下推
    • CombineFilters Filter合并
    • PushPredicateThroughProject 通过Project谓词下推
    • PushPredicateThroughJoin 通过Join谓词下推
    • ColumnPruning 列剪枝

这些优化rule都是使用transform对LogicalPlan进行操作,如合并、删除冗余、简化、剪枝等,是整个LogicalPlan变得更简洁更高效。

比如将两个相邻的limit进行合并,可以使用CombineLimits。象sql("select
* from (select * from src limit 5)a limit 3 ") 这样一个SQL语句,会将limit 5和limit 3进行合并,只剩一个一个limit 3。


/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */

object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}

又比如Null值的处理,可以使用NullPropagation处理。象sql("select count(null) from src where key is not null")这样一个SQL语句会转换成sql("select count(0) from src where key is not null")来处理。


/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */

object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ EqualNullSafe(Literal(null, _), r) => IsNull(r)
      case e @ EqualNullSafe(l, Literal(null, _)) => IsNull(l)

      ......
    }
  }
}

对于具体的优化方法可以使用下一章所介绍的hive/console调试方法进行调试,用户可以使用自定义的优化函数,也可以使用sparkSQL提供的优化函数。使用前先定义一个要优化查询,然后查看一下该查询的Analyzed LogicalPlan,再使用优化函数去优化,将生成的Optimized LogicalPlan和Analyzed LogicalPlan进行比较,就可以看到优化的效果。

5:SpankPlan

时间: 2024-10-13 22:31:43

sparkSQL1.1入门之三:sparkSQL组件之解析的相关文章

sparkSQL1.1入门之一:为什么sparkSQL

2014年9月11日,Spark1.1.0忽然之间发布.笔者立即下载.编译.部署了Spark1.1.0.关于Spark1.1的编译和部署,请参看笔者博客Spark1.1.0 源码编译和部署包生成 . Spark1.1.0中变化较大是sparkSQL和MLlib,sparkSQL1.1.0主要的变动有: 增加了JDBC/ODBC Server(ThriftServer),用户可以在应用程序中连接到SparkSQL并使用其中的表和缓存表. 增加了对JSON文件的支持 增加了对parquet文件的本地

三、OpenStack入门 之 各组件解析

OpenStack入门 之 各组件解析 写在前面 学习目标: 掌握 OpenStack 的各组件的架构和功能 本次笔记的内容有: Nova 组件解析 Swift 组件解析 Cinder 组件解析 Neutron 组件解析 Horizon 组件解析 Glance 组件解析 Keystone 组件解析 是常用的 7 个组件: 负责虚拟机创建.管理和销毁.提供计算资源服务的 Nova: 提供对象存储服务的分布式存储 Swift: 提供块存储服务的 Cinder: 提供虚拟机镜像管理和存储服务的 Gla

四、OpenStack入门 之 各组件解析(进阶)

OpenStack入门 之 各组件解析(进阶) 学习目标: 掌握更多组件的架构和功能 本次笔记的内容有: Ceilmeter 组件解析 Heat 组件解析 Trove 组件解析 Sahara 组件解析 Ironic 组件解析 1. Ceilometer组件解析 又称为 OpenStack Telemetry(远程测量收集数据),是 OpenStack 里面做 metering 的项目.Ceilometer 的主要目的是 为计费提供数据支持. OpenStack 本身不提供计费的功能,Ceilom

sparkSQL1.1入门之十:总结

回顾一下,在前面几章中,就sparkSQL1.1.0基本概念.运行架构.基本操作和实用工具做了基本介绍. 基本概念: SchemaRDD Rule Tree LogicPlan Parser Analyzer Optimizer SparkPlan 运行架构: sqlContext运行架构 hiveContext运行架构 基本操作 原生RDD的操作 parquet文件的操作 json文件的操作 hive数据的操作 和其他spark组件混合使用 实用工具 hive/console的操作 CLI的配

运维工具SaltStack之三Grains组件

运维工具SaltStack之三Grains组件 一.grains组件介绍 grains是收集Minion主机的静态.不常变化的基本信息,存储在Minion端本地,如:CPU.内核.操作系统.虚拟化等,并且服务器端可以根据这些信息进行灵活定制或个性化定制,是Saltstack最重要的组件之一,多用来做资产管理的信息收集,主要解决平台的差异性. 如可以使用以下命令: #salt 'minion01' grains.items  #获取minion01主机基本信息 二.grains组件自定义 自定义g

【美妙的Python之三】Python 对象解析

美妙的Python之Python对象 简而言之: Python 是能你无限惊喜的语言,与众不同.         Python对象概念的理解,是理解Python数据存储的前提.Python使用对象来存储数据,构造任何类型的值都是对象.         1.Python 对象:         Python的对象有3个属性:         标识:每个对象都有一个唯一的标识,通过  id(  )  可以查看对象的标识.         类型:对象的类型,指明该对象可以存储的数据类型,通过  typ

视音频数据处理入门:FLV封装格式解析

===================================================== 视音频数据处理入门系列文章: 视音频数据处理入门:RGB.YUV像素数据处理 视音频数据处理入门:PCM音频采样数据处理 视音频数据处理入门:H.264视频码流解析 视音频数据处理入门:AAC音频码流解析 视音频数据处理入门:FLV封装格式解析 视音频数据处理入门:UDP-RTP协议解析 ===================================================

【Android开发精要笔记】Android组件模型解析

Android组件模型解析 Android中的Mashup 将应用切分成不同类别的组件,通过统一的定位模型和接口标准将他们整合在一起,来共同完成某项任务.在Android的Mashup模式下,每个组件的功能都可以被充分的复用.来自不同应用的组件可以有机地结合在一起,共同完成任务. 基于Mashup的Android应用模型 三个基本要素:组件.连接.配置 接口就是实现单元.从代码来看,组件就是派生自特定接口或基类的子类的实现,如界面组件Activity就是指派生自android.app.Activ

(转载)Android之三种网络请求解析数据(最佳案例)

[置顶] Android之三种网络请求解析数据(最佳案例) 2016-07-25 18:02 4725人阅读 评论(0) 收藏 举报  分类: Gson.Gson解析(1)  版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 小武:相信大家都用过网络请求解析数据,只是方法不一样而已,但是,逻辑都是差不多的: 一:AsyncTask解析数据 AsyncTask主要用来更新UI线程,比较耗时的操作可以在AsyncTask中使用. AsyncTask是个抽象类,使用时需要继承这个