Version: 1.6.2
Whether in Hadoop MapReduce or in Spark's various operators, the shuffle is the core of the computation; how efficiently the shuffle is designed largely determines how efficient the whole computation is. The design difficulty is that shuffle involves large-scale data I/O (both local temporary-file I/O and network I/O) as well as potentially CPU-intensive sorting.
In Spark 1.6.2 there are three shuffle mechanisms for large data sets: "sort-based shuffle", "hash-based shuffle" and "tungsten-sort shuffle".
Below is the official description:
/**
* In sort-based shuffle, incoming records are sorted according to their target partition ids, then
* written to a single map output file. Reducers fetch contiguous regions of this file in order to
* read their portion of the map output. In cases where the map output data is too large to fit in
* memory, sorted subsets of the output are spilled to disk and those on-disk files are merged
* to produce the final output file.
*
* Sort-based shuffle has two different write paths for producing its map output files:
*
* - Serialized sorting: used when all three of the following conditions hold:
* 1. The shuffle dependency specifies no aggregation or output ordering.
* 2. The shuffle serializer supports relocation of serialized values (this is currently
* supported by KryoSerializer and Spark SQL's custom serializers).
* 3. The shuffle produces fewer than 16777216 output partitions.
* - Deserialized sorting: used to handle all other cases.
*
* -----------------------
* Serialized sorting mode
* -----------------------
*
* In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
* shuffle writer and are buffered in a serialized form during sorting. This write path implements
* several optimizations:
*
* - Its sort operates on serialized binary data rather than Java objects, which reduces memory
* consumption and GC overheads. This optimization requires the record serializer to have certain
* properties to allow serialized records to be re-ordered without requiring deserialization.
* See SPARK-4550, where this optimization was first proposed and implemented, for more details.
*
* - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
* arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
* record in the sorting array, this fits more of the array into cache.
*
* - The spill merging procedure operates on blocks of serialized records that belong to the same
* partition and does not need to deserialize records during the merge.
*
* - When the spill compression codec supports concatenation of compressed data, the spill merge
* simply concatenates the serialized and compressed spill partitions to produce the final output
* partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
* and avoids the need to allocate decompression or copying buffers during the merge.
*
* For more details on these optimizations, see SPARK-7081.
*/
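For reference, the shuffle implementation in use is selected through the spark.shuffle.manager setting ("sort" is the default in 1.6). A minimal, hypothetical configuration sketch (the application name is illustrative only):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-demo")            // hypothetical application name
  .set("spark.shuffle.manager", "sort")  // also accepts "hash" or "tungsten-sort" in 1.6
val sc = new SparkContext(conf)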
This article walks through the shuffle-related code, covering both the principles behind shuffle and its code-level implementation.
Job, Stage, Task, Dependency
In Spark, the RDD is the unit being operated on. Operations fall into transformations and actions, and only an action triggers an actual Spark computation.
Compare the rdd.map operation with the rdd.count operation:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
map is a transformation: it simply creates a MapPartitionsRDD on top of the current RDD. count is an action: it calls sc.runJob to submit a Job to Spark.
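To make the distinction concrete, here is a tiny sketch (the data and variable names are made up) showing when a job is actually submitted:
val nums = sc.parallelize(1 to 100)   // builds an RDD; no job yet
val doubled = nums.map(_ * 2)         // map is a transformation: still no job, only a new MapPartitionsRDD
val total = doubled.count()           // count is an action: sc.runJob is called and a job runs here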
A Job is the collection of a series of RDD transformations plus the final action. It is the largest and most intangible unit of computation in Spark; the job as a unit is barely even visible in Spark's task pages. Nevertheless, from the user's point of view the job is the unit of our computation target: every time we perform an action on an RDD, a job is triggered, the computation runs, and the data we want is returned.
A Job consists of a series of transformations on RDDs ending in an action. The transformation relationships among these RDDs form a directed acyclic graph (DAG), in which each RDD is produced from one or more preceding RDDs.
The dependency relationship between two RDDs is at the heart of Spark. Viewed at the RDD level the dependency looks point-to-point, but RDDs are partitioned, so a transformation between two RDDs is really refined into dependencies between the partitions of the two RDDs.
As the figure above shows, at the job level RDD_B is transformed from RDD_A, RDD_D from RDD_C, and finally RDD_E from RDD_B and RDD_D; an action on RDD_E then produces the output. When refined to dependencies between partitions, however, RDD_B's dependency on RDD_A and RDD_D's dependency on RDD_C are different; the technical terms for the difference are narrow dependency and wide dependency.
A narrow dependency means that each partition of the child RDD depends only on a limited, fixed set of partitions of the parent RDD, while a wide dependency means that each partition of the child RDD depends on all partitions of the parent RDD.
Wide dependency is easy to grasp, but the narrow-dependency definition, with its "limited" and "fixed" qualifiers, can be confusing: doesn't a wide dependency also satisfy "limited" and "fixed"? Is the only difference how large that "limited" number is? Essentially, yes: "limited" means the number of parent partitions a child partition depends on is small compared with the total number of partitions, and most narrow dependencies Spark actually relies on are one-to-one, so there is no need to agonize over the word "limited".
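A small, hypothetical illustration (the data is made up): inspecting rdd.dependencies shows which kind of dependency an operation creates.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val narrowChild = pairs.mapValues(_ + 1)
println(narrowChild.dependencies.head)   // OneToOneDependency: each child partition reads one parent partition

val wideChild = pairs.groupByKey()
println(wideChild.dependencies.head)     // ShuffleDependency: a child partition may read from every parent partition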
One more question: count is computed from all partitions of its parent RDD, so is it a wide dependency? The answer is no. A dependency describes the relationship between a parent RDD and a child RDD; count only produces output, there is no child RDD, so forcing the dependency concept onto it will only cause trouble. As the implementation above shows, count simply sums the Array[U] returned by sc.runJob.
The distinction between narrow and wide dependencies is an important characteristic of Spark; the two differ in implementation, in task scheduling, and in fault recovery.
- Implementation: with narrow dependencies, the transformations between RDDs can be pipelined directly, whereas a wide dependency has to be implemented with a shuffle.
- Task scheduling: a narrow dependency means a block of the child RDD can be computed on a single node directly from a few blocks (usually one) of the parent RDD; a wide dependency means a block of the child RDD cannot be computed until all of the parent RDD's data has been computed, and the parent's results must additionally go through a shuffle before the next RDD can operate on them.
- Fault recovery: recovery is much faster for a narrow dependency than for a wide one, because with a narrow dependency only the lost block needs to be recomputed, while a wide dependency may require recomputing every block of every ancestor RDD. This is why, for long lineage chains, and especially ones containing wide dependencies, we recommend setting a checkpoint at a suitable point to avoid an excessively long recovery.
A checkpoint can be created here with RDD.checkpoint:
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
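A minimal usage sketch (the paths and the dataset are hypothetical), following the requirements stated in the comment above:
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // must be set on the SparkContext first
val cleaned = sc.textFile("hdfs:///data/events").filter(_.nonEmpty)
cleaned.persist()        // recommended, otherwise checkpointing recomputes the RDD
cleaned.checkpoint()     // must be called before any job has run on this RDD
cleaned.count()          // the first job also writes the checkpoint files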
Having clarified the relationships between RDDs at the job level and between partitions at the RDD level, we can now describe the Stage concept.
Stage division partitions the series of RDD transformations and the final action inside a Job.
- First, a job exists because of an action, so every job has exactly one ResultStage; otherwise the job would never start.
- Second, if there is a wide dependency between RDDs inside the job, Spark creates an intermediate stage for it, a shuffle stage, or strictly speaking a ShuffleMapStage. This stage is created on the parent RDD, roughly equivalent to running parentRdd.map().collect() on the parent. The ShuffleMapStage produces the map output; once the child RDD detects that the stage it "wide-depends" on has finished, it can start a shuffle fetch to pull the data output by the parent RDD and continue the computation.
A Job is therefore made up of one ResultStage and zero or more ShuffleMapStages.
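As an illustration, a hypothetical word count (the file path is made up) contains exactly one shuffle, so its single job is split into one ShuffleMapStage and one ResultStage:
val counts = sc.textFile("hdfs:///data/words")   // Stage 0 (ShuffleMapStage): textFile -> flatMap -> map,
  .flatMap(_.split(" "))                         //   which writes the map output for reduceByKey's shuffle
  .map(word => (word, 1))
  .reduceByKey(_ + _)                            // Stage 1 (ResultStage): reads the shuffled data
counts.count()                                   //   and runs the count action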
Execution of a Job without Shuffle
Dissecting the execution of a shuffle-free job shows what Spark does when we run an "action". Let's use a simple example:
sc.textFile("mahuacai").count
//def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
This example simply counts the number of lines in the file. The single line of code above corresponds to the following three steps:
- sc.textFile("mahuacai") returns an RDD;
- the count action on that RDD triggers a job submission: sc.runJob(this, Utils.getIteratorSize _);
- the Array returned by runJob is summed with sum.
The core is the second step. Below we walk through it with code fragments; the process is linear, so each step is labelled with a step number and the class it belongs to:
//step1:SparkContext
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
sc.runJob(this, Utils.getIteratorSize _) passes through a chain of runJob overloads before reaching the runJob shown in step 1. Compared with the original call, not much has happened by then, mainly filling in the number of partitions and turning Utils.getIteratorSize _ into func; simple pass-through steps like this will not be described again.
The important work in step 1 is constructing an Array and a function object "(index, res) => results(index) = res" that is passed on to the next runJob; step 1 then waits for that runJob to finish and returns results. In effect this registers a callback with runJob that stores the run results into the Array: index is the map index and res is the result of a single map task; in our example, res is the number of lines in one file split.
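The pattern is easier to see in isolation. Below is a standalone sketch (not Spark code; all names are made up) of collecting per-partition results through exactly this kind of callback:
import scala.reflect.ClassTag

def collectByCallback[U: ClassTag](numPartitions: Int)(compute: Int => U): Array[U] = {
  val results = new Array[U](numPartitions)
  val resultHandler: (Int, U) => Unit = (index, res) => results(index) = res  // same shape as step 1
  (0 until numPartitions).foreach(i => resultHandler(i, compute(i)))          // Spark invokes this once per finished task
  results
}

// collectByCallback(4)(i => i.toLong) returns Array(0L, 1L, 2L, 3L)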
//step2:SparkContext
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
The runJob of step 2 takes a resultHandler parameter, which is exactly the callback built in step 1. dagScheduler is Spark's outermost scheduler; calling its runJob hands the relevant arguments over to the Spark scheduler. Only the runJob of step 1 has a return value; this runJob, and dagScheduler.runJob, return nothing, and the result is delivered through the step 1 callback.
Why keep stressing that the return value is set through the step 1 callback? Because it matters: otherwise you would have no idea how the result of the job Spark schedules ever gets back into our own code!
One more important point: step 2 is the direct continuation of step 1, so dagScheduler.runJob in step 2 is a blocking call, i.e. rdd.doCheckpoint() does not run until Spark has finished running the job.
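For contrast, Spark also exposes non-blocking variants of actions that return a FutureAction; a hedged sketch reusing the example file name from above:
import scala.concurrent.Await
import scala.concurrent.duration._

val future = sc.textFile("mahuacai").countAsync()   // returns immediately with a FutureAction[Long]
// ... the driver thread is free to do other work here ...
val lines = Await.result(future, 10.minutes)        // block only when the value is actually needed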
//Step3:DAGScheduler
/**
* Run an action job on the given RDD and pass all the results to the resultHandler function as
* they arrive.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @throws Exception when the job fails
*/
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
Step 2 said dagScheduler.runJob blocks; the blocking happens at the waiter.awaitResult() call in step 3: submitJob returns a waiter object and awaitResult() blocks on it.
At this point we finally move from the ubiquitous name runJob to submitJob; on to the next step.
//Step4:DAGScheduler
/**
* Submit an action job to the scheduler.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
* @throws IllegalArgumentException when partitions ids are illegal
*/
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
In submitJob (step 4) the job is assigned a jobId and a JobWaiter object is created and returned to step 3. The most important step is calling eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) to send a JobSubmitted message to the DAG scheduler.
/**
* Put the event into the event queue. The event thread will process it later.
*/
def post(event: E): Unit = {
eventQueue.put(event)
}
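eventProcessLoop follows the usual event-loop pattern: post puts an event onto a queue, and a dedicated thread drains the queue and handles each event. A minimal sketch of that pattern (an assumption for illustration, not Spark's actual EventLoop class):
import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        onReceive(eventQueue.take())   // take() blocks until an event has been posted
      }
    }
  }
  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)   // same shape as the post shown above
  protected def onReceive(event: E): Unit
}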
So far we have paid little attention to the function arguments; here let's look at what the JobSubmitted message carries:
- jobId, rdd, func2 and partitions.toArray are easy to understand and need no explanation;
- callSite / properties: not particularly interesting here; treat them as unimportant;
- waiter is the JobWaiter object created above. It matters, because it wraps several important values:
- jobId: the job id
- partitions.size: the number of partitions
- resultHandler: the callback we set up in step 1
Why is JobWaiter important? Because it holds the number of partitions, and we know that the number of partitions equals the number of tasks. JobWaiter can therefore only return successfully once it has received partitions.size successful task results belonging to this jobId and has passed each of them back, via resultHandler, into the Array of step 1.
That should not be hard to follow, and it effectively summarizes the whole scheduling flow that comes next. Below we trace, step by step, the path from job to stage to task, until the tasks finish successfully and our resultHandler callback is invoked.
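Before moving on, here is a minimal sketch of that waiting mechanism (an assumption for illustration, not Spark's actual JobWaiter, which is driven by the DAGScheduler event loop): one result per task is pushed through resultHandler, and the caller is unblocked once all of them have arrived.
import java.util.concurrent.CountDownLatch

class SimpleJobWaiter[U](numTasks: Int, resultHandler: (Int, U) => Unit) {
  private val latch = new CountDownLatch(numTasks)
  def taskSucceeded(index: Int, result: U): Unit = {
    resultHandler(index, result)   // hand the task result back to the caller (step 1's Array)
    latch.countDown()
  }
  def awaitResult(): Unit = latch.await()   // blocks until numTasks results have arrived
}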
//Step5:DAGScheduler
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitWaitingStages()
}
The message sent in step 4 is eventually handled by handleJobSubmitted in step 5 (much of the code we do not care about has been removed here). The core of step 5 is creating a finalStage and calling submitStage to hand that stage to the DAG for scheduling; this is where we move from the Job level into the Stage level.
The name of this stage is well chosen:
finalStage is the last stage of the whole DAG. It is a single stage, not a collection, which tells us something: a runJob corresponds to exactly one finalStage, i.e. there is only one final output. The intermediate stages are the shuffle stages, and they are created while the finalStage is being built, i.e. inside newResultStage.
So let's step into newResultStage; later we will come back to submitStage to see how a Stage is resolved into Tasks and submitted to Spark for execution.
//Step5.1:DAGScheduler
/**
* Create a ResultStage associated with the provided jobId.
*/
private def newResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
The concrete implementation of getParentStagesAndId:
/**
* Helper function to eliminate some code re-use when creating new stages.
*/
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
val parentStages = getParentStages(rdd, firstJobId)
val id = nextStageId.getAndIncrement()
(parentStages, id)
}
The concrete implementation of getParentStages.
The code walks the RDD dependency graph iteratively with an explicit stack (rather than recursion), creating a stage whenever it crosses a ShuffleDependency:
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]   // collected parent stages
  val visited = new HashSet[RDD[_]]  // RDDs that have already been visited
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting; the stack holds the RDDs that still need to be processed.
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            // a ShuffleDependency marks a stage boundary: get or create a ShuffleMapStage
            parents += getShuffleMapStage(shufDep, firstJobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  // the input rdd is the first RDD to process; from there we walk towards its parent RDDs
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) { // keep processing while the stack is non-empty
    // each visit that hits a ShuffleDependency produces a stage;
    // RDDs connected only by narrow dependencies stay in the same stage
    visit(waitingForVisit.pop())
  }
  parents.toList
}
RDD[_] means an RDD of any element type.
The logic of getShuffleMapStage:
/**
 * Get or create a shuffle map stage for the given shuffle dependency's map side.
*/
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
}
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
Here we can also take a quick look at ShuffleDependency:
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
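A ShuffleDependency is normally created for you by wide transformations. A hypothetical way to see one (the data is made up) is to inspect the dependency that reduceByKey produces:
import org.apache.spark.ShuffleDependency

val words = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1))
val counted = words.reduceByKey(_ + _)
val dep = counted.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
println(dep.shuffleId)        // the id registered with the ShuffleManager
println(dep.mapSideCombine)   // true for reduceByKey: values are pre-combined on the map side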
In step 5.1, getParentStagesAndId is first called on the current rdd to build the parent stages (a list). The case we are analysing here has no shuffle, so no parent stage is produced, and we will not dig into that path for now.
A Stage object is then created, and the mapping between the stage and the job is updated.
Next let's step away from step 5.1 into code that is not on the execution path: the implementation of the Stage class. It is, after all, one of Spark's core objects, so understanding it is important.
The official description of Stage:
/**
* A stage is a set of parallel tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* other stage(s), or a result stage, in which case its tasks directly compute a Spark action
* (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
* track the nodes that each output partition is on.
*
* Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
* Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that
* case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.
* The latest one will be accessible through latestInfo.
*
* @param id Unique stage ID
* @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks
* on, while for a result stage, it's the target RDD that we ran an action on
* @param numTasks Total number of tasks in stage; result stages in particular may not need to
* compute all partitions, e.g. for first(), lookup(), and take().
* @param parents List of stages that this stage depends on (through shuffle dependencies).
* @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
* @param callSite Location in the user program associated with this stage: either where the target
* RDD was created, for a shuffle map stage, or where the action for a result stage was called.
*/
private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite)
extends Logging {
val numPartitions = rdd.partitions.length
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]
val pendingPartitions = new HashSet[Int]
/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
val name: String = callSite.shortForm
val details: String = callSite.longForm
private var _internalAccumulators: Seq[Accumulator[Long]] = Seq.empty
/** Internal accumulators shared across all tasks in this stage. */
def internalAccumulators: Seq[Accumulator[Long]] = _internalAccumulators
/**
* Re-initialize the internal accumulators associated with this stage.
*
* This is called every time the stage is submitted, *except* when a subset of tasks
* belonging to this stage has already finished. Otherwise, reinitializing the internal
* accumulators here again will override partial values from the finished tasks.
*/
def resetInternalAccumulators(): Unit = {
_internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
}
/**
* Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
*/
private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
/**
* Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
* failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
private val fetchFailedAttemptIds = new HashSet[Int]
private[scheduler] def clearFailures() : Unit = {
fetchFailedAttemptIds.clear()
}
/**
* Check whether we should abort the failedStage due to multiple consecutive fetch failures.
*
* This method updates the running set of failed stage attempts and returns
* true if the number of failures exceeds the allowable number of failures.
*/
private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
fetchFailedAttemptIds.add(stageAttemptId)
fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
}
/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
nextAttemptId += 1
}
/** Returns the StageInfo for the most recent attempt for this stage. */
def latestInfo: StageInfo = _latestInfo
override final def hashCode(): Int = id
override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false
}
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
def findMissingPartitions(): Seq[Int]
}
private[scheduler] object Stage {
// The number of consecutive failures allowed before a stage is aborted
val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}
The official description of ShuffleMapStage:
/**
* ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
* They occur right before each shuffle operation, and might contain multiple pipelined operations
* before that (e.g. map and filter). When executed, they save map output files that can later be
* fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
* and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
*
* ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
* For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
* there can be multiple ActiveJobs trying to compute the same shuffle map stage.
*/
private[spark] class ShuffleMapStage(
id: Int,
rdd: RDD[_],
numTasks: Int,
parents: List[Stage],
firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _])
extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
private[this] var _mapStageJobs: List[ActiveJob] = Nil
private[this] var _numAvailableOutputs: Int = 0
/**
* List of [[MapStatus]] for each partition. The index of the array is the map partition id,
* and each value in the array is the list of possible [[MapStatus]] for a partition
* (a single task might run multiple times).
*/
private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
override def toString: String = "ShuffleMapStage " + id
/**
* Returns the list of active jobs,
* i.e. map-stage jobs that were submitted to execute this stage independently (if any).
*/
def mapStageJobs: Seq[ActiveJob] = _mapStageJobs
/** Adds the job to the active job list. */
def addActiveJob(job: ActiveJob): Unit = {
_mapStageJobs = job :: _mapStageJobs
}
/** Removes the job from the active job list. */
def removeActiveJob(job: ActiveJob): Unit = {
_mapStageJobs = _mapStageJobs.filter(_ != job)
}
/**
* Number of partitions that have shuffle outputs.
* When this reaches [[numPartitions]], this map stage is ready.
* This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.
*/
def numAvailableOutputs: Int = _numAvailableOutputs
/**
* Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
* This should be the same as `outputLocs.contains(Nil)`.
*/
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
assert(missing.size == numPartitions - _numAvailableOutputs,
s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
missing
}
def addOutputLoc(partition: Int, status: MapStatus): Unit = {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
if (prevList == Nil) {
_numAvailableOutputs += 1
}
}
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location == bmAddress)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
_numAvailableOutputs -= 1
}
}
/**
* Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
* value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
* that position is filled with null.
*/
def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
outputLocs.map(_.headOption.orNull)
}
/**
* Removes all shuffle outputs associated with this executor. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists), as they are still
* registered with this execId.
*/
def removeOutputsOnExecutor(execId: String): Unit = {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
val newList = prevList.filterNot(_.location.executorId == execId)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
_numAvailableOutputs -= 1
}
}
if (becameUnavailable) {
logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
this, execId, _numAvailableOutputs, numPartitions, isAvailable))
}
}
}
Let's first look at a few fields of ShuffleMapStage (a concrete implementation of Stage), of which shuffleDep and parents are the most important. If a stage's shuffleDep is non-empty, the stage exists because of its shuffle map output.
How to read that? shuffleDep is the reason the stage was created: a downstream rdd has this dependency on the current rdd, so a stage is created on the current rdd. The finalStage therefore has shuffleDep set to None.
The parents field is the list of parent stages; the current rdd can only be scheduled once all of its parent stages have completed. For the case we are studying here, shuffleDep and parents are both empty.
ShuffleMapStage also has two other fairly important methods:
//ShuffleMapStage.class
/**
* Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
* This should be the same as `outputLocs.contains(Nil)`.
*/
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
def addOutputLoc(partition: Int, status: MapStatus): Unit = {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
if (prevList == Nil) {
_numAvailableOutputs += 1
}
}