接上一篇文章Spark
SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节:
我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD。
lazy val toRdd: RDD[Row] = executedPlan.execute()
Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join、Aggregate和Sort这种稍复杂的。
一、BasicOperator
1.1、Project
Project 的大致含义是:传入一系列表达式Seq[NamedExpression],给定输入的Row,经过Convert(Expression的计算eval)操作,生成一个新的Row。
Project的实现是调用其child.execute()方法,然后调用mapPartitions对每一个Partition进行操作。
这个f函数其实是new了一个MutableProjection,然后循环的对每个partition进行Convert。
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode { override def output = projectList.map(_.toAttribute) override def execute() = child.execute().mapPartitions { iter => //对每个分区进行f映射 @transient val reusableProjection = new MutableProjection(projectList) iter.map(reusableProjection) } }
通过观察MutableProjection的定义,可以发现,就是bind references to a schema 和 eval的过程:
将一个Row转换为另一个已经定义好schema column的Row。
如果输入的Row已经有Schema了,则传入的Seq[Expression]也会bound到当前的Schema。
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) { def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = this(expressions.map(BindReferences.bindReference(_, inputSchema))) //bound schema private[this] val exprArray = expressions.toArray private[this] val mutableRow = new GenericMutableRow(exprArray.size) //新的Row def currentValue: Row = mutableRow def apply(input: Row): Row = { var i = 0 while (i < exprArray.length) { mutableRow(i) = exprArray(i).eval(input) //根据输入的input,即一个Row,计算生成的Row i += 1 } mutableRow //返回新的Row } }
1.2、Filter
Filter的具体实现是传入的condition进行多input row的eval计算,最后返回的是一个Boolean类型,
如果表达式计算成功,返回true,则这个分区的这条数据就会保存下来,否则会过滤掉。
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { override def output = child.output override def execute() = child.execute().mapPartitions { iter => iter.filter(condition.eval(_).asInstanceOf[Boolean]) //计算表达式 eval(input row) } }
1.3、Sample
Sample取样操作其实是调用了child.execute()的结果后,返回的是一个RDD,对这个RDD调用其sample函数,原生方法。
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan) extends UnaryNode { override def output = child.output // TODO: How to pick seed? override def execute() = child.execute().sample(withReplacement, fraction, seed) }
1.4、Union
Union操作支持多个子查询的Union,所以传入的child是一个Seq[SparkPlan]
execute()方法的实现是对其所有的children,每一个进行execute(),即select查询的结果集合RDD。
通过调用SparkContext的union方法,将所有子查询的结果合并起来。
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan { // TODO: attributes output by union should be distinct for nullability purposes override def output = children.head.output override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) //子查询的结果进行union override def otherCopyArgs = sqlContext :: Nil }
1.5、Limit
Limit操作在RDD的原生API里也有,即take().
但是Limit的实现分2种情况:
第一种是 limit作为结尾的操作符,即select xxx from yyy limit zzz。 并且是被executeCollect调用,则直接在driver里使用take方法。
第二种是 limit不是作为结尾的操作符,即limit后面还有查询,那么就在每个分区调用limit,最后repartition到一个分区来计算global limit.
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext) extends UnaryNode { // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: // partition local limit -> exchange into one partition -> partition local limit again override def otherCopyArgs = sqlContext :: Nil override def output = child.output override def executeCollect() = child.execute().map(_.copy()).take(limit) //直接在driver调用take override def execute() = { val rdd = child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Boolean, Row]() iter.take(limit).map(row => mutablePair.update(false, row)) //每个分区先计算limit } val part = new HashPartitioner(1) val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) //需要shuffle,来repartition shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) shuffled.mapPartitions(_.take(limit).map(_._2)) //最后单独一个partition来take limit } }
1.6、TakeOrdered
TakeOrdered是经过排序后的limit N,一般是用在sort by 操作符后的limit。
可以简单理解为TopN操作符。
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) (@transient sqlContext: SQLContext) extends UnaryNode { override def otherCopyArgs = sqlContext :: Nil override def output = child.output @transient lazy val ordering = new RowOrdering(sortOrder) //这里是通过RowOrdering来实现排序的 override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering) // TODO: Terminal split should be implemented differently from non-terminal split. // TODO: Pick num splits based on |limit|. override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1) }
1.7、Sort
Sort也是通过RowOrdering这个类来实现排序的,child.execute()对每个分区进行map,每个分区根据RowOrdering的order来进行排序,生成一个新的有序集合。
也是通过调用Spark RDD的sorted方法来实现的。
case class Sort( sortOrder: Seq[SortOrder], global: Boolean, child: SparkPlan) extends UnaryNode { override def requiredChildDistribution = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil @transient lazy val ordering = new RowOrdering(sortOrder) //排序顺序 override def execute() = attachTree(this, "sort") { // TODO: Optimize sorting operation? child.execute() .mapPartitions( iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, //每个分区调用sorted方法,传入<span style="font-family: Arial, Helvetica, sans-serif;">ordering排序规则,进行排序</span> preservesPartitioning = true) } override def output = child.output }
1.8、ExistingRdd
ExistingRdd是
object ExistingRdd { def convertToCatalyst(a: Any): Any = a match { case o: Option[_] => o.orNull case s: Seq[Any] => s.map(convertToCatalyst) case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray) case other => other } def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = { data.mapPartitions { iterator => if (iterator.isEmpty) { Iterator.empty } else { val bufferedIterator = iterator.buffered val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity) bufferedIterator.map { r => var i = 0 while (i < mutableRow.length) { mutableRow(i) = convertToCatalyst(r.productElement(i)) i += 1 } mutableRow } } } } def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = { ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd)) } }
未完待续 :)
原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/38274621
Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现,布布扣,bubuko.com