这一节介绍具体task的运行以及最终结果的处理
看线程运行的run方法,见代码注释
override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() logInfo(s"Running $taskName (TID $taskId)") //这个就是就是向Driver发送StatusUpdate方法,状态是RUNNING,其实不做什么操作的 execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStart: Long = 0 startGCTime = computeTotalGcTime() try { //将serializedTask解析出来 val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) //下载运行task需要的jar,文件等 updateDependencies(taskFiles, taskJars) //把真正的task反序列化出来 task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) task.setTaskMemoryManager(taskMemoryManager) // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. if (killed) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl // exception will be caught by the catch block, leading to an incorrect ExceptionFailure // for the task. throw new TaskKilledException } logDebug("Task " + taskId + "'s epoch is " + task.epoch) env.mapOutputTracker.updateEpoch(task.epoch) // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() val value = try { //任务执行,见下面解析 task.run(taskAttemptId = taskId, attemptNumber = attemptNumber) } finally { // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread; // when changing this, make sure to update both copies. //释放内存 val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() if (freedMemory > 0) { val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId" if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) { throw new SparkException(errMsg) } else { logError(errMsg) } } } val taskFinish = System.currentTimeMillis() // If the task has been killed, let's fail it. if (task.killed) { throw new TaskKilledException } val resultSer = env.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() //将task运行结果序列化 val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() for (m <- task.metrics) { // Deserialization happens in two parts: first, we deserialize a Task object, which // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. m.setExecutorDeserializeTime( (taskStart - deserializeStartTime) + task.executorDeserializeTime) // We need to subtract Task.run()'s deserialization time to avoid double-counting m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m.setResultSerializationTime(afterSerialization - beforeSerialization) } val accumUpdates = Accumulators.values val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit //这里将最终结果序列化成serializedDirectResult,并根据这个序列化之后的大小区分处理 // directSend = sending directly back to the driver val serializedResult: ByteBuffer = { //最终结果序列化之后>1G if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } //最终结果序列化之后>10M,把序列化的结果作为一个Block存放在BlockManager里,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中 else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { //小数据可以直接处理 logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } } execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { case ffe: FetchFailedException => val reason = ffe.toTaskEndReason execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case _: TaskKilledException | _: InterruptedException if task.killed => logInfo(s"Executor killed $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) case cDE: CommitDeniedException => val reason = cDE.toTaskEndReason execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case t: Throwable => // Attempt to exit cleanly by informing the driver of our failure. // If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. logError(s"Exception in $taskName (TID $taskId)", t) val metrics: Option[TaskMetrics] = Option(task).flatMap { task => task.metrics.map { m => m.setExecutorRunTime(System.currentTimeMillis() - taskStart) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m } } val taskEndReason = new ExceptionFailure(t, metrics) execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason)) // Don't forcibly exit unless the exception was inherently fatal, to avoid // stopping other tasks unnecessarily. if (Utils.isFatalError(t)) { SparkUncaughtExceptionHandler.uncaughtException(t) } } finally { // Release memory used by this thread for shuffles env.shuffleMemoryManager.releaseMemoryForThisThread() // Release memory used by this thread for unrolling blocks env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() // Release memory used by this thread for accumulators Accumulators.clear() runningTasks.remove(taskId) } } }看task.run做了什么?
final def run(taskAttemptId: Long, attemptNumber: Int): T = { //首先构造了一个TaskContext,它维护了task的整个生命周期 context = new TaskContextImpl( stageId = stageId, partitionId = partitionId,//分区号 taskAttemptId = taskAttemptId,//taskId attemptNumber = attemptNumber,//这个task的第几次尝试,从0开始 taskMemoryManager = taskMemoryManager, runningLocally = false) TaskContext.setTaskContext(context)//设置到threadLocal中 context.taskMetrics.setHostname(Utils.localHostName()) taskThread = Thread.currentThread() if (_killed) { kill(interruptThread = false) } try { runTask(context)//根据不同的task类型启动 } finally { context.markTaskCompleted() TaskContext.unset() } }这里的runTask其实是区分shuffleMapTask和ResultTask的。而我在之前举例的sparkPi是没有shuffle过程的,所以这里我列举一个wordcount的例子来说明shuffle的部分。import org.apache.spark._ import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length != 3 ){ println("usage is org.test.WordCount <master> <input> <output>") return } val sc = new SparkContext(args(0), "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(args(2)) } }在sc.textFile(...).flatMap(...).map(...)之后得到的是一个MapPartitionsRDD,这个跟以前的例子是差不多的,就不介绍了。我们直接看reduceByKey,方法在PairRDDFunctions中/** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKey[V]((v: V) => v, func, func, partitioner) } /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope { reduceByKey(new HashPartitioner(numPartitions), func) } /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) }这里有3个不同的reduceByKey方法。我们可以手动设定reduce的个数,如果不指定的话,就可能不受控制了。
如果不指定reduce个数的话,规则如下: 1、如果自定义了分区函数partitioner的话,就按你的分区函数来走。 2、如果没有定义分区函数而是定义了reduce个数的话,默认分区函数就是根据reduce个数生成HashPartitioner 3、如果这个也没设置,那就按照reduce个数是"spark.default.parallelism"或者rdd.head.partitions.size来生成HashPartitioner这里我们的K类型是String,V类型是Int,这里的C对于这个题来说也就是Intdef combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) //如果目前RDD中的分区函数与我们设置的一样,那就根本不需要进行shuffle操作了 //它将一个匿名函数封装成MapPartitionsRDD返回 if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { //不然就产生一个ShuffledRDD返回 new ShuffledRDD[K, V, C](self, partitioner)//这里没有传入关于依赖的参数,而之前的提到过的one-to-one依赖都是直接传入的,后面会说到 .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }可以看到,这个reduceByKey代码看似很简单的样子。这是因为它只是一个transformation,真正发挥作用是在action触发之后,就可以体会到复杂性。
那就看最终的result.saveAsTextFile(args(2)),这就是一个action操作。
上面说到shuffledRDD的依赖不是在new的时候传入的,那么在构建stage的时候需要根据dep来划分。其实shuffledRDD有一个重载方法getDependenciesoverride def getDependencies: Seq[Dependency[_]] = { List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) }所以在划分stage的时候就是因为这个重载方法的存在。下面直接跳到剩下的runTask的介绍,分为shuffledMapTask和ResultTask1、ShuffleMapTask的runTaskoverride def runTask(context: TaskContext): MapStatus = { // Deserialize the RDD using the broadcast variable. val deserializeStartTime = System.currentTimeMillis() val ser = SparkEnv.get.closureSerializer.newInstance() //反序列化taskBinary成rdd和dep(注:rdd是这个stage的最后一个rdd,dep是这个stage与下一个stage相连的shuffleDependency) val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime metrics = Some(context.taskMetrics) var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager //默认是SortShuffleManager //这里的shuffleHandle是封装了shuffleId, _rdd.partitions.size和反序列出来的dep //这里getWriter新建一个SortShuffleWriter对象 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) //看write的参数,其实是调用了rdd的compute方法,返回这个partition分区数据的一个迭代器,具体看下面介绍 //调用write就是将数据写到本地磁盘,并将把blockManagerId和block的大小组合成一个mapStatus writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) return writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }每个Stage的上边界,要么从外部存储读取数据,要么从上一个Stage的输出读取;而下边界,要么写入本地文件系统,以供下一个Stage读取,要么是ResultTask输出结果。上例中其实是划分了两个stage,两个stage通过shuffle依赖建立连接先看第一个stage,它反序列化出来的rdd是这个stage的最后一个rdd,即MapPartitionsRDD;Dep是与下一个stage连接的依赖,即shuffleDependency,这点很重要我们看调用这个rdd的compute方法发生了什么,很简单,对于这个分区中的数据依次调用传入的方法,返回一个计算过后的数据的迭代器。<span style="font-size:14px;background-color: rgb(255, 255, 255);">override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))</span>我们看用默认的sortShuffleWriter调用write方法。
首先会将records写入ExternalSorter,ExternalSorter会使用一个map来存储新的计算结果。如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件。当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里,见writePartitionedFile。最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件.至此,第一个stage,即从输入源到shuffle输出执行就结束了。2、ResultTask的runTaskoverride def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val deserializeStartTime = System.currentTimeMillis() val ser = SparkEnv.get.closureSerializer.newInstance() //反序列化出finalrdd及func val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime metrics = Some(context.taskMetrics) //调用我们设置的func func(context, rdd.iterator(partition, context)) }我们看func的参数,同理是调用rdd的compute方法。因为我们这里的rdd是经过shuffle之后产生的,所以这里是shuffledRDD,它的compute方法如下
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }这里的SparkEnv.get.shuffleManager可以分为sort和hash,不管是哪种,getReader就是生成一个HashShuffleReader
看read方法/** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { val ser = Serializer.getSerializer(dep.serializer) //真正的从file中抓取reducer所需的内容,最终封装成InterruptibleIterator返回 val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser) val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") // Convert the Product2s to pairs since this is what downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2)) } // Sort the output if there is a sort ordering defined. dep.keyOrdering match { case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) sorter.iterator case None => aggregatedIter } }之后真正调用我们的func方法返回结果
我们知道task的run方法返回值是T,所以对于子类shuffleMapTask返回MapStatus,对于ResultTask返回调用func之后的结果。所以在Executor.scala中的run方法中,value就是run方法的返回值val value = try { task.run(taskAttemptId = taskId, attemptNumber = attemptNumber) } finally { ... ... }之后就是一开始讲到过的将返回结果序列化,并调用execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)结束整个流程。它其实是向driver发送StatusUpdate消息,包含了executorId,taskId,task的状态以及运算的序列化之后的结果case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK//一个任务运行完成,该Executor上相应的freeCores增加 makeOffers(executorId)//可在该Executor上调度可以运行的任务 case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }看driver端的处理。首先调用了TaskSchedulerImpl的statusUpdate方法。这个方法中根据状态区分处理,这里是FINISHED状态,且如果结果是DirectTaskResult,直接反序列化结果;如果是IndirectTaskResult,则根据反序列化之后得到的blockId去blockManager中远程读取。不管何种方式,数据取到之后,根据taskId获取tasksetId,再根据tasksetId获取tasksetManager,从而调用该tasksetManager的handleSuccessfulTask方法。该方法主要是调用DAGScheduler的taskEnded方法,向DAGScheduler事件循环发送CompletionEvent事件
我们主要看ResultTask和ShuffleMapTask的处理逻辑,见注释
/** * Responds to a task finishing. This is called inside the event loop so it assumes that it can * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside. */ private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.attempt, event.reason) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. if (event.reason != Success) { val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) } if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. return } val stage = stageIdToStage(task.stageId) event.reason match { case Success => listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) stage.pendingTasks -= task task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask // TODO Refactor this out to a function that accepts a ResultStage val resultStage = stage.asInstanceOf[ResultStage] resultStage.resultOfJob match { case Some(job) => //如果这个分区尚未被标记为已完成,处理 if (!job.finished(rt.outputId)) { updateAccumulators(event) job.finished(rt.outputId) = true//标记为已完成 job.numFinished += 1 // If the whole job has finished, remove it // 最后一个stage的所有分区都完成了,即这个job运行完成了 if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job)//清理内存中的关于该job的信息 listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) } // taskSucceeded runs some user code that might throw an exception. Make sure // we are resilient against that. try { //调用listener的taskSucceeded方法,这里的listener就是提交job时的JobWaiter,见下面分析 job.listener.taskSucceeded(rt.outputId, event.result) } catch { case e: Exception => // TODO: Perhaps we want to mark the resultStage as failed? job.listener.jobFailed(new SparkDriverExecutionException(e)) } } case None => logInfo("Ignoring result from " + rt + " because its job has finished") } case smt: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] updateAccumulators(event) val status = event.result.asInstanceOf[MapStatus]//shuffleMapTask的输出结果是一个MapStatus结构 val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) } else { //将这个partition标记为运行完成,即添加分区号->status的映射到outputLoc的hashmap结构中 shuffleStage.addOutputLoc(smt.partitionId, status) } //如果这个shuffleMapTask是该stage处理的最后一个task,表明这个stage处理结束了 if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) logInfo("failed: " + failedStages) // We supply true to increment the epoch number here in case this is a // recomputation of the map outputs. In that case, some nodes may have cached // locations with holes (from when we detected the error) and will need the // epoch incremented to refetch them. // TODO: Only increment the epoch number if this is not the first time // we registered these map outputs. //保存shuffleId->mapStatuses的映射 mapOutputTracker.registerMapOutputs( shuffleStage.shuffleDep.shuffleId, //这里取list.head是因为同一个partition有可能会有多个task attempt在运行,每一个task attempt运行完成后就添加到list的头部 shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray, changeEpoch = true) //由于已经更新了outputLoc, 所以将缓存中的clear clearCacheLocs() //如果有任何一个partition的outputLoc为空,即说明这个stage未完成,需要重新提交 if (shuffleStage.outputLocs.contains(Nil)) { // Some tasks had failed; let's resubmit this shuffleStage // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty) .map(_._2).mkString(", ")) submitStage(shuffleStage) } else { val newlyRunnable = new ArrayBuffer[Stage] for (shuffleStage <- waitingStages) { logInfo("Missing parents for " + shuffleStage + ": " + getMissingParentStages(shuffleStage)) } //准备提交下一个stage。下一个可以提交的stage的依据是它的parent stage已经都完成了 for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty) { newlyRunnable += shuffleStage } waitingStages --= newlyRunnable runningStages ++= newlyRunnable for { shuffleStage <- newlyRunnable.sortBy(_.id) jobId <- activeJobForStage(shuffleStage) } { logInfo("Submitting " + shuffleStage + " (" + shuffleStage.rdd + "), which is now runnable") //提交被选中的stage运行 submitMissingTasks(shuffleStage, jobId) } } } } ...略掉部分case } submitWaitingStages()//这个是调度等待中的stage }override def taskSucceeded(index: Int, result: Any): Unit = synchronized { if (_jobFinished) { throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter") } resultHandler(index, result.asInstanceOf[T])//调用先前设置的方法,见下面 finishedTasks += 1 if (finishedTasks == totalTasks) { _jobFinished = true jobResult = JobSucceeded this.notifyAll()//触发该job awaitResult等待完成 } }resultHandler方法就是对每一个分区的结果用数组保存。之后将该数组返回。
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { val results = new Array[U](partitions.size) runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res) results }至此,任务的运行就介绍结束了。
版权声明:本文为博主原创文章,未经博主允许不得转载。
时间: 2024-10-11 04:04:11