Earlier we looked at the simplest example that ships with Spark and covered the SparkContext part. This section walks through the transformations in the remaining code.
import scala.math.random
import org.apache.spark._

object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
The example first calls SparkContext's parallelize method. This method creates a distributed dataset that can be operated on in parallel from an existing Scala collection; in other words, it returns an RDD.
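As a quick illustration, here is a minimal usage sketch. It assumes a SparkContext named sc is already in scope (for example in spark-shell), and the data values are made up:

// Minimal sketch; sc is assumed to be an existing SparkContext
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)
// Nothing runs on the cluster yet: rdd is just a ParallelCollectionRDD
// describing the data and how to slice it into partitions.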
Rather than repeating the textbook definition of an RDD, the source code is more direct. As it shows, at its most basic an RDD is just a SparkContext plus its own list of dependencies.
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  ...
}
Following the call into parallelize: seq is a Scala collection, and numSlices is the degree of parallelism. It has a default value, taken either from the spark.default.parallelism setting or from the total number of cores of the executors that have registered with the driver.
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
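If you want a predictable default instead of relying on the registered core count, you can set spark.default.parallelism yourself. A hedged sketch (the app name and master value are placeholders):

val conf = new SparkConf()
  .setAppName("parallelism-demo")          // placeholder name
  .setMaster("local[4]")                   // placeholder master
  .set("spark.default.parallelism", "8")
val sc = new SparkContext(conf)
// sc.defaultParallelism is now 8 unless a numSlices argument overrides it per call.

With the default settled, here is parallelize itself: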
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
Let's continue into ParallelCollectionRDD. It overrides several RDD methods and passes Nil as its dependencies.
private[spark] class ParallelCollectionRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
  extends RDD[T](sc, Nil) {
  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
  // instead.
  // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)
  }
}
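getPartitions delegates to ParallelCollectionRDD.slice, which cuts the collection into numSlices contiguous pieces, one per partition. The real implementation special-cases Range and NumericRange; the following is only a simplified sketch of the positional slicing it performs for plain sequences:

// Simplified sketch of positional slicing (not the actual Spark code)
def sliceSketch[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
  (0 until numSlices).map { i =>
    val start = ((i * data.length.toLong) / numSlices).toInt
    val end   = (((i + 1) * data.length.toLong) / numSlices).toInt
    data.slice(start, end)
  }

// sliceSketch(1 to 8, 2) => Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8)) -- each slice becomes one partition.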
That is all parallelize does. This is what people call a Spark transformation: it builds the RDD but does not trigger any job scheduling.
Next, the map operation runs on this ParallelCollectionRDD, which simply calls the map method defined in the abstract RDD class.
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
The f parameter of map is exactly the function we wrote inside map in our program; calling map produces a MapPartitionsRDD. Its prev parameter is the RDD before the transformation, which here is the ParallelCollectionRDD.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}
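To see what the (context, pid, iter) => iter.map(cleanF) wrapper means in practice, here is a plain-Scala sketch with a made-up user function: the element-level function passed to map is lifted into a function over a partition's iterator, and compute simply feeds the parent partition's iterator through it, lazily.

// Plain-Scala sketch (no Spark); userF stands in for the function passed to map
val userF: Int => Int = i => i * 2
val perPartition: Iterator[Int] => Iterator[Int] = iter => iter.map(userF)

val parentPartition = Iterator(1, 2, 3)          // what firstParent.iterator(split, context) would yield
val doubled = perPartition(parentPartition).toList  // List(2, 4, 6), produced element by element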
Looking at the parent RDD constructor it invokes, it simply builds a one-to-one dependency by default. OneToOneDependency means the transformation is one-to-one: each partition index in the child corresponds to the same partition index in the parent. Note that the RDD stored inside the dependency is the RDD before the transformation, here the ParallelCollectionRDD.
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
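For reference, OneToOneDependency in the Spark source is essentially just this (annotations and doc comments omitted): each child partition depends only on the parent partition with the same index.

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // child partition i depends only on parent partition i
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}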
A partitioner is essentially a small wrapper that holds the number of partitions and knows how to map a key to a partition index.
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
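A concrete Partitioner just fills in those two methods. Here is a hedged sketch in the spirit of Spark's HashPartitioner (simplified, not the exact source):

class SimpleHashPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case null => 0
    case k =>
      val mod = k.hashCode % partitions
      if (mod < 0) mod + partitions else mod   // keep the index in [0, partitions)
  }
}

Note that MapPartitionsRDD above only keeps the parent's partitioner when preservesPartitioning is true; for our map over a ParallelCollectionRDD it is simply None.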
That concludes the map operation. Again, nothing is scheduled; it is only another RDD transformation.
Now for the final reduce operation.
/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  // A function that folds the values of one partition's iterator together from the left
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  // A function that merges the per-partition results on the driver
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  // Run the job
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

In the end this calls runJob with the following arguments:
rdd: the MapPartitionsRDD
func: a wrapper around the reducePartition function
partitions: 0 until rdd.partitions.size, where rdd.partitions comes from MapPartitionsRDD's getPartitions. Every RDD produced by a transformation keeps references to all of its parent dependencies, so the lineage can be traced back to the first RDD; here MapPartitionsRDD's getPartitions simply returns the partitions of the first RDD.
allowLocal: false
resultHandler: the mergeResult function
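Before following runJob into the scheduler, here is a local, pure-Scala illustration of what reducePartition and mergeResult compute (hypothetical data, no Spark involved): each task reduces its own partition, then the driver folds the partial results together.

// Pretend these are the iterators of 3 partitions
val partitionData = Seq(Seq(1, 2, 3), Seq(4, 5), Seq.empty[Int])
val partials = partitionData.map(p => if (p.nonEmpty) Some(p.reduceLeft(_ + _)) else None)
// partials == Seq(Some(6), Some(9), None): an empty partition contributes nothing
val total = partials.flatten.reduceLeftOption(_ + _)   // Some(15), what jobResult ends up holding

Now back to runJob itself: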
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver rather than
 * shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler,
    localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

This in turn calls DAGScheduler's runJob method, which calls submitJob.
submitJob generates a JobId and posts a JobSubmitted event onto the DAGScheduler's event loop, where it is handled by handleJobSubmitted.
So it is reduce that actually triggers runJob; this is what Spark calls an action.
The next section will cover the rest of the reduce path, i.e. what happens after runJob.