Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark
SQL Catalyst源码分析之Physical Plan
,本文将介绍Physical Plan的toRDD的具体实现细节:

我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD。

  lazy val toRdd: RDD[Row] = executedPlan.execute()

Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join、Aggregate和Sort这种稍复杂的。

一、BasicOperator

1.1、Project

Project 的大致含义是:传入一系列表达式Seq[NamedExpression],给定输入的Row,经过Convert(Expression的计算eval)操作,生成一个新的Row。

Project的实现是调用其child.execute()方法,然后调用mapPartitions对每一个Partition进行操作。

这个f函数其实是new了一个MutableProjection,然后循环的对每个partition进行Convert。

case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)
  override def execute() = child.execute().mapPartitions { iter => //对每个分区进行f映射
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}

通过观察MutableProjection的定义,可以发现,就是bind references to a schema 和 eval的过程:

将一个Row转换为另一个已经定义好schema column的Row。

如果输入的Row已经有Schema了,则传入的Seq[Expression]也会bound到当前的Schema。

case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) //bound schema

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) //新的Row
  def currentValue: Row = mutableRow
  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input)  //根据输入的input,即一个Row,计算生成的Row
      i += 1
    }
    mutableRow //返回新的Row
  }
}

1.2、Filter

Filter的具体实现是传入的condition进行多input row的eval计算,最后返回的是一个Boolean类型,

如果表达式计算成功,返回true,则这个分区的这条数据就会保存下来,否则会过滤掉。

case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) //计算表达式 eval(input row)
  }
}

1.3、Sample

Sample取样操作其实是调用了child.execute()的结果后,返回的是一个RDD,对这个RDD调用其sample函数,原生方法。

case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode
{
  override def output = child.output

  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}

1.4、Union

Union操作支持多个子查询的Union,所以传入的child是一个Seq[SparkPlan]

execute()方法的实现是对其所有的children,每一个进行execute(),即select查询的结果集合RDD。

通过调用SparkContext的union方法,将所有子查询的结果合并起来。

case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) //子查询的结果进行union

  override def otherCopyArgs = sqlContext :: Nil
}

1.5、Limit

Limit操作在RDD的原生API里也有,即take().

但是Limit的实现分2种情况:

第一种是 limit作为结尾的操作符,即select xxx from yyy limit zzz。 并且是被executeCollect调用,则直接在driver里使用take方法。

第二种是 limit不是作为结尾的操作符,即limit后面还有查询,那么就在每个分区调用limit,最后repartition到一个分区来计算global limit.

case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit) //直接在driver调用take

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) //每个分区先计算limit
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) //需要shuffle,来repartition
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) //最后单独一个partition来take limit
  }
}

1.6、TakeOrdered

TakeOrdered是经过排序后的limit N,一般是用在sort by 操作符后的limit。

可以简单理解为TopN操作符。

case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
                      (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder) //这里是通过RowOrdering来实现排序的

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}

1.7、Sort

Sort也是通过RowOrdering这个类来实现排序的,child.execute()对每个分区进行map,每个分区根据RowOrdering的order来进行排序,生成一个新的有序集合。

也是通过调用Spark RDD的sorted方法来实现的。

case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder) //排序顺序

  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, //每个分区调用sorted方法,传入<span style="font-family: Arial, Helvetica, sans-serif;">ordering排序规则,进行排序</span>
        preservesPartitioning = true)
  }

  override def output = child.output
}

1.8、ExistingRdd

ExistingRdd是

object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }

          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}

未完待续 :)

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/38274621

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现,布布扣,bubuko.com

时间: 2024-08-05 06:49:44

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现的相关文章

Spark SQL Catalyst源码分析之Physical Plan

前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized L

第六篇:Spark SQL Catalyst源码分析之Physical Plan

/** Spark SQL源码分析系列文章*/ 前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有S

第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

/** Spark SQL源码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. [java] view plain copy lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark SQL Catalyst源码分析之Optimizer

前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以

第二篇:Spark SQL Catalyst源码分析之SqlParser

/** Spark SQL源码分析系列文章*/ Spark SQL的核心执行流程我们已经分析完毕,可以参见Spark SQL核心执行流程,下面我们来分析执行流程中各个核心组件的工作职责. 本文先从入口开始分析,即如何解析SQL文本生成逻辑计划的,主要设计的核心组件式SqlParser是一个SQL语言的解析器,用scala实现的Parser将解析的结果封装为Catalyst TreeNode ,关于Catalyst这个框架后续文章会介绍. 一.SQL Parser入口 Sql Parser 其实是

第八篇:Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp

第三篇:Spark SQL Catalyst源码分析之Analyzer

/** Spark SQL源码分析系列文章*/ 前面几篇文章讲解了Spark SQL的核心执行流程和Spark SQL的Catalyst框架的Sql Parser是怎样接受用户输入sql,经过解析生成Unresolved Logical Plan的.我们记得Spark SQL的执行流程中另一个核心的组件式Analyzer,本文将会介绍Analyzer在Spark SQL里起到了什么作用. Analyzer位于Catalyst的analysis package下,主要职责是将Sql Parser

第四篇:Spark SQL Catalyst源码分析之TreeNode Library

/** Spark SQL源码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Lib