提交stage

//提交stage,为stage创建一批task,task数量和partition数量相同

private def submitMissingTasks(stage: Stage, jobId: Int) {

logDebug("submitMissingTasks(" + stage + ")")

// Get our pending tasks and remember them in our pendingTasks entry

stage.pendingTasks.clear()

// First figure out the indexes of partition ids to compute.

//获取要创建的task的数量

val partitionsToCompute: Seq[Int] = {

if (stage.isShuffleMap) {

(0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)

} else {

val job = stage.resultOfJob.get

(0 until job.numPartitions).filter(id => !job.finished(id))

}

}

val properties = if (jobIdToActiveJob.contains(jobId)) {

jobIdToActiveJob(stage.jobId).properties

} else {

// this stage will be assigned to "default" pool

null

}

//将stage加入runningstage队列

runningStages += stage

// SparkListenerStageSubmitted should be posted before testing whether tasks are

// serializable. If tasks are not serializable, a SparkListenerStageCompleted event

// will be posted, which should always come after a corresponding SparkListenerStageSubmitted

// event.

stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))

outputCommitCoordinator.stageStart(stage.id)

listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast

// the serialized copy of the RDD and for each task we will deserialize it, which means each

// task gets a different copy of the RDD. This provides stronger isolation between tasks that

// might modify state of objects referenced in their closures. This is necessary in Hadoop

// where the JobConf/Configuration object is not thread-safe.

var taskBinary: Broadcast[Array[Byte]] = null

try {

// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).

// For ResultTask, serialize and broadcast (rdd, func).

val taskBinaryBytes: Array[Byte] =

if (stage.isShuffleMap) {

closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()

} else {

closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()

}

taskBinary = sc.broadcast(taskBinaryBytes)

} catch {

// In the case of a failure during serialization, abort the stage.

case e: NotSerializableException =>

abortStage(stage, "Task not serializable: " + e.toString)

runningStages -= stage

return

case NonFatal(e) =>

abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")

runningStages -= stage

return

}

//为stage创建指定数量的task

val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {

partitionsToCompute.map { id =>

//给每个partition创建一个task

//给每个task计算最佳位置

val locs = getPreferredLocs(stage.rdd, id)

val part = stage.rdd.partitions(id)

//对于finalstage之外的stage的isShuffleMap都是true

//所以会创建ShuffleMapTask

new ShuffleMapTask(stage.id, taskBinary, part, locs)

}

} else {

//如果不是ShuffleMap,就会创建finalstage

//finalstage是穿件resultTask

val job = stage.resultOfJob.get

partitionsToCompute.map { id =>

val p: Int = job.partitions(id)

val part = stage.rdd.partitions(p)

//获取task计算的最佳位置的方法 getPreferredLocs

val locs = getPreferredLocs(stage.rdd, p)

new ResultTask(stage.id, taskBinary, part, locs, id)

}

}

if (tasks.size > 0) {

logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")

stage.pendingTasks ++= tasks

logDebug("New pending tasks: " + stage.pendingTasks)

taskScheduler.submitTasks(

new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

} else {

// Because we posted SparkListenerStageSubmitted earlier, we should post

// SparkListenerStageCompleted here in case there are no tasks to run.

outputCommitCoordinator.stageEnd(stage.id)

listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))

logDebug("Stage " + stage + " is actually done; %b %d %d".format(

stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))

runningStages -= stage

}

}

def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {

getPreferredLocsInternal(rdd, partition, new HashSet)

}

//task对应partition的最佳位置

//就是从stage的最后一个RDD开始,找哪个RDD是被持久化了或者checkpoint

//那么task的最佳位置就是缓存的/checkpoint 的 partition的位置

//因为这样的话,task就在那个节点上执行,不需要计算之前的RDD

private def getPreferredLocsInternal(

rdd: RDD[_],

partition: Int,

visited: HashSet[(RDD[_],Int)])

: Seq[TaskLocation] =

{

// If the partition has already been visited, no need to re-visit.

// This avoids exponential path exploration.  SPARK-695

if (!visited.add((rdd,partition))) {

// Nil has already been returned for previously visited partitions.

return Nil

}

// If the partition is cached, return the cache locations

//寻找当前RDD是否缓存了

val cached = getCacheLocs(rdd)(partition)

if (!cached.isEmpty) {

return cached

}

// If the RDD has some placement preferences (as is the case for input RDDs), get those

//寻找当前RDD是否checkpoint了

val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

if (!rddPrefs.isEmpty) {

return rddPrefs.map(TaskLocation(_))

}

// If the RDD has narrow dependencies, pick the first partition of the first narrow dep

// that has any placement preferences. Ideally we would choose based on transfer sizes,

// but this will do for now.

//递归调用,看看父RDD是否缓存或者checkpoint

rdd.dependencies.foreach {

case n: NarrowDependency[_] =>

for (inPart <- n.getParents(partition)) {

val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

if (locs != Nil) {

return locs

}

}

case _ =>

}

//如果从第一个RDD到最后一个RDD都没有缓存或者checkpoint,那最佳位置就是Nil,也就是没有最佳位置

//那他的位置就要由taskscheduler来分配

Nil

}

时间: 2024-10-14 05:20:31

提交stage的相关文章

[Spark源代码剖析] DAGScheduler提交stage

转载请标明出处:http://blog.csdn.net/bigbigdata/article/details/47310657 DAGScheduler通过调用submitStage来提交stage.实现例如以下: private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + &q

Spark技术内幕:Stage划分及提交源码分析

当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJob org.apache.spark.scheduler.DAGScheduler#runJob org.apache.spark.scheduler.DAGScheduler#submitJob org.apache.spark.scheduler.DAGSchedulerEventProcess

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

【Spark Core】TaskScheduler源码与任务提交原理浅析1

引言 上一节<Stage生成和Stage源码浅析>中,我介绍了Stage生成划分到提交Stage的过程,分析最终归结到submitStage的递归提交Stage,其中要通过submitMissingTasks函数创建task集合来实现任务的创建和分发. 在接下来的几篇文章中,我将具体介绍一下任务创建和分发的过程,为了让逻辑更加清楚,我将分成几篇文章进行介绍,好保证简明清晰,逻辑连贯,前后统一. TaskScheduler介绍 TaskScheduler的主要任务是提交taskset到集群运算并

stage划分算法

stage划分算法总结 最后一个RDD创建finalstage finalstage倒推 通过宽依赖,来进行新的stage划分 使用递归,依次提交stage,从父stage开始 源码 org.apache.spark.scheduler包下 stage划分算法由 submitStage和getMissingParentStages方法组成 第一步:使用触发job的最后一个RDD,创建finalstage,传入到newstage方法中 var finalStage: Stage = null //

【Spark】Stage生成和Stage源码浅析

引入 上一篇文章<DAGScheduler源码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码. Stage生成 Stage的调度是由DAGScheduler完成的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的St

【Spark】Stage生成和Stage源代码浅析

引入 上一篇文章<DAGScheduler源代码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在.这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同一时候介绍Stage的相关源代码. Stage生成 Stage的调度是由DAGScheduler完毕的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后运行的Stage为根进行广度优先遍历,遍历到最開始运行的Stage运行.假设提

Spark DAGSheduler生成Stage过程分析实验

Spark Action会触发SparkContext类的runJob,而runJob会继续调用DAGSchduler类的runJob DAGSchduler类的runJob方法调用submitJob方法,并根据返回的completionFulture的value判断Job是否完成. onReceive用于DAGScheduler不断循环的处理事件,其中submitJob()会产生JobSubmitted事件,进而触发handleJobSubmitted方法. 正常情况下会根据finalStag

Spark Job的提交与task本地化分析(源码阅读八)

我们又都知道,Spark中任务的处理也要考虑数据的本地性(locality),Spark目前支持PROCESS_LOCAL(本地进程).NODE_LOCAL(本地节点).NODE_PREF.RACK_LOCAL(本地机架).ANY(任何)几种.其他都很好理解,NODE_LOCAL会在spark日志中执行拉取数据所执行的task时,打印出来,因为Spark是移动计算,而不是移动数据的嘛. 那么什么是NODE_PREF? 当Driver应用程序刚刚启动,Driver分配获得的Executor很可能还