主要内容
1. Task执行原理流程图
2. Task执行源码
3. Task执行结果在Driver端的处理
一、Task在Executor(worker)端执行及返回Driver流程图
图37-1 Driver端与Executor交互图
二、Executor(worker)端执行源码解析
1.接收Driver端发来的消息
当Driver中的SchedulerBackend给ExecutorBackend发送LaunchTask之后,ExecutorBackend在接收到LaunchTask消息后,首先反序列化TaskDescription。
& StandAlone下为SchedulerBackend具体指CoarseGrainedSchedulerBackend,ExecutorBackend指CoarseGrainedExecutorBackend。
//CoarseGrainedExecutorBackend#receive case LaunchTask(data) => if (executor == null) { //如果不存在Executor则会报错,退出系统 logError("Received LaunchTask command but executor was null") System.exit(1) } else { //反序列化Task,得到TaskDescription信息 val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) //调用executor#launchTask在executor上加载任务 executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) }
2.Executor加载Task
Executor会通过launchTask来执行Task。
3.调用TaskRunner执行Task
//Executor#launchTask def launchTask( context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit = { //实例化一个TaskRunner对象来执行Task val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) //将Task加入到正在运行的Task队列 runningTasks.put(taskId, tr) threadPool.execute(tr) }
class TaskRunner( execBackend: ExecutorBackend, val taskId: Long, val attemptNumber: Int, taskName: String, serializedTask: ByteBuffer) extends Runnable {//省略非关键代码 override def run(): Unit = { //为我们的Task创建内存管理器 val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) //记录反序列化时间 val deserializeStartTime = System.currentTimeMillis() //加载具体类时需要用到ClassLoader Thread.currentThread.setContextClassLoader(replClassLoader) //创建序列化器 val ser = env.closureSerializer.newInstance() logInfo(s"Running $taskName (TID $taskId)") //调用ExecutorBackend#statusUpdate向Driver发信息汇报当前状态 execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) //记录运行时间和GC信息 var taskStart: Long = 0 startGCTime = computeTotalGcTime() try { //反序列化Task的依赖,得到的结果中有taskFile(运行的文件),taskJar(环境依 //赖),taskBytes(相当于缓冲池) val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) //下载Task运行缺少的依赖。 updateDependencies(taskFiles, taskJars) //反序列化Task task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) //设置Task运行时的MemoryManager task.setTaskMemoryManager(taskMemoryManager) //如果Task在序列化前就已经被killed,则会抛出异常;否则,正常执行 if (killed) { throw new TaskKilledException } logDebug("Task " + taskId + "'s epoch is " + task.epoch) env.mapOutputTracker.updateEpoch(task.epoch) //运行的实际任务,并测量它的运行时间。 taskStart = System.currentTimeMillis() var threwException = true val (value, accumUpdates) = try { //调用task#run方法,得到task运行的结果 val res = task.run( taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) threwException = false res } finally { //清理所有分配的内存和分页,并检测是否有内存泄漏 val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() if (freedMemory > 0) { val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId" if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) { throw new SparkException(errMsg) } else { logError(errMsg) } } } //记录Task完成时间 val taskFinish = System.currentTimeMillis() //如果Task killed,则报错。 if (task.killed) { throw new TaskKilledException } //否则序列化得到的Task执行的结果 val resultSer = env.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() //记录相关的metrics for (m <- task.metrics) { m.setExecutorDeserializeTime( (taskStart - deserializeStartTime) + task.executorDeserializeTime) m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m.setResultSerializationTime(afterSerialization - beforeSerialization) m.updateAccumulators() } //创建直接返回给Driver的结果对象DirectTaskResult val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit val serializedResult: ByteBuffer = { //对直接返回的结果对象大小进行判断 if (maxResultSize > 0 && resultSize > maxResultSize) { //大于最大限制1G,直接丢弃ResultTask logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { //结果大小大于设定的阀值,则放入BlockManager中 val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") //返回非直接返回给Driver的对象TaskResultTask ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { //结果不大,直接传回给Driver logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } } //通知Driver Task已完成 execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } //省略备份代码 finally {//将Task从运行队列中去除 runningTasks.remove(taskId) } }
4.给Driver汇报状态
Executor会通过TaskRunner在ThreadPool中来运行具体的Task;在TaskRunner的run方法中会通过调用statusUpdate方法给Driver发信息汇报自己的状态。告诉Driver,Task已经开始运行了(Running状态)。
5.反序列化Task的依赖,并下载依赖
TaskRunner内部会有一些准备工作,例如反序列化Task的依赖,然后通过网络来获取需要的文件、Jar等信息。
& 补充:在执行具体的Task的业务逻辑前会进行四次反序列化:1.TaskDescription反序列化 2,Task的反序列化 3,RDD的反序列化 4,反序列化依赖
6.反序列化Task本身
7.获得执行结果
调用反序列化后的Task的run方法来获得任务执行的结果;
final def run( taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem) : (T, AccumulatorUpdates) = { //创建Task执行的上下文 context = new TaskContextImpl( stageId, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, metricsSystem, internalAccumulators, runningLocally = false) TaskContext.setTaskContext(context) context.taskMetrics.setHostname(Utils.localHostName()) try{ //调用runTask执行Task (runTask(context), context.collectAccumulators()) }//省略部分代码 } }
不同Task类型对抽象方法runTask的实现不同,ShuffleMapTask#runTask方法会调用RDD的iterator()方法来计算Task;事实上,其内部会迭代Partition的元素并利用我们自定的function来进行计算。ResultTask计算过程与之类似。不同的是ShuffleMapTask#runTask在计算具体的partition后实际上会通过shuffleManager获取shufflewriter把当前Task的计算结果根据具体的shuffleManager的实现来写入到具体的文件中,操作结束后会把MapStatus发送给DAGScheduler;而ResultTask#runTask会根据前面Stage的执行结果进行Shuffle产生整个Job最后的结果。
& 对于ShuffleMapTask,首先需要对RDD以及其依赖关系进行反序列化;最终计算(不考虑cache和checkpoint)时,会调用RDD#compute方法
& def compute(split: Partition, context: TaskContext): Iterator[T]
& 具体计算时有具体的RDD,例如MapPartitionsRDD的compute;
& override def
compute(split:Partition, context: TaskContext):
Iterator[U] =
f(context, split.index, firstParent[T].iterator(split,context))
& 这里的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码;来源自对我们在该Stage的各个算子中自定义的函数的合并。
8.把Task执行结果序列化
9.判断结果传回Driver的方式
并根据序列化后的DirectResultTask的大小选择不同的方式将结果传回给Driver端。
& 若果结果大于1G(可以通过spark.driver.maxResultSize来进行设置),直接丢弃
& 如果结果“较大”(小于1G但大于一个阀值(akkaFrameSize-AkkaUtils.reservedSizeBytes),在Spark1.6中akkaFrameSize默认为128MB,此时阀值为conf.getInt("spark.akka.frameSize",128)*1024*1024-200*1024),则会放入BlockManager中。(env.blockManager.putBytes(
blockId,serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER))
& 如果结果不大,则直接传回给Driver。
10.通知Driver Task已经完成
调用ExecutorBackend#statusUpdate方法给Driver发信息汇报自己的状态。告诉Driver,Task已经开始完成了(FINISHED状态)。
11.从运行队列移除当前Task
三、Drier端处理解析
1.Driver收到Executor的任务执行结果
由上一节,可以看出Task在Executor执行完成时,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED。
//ExecutorBackend(CoarseGrainedBackend)#stateUpdate override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { //通知Driver case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } }
2.Driver处理消息
//SchedulerBackend(CoarseGrainedBackend).DriverEndpoint#receive case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) //1 if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => //增加这个Executor的可用CPU数 executorInfo.freeCores += scheduler.CPUS_PER_TASK //重新为这个Executor分配资源 makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }
Driver首先会将任务的状态更新通知给TaskScheduler,然后会在这个Executor上重新分配新的计算任务。(见1)
3.通知TaskScheduler(TaskSchedulerImpl)
//TaskSchedulerImpl#statusUpdate (1处被调用) taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => if (TaskState.isFinished(state)) { //如果Task的状态是FINISHED或者FAILED或者KILLED或者LOST //都是为Task执行结束,清理本地的Task数据结构 taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => if (executorIdToTaskCount.contains(execId)) { executorIdToTaskCount(execId) -= 1 } } } if (state == TaskState.FINISHED) { //任务完成TaskSetManager标记该任务已经结束,此时Task不一定成功结束 taskSet.removeRunningTask(tid) //处理任务的计算结果 taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) //2 } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { //从TaskSetManager的运行Task队列中去除标记为完成的Task,此时Task不一定 //是成功执行结束。 taskSet.removeRunningTask(tid) //处理失败的情况 taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)//3 } }
& 执行结束状态包括了FINISHED, FAILED, KILLED, LOST 这四种状态,所以标记为执行结束的Task并非是成功执行Task结束的。val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
& 这里Task的状态只有是FINISHED的时候才是成功执行Task结束的标志,其余的状态例如:FAILED、KILLED和LOST都是Task执行失败的标志。
4. TaskScheduler获取Task运行结果
这里TaskScheduler处理Task执行结果时,会交给一个后台守护线程池负责。
/** * 利用一个线程池来反序列化Task执行结果或者在必要是抓取Task结果。 */ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl) extends Logging { //设置线程池内线程数,可配置通过spark.resultGetter.threads private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4) private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool( THREADS, "task-result-getter") //设置序列化器 protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { sparkEnv.closureSerializer.newInstance() } } //定义Task成功执行得到的结果的处理逻辑,?处被调用 def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { //通过线程池来获取Task执行的结果 getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { try { val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match { //如果是直接发送到Driver端的Task执行结果,未利用BlockManager即Executor //发送Task的最后一种情况,考参照Executor端执行步骤9,判断传回Driver的方 //式 case directResult: DirectTaskResult[_] => //不符合抓取Task的大小限制 if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } //这里反序列化的值是不加锁的,这主要是为了保证多线程对其访问时,不会出现 //其他线程因为本线程而堵塞的情况,这里我们应该先调用它,获得反序列化的 //值,以便在TaskSetManager#handleSuccessfulTask中再次调用时,不需要再次 //反序列化该值 directResult.value() //得到Task执行的结果,由于是directResult,所以不需要远程读取。 (directResult, serializedData.limit()) //如果Executor返回给Driver的Task执行结果是间接的,需要借助BlockManager case IndirectTaskResult(blockId, size) => if (!taskSetManager.canFetchMoreResults(size)) { // 如果结果大小比maxResultSize,则在远程节点上(worker)删除该 //blockManager sparkEnv.blockManager.master.removeBlock(blockId) return } //需要从远程节点上抓取Task执行的结果 logDebug("Fetching indirect task result for TID %s".format(tid)) //标记Task为需要远程抓取的Task并通知DAGScheduler scheduler.handleTaskGettingResult(taskSetManager, tid) //从远程节点的BlockManager上获取Task计算结果 val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) if (!serializedTaskResult.isDefined) { //在Task执行结束获得结果后到driver远程去抓取结果之间,如果运行task的 //机器挂掉,或者该机器的BlockManager已经刷新掉了Task执行结果,都会导致 //远程抓取结果失败,即结果丢失。 scheduler.handleFailedTask( taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return } //远程抓取结果成功,反序列化结果 val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( serializedTaskResult.get) //删除远程的结果 sparkEnv.blockManager.master.removeBlock(blockId) //得到IndirectResult类型的结果 (deserializedResult, size) } result.metrics.setResultSize(size) //标记Task为SuccessfulTask并通知DAGScheduler scheduler.handleSuccessfulTask(taskSetManager, tid, result) //4 } catch {//省略部分非关键代码 } } }) } }
& 这里TaskScheduler获得方式结果主要是依据Driver端得到Executor端返回的Task运行结果确定的,有两种方式1)DirectResult,2)InDirectResult。对于1)直接可以反序列化Driver端接到的返回信息得到Task运行结果;对于2)则需要借助远程节点(worker)上的BlockManager来远程获取结果。
5.TaskScheduler处理运行结果
//4处被调用 def handleSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, taskResult: DirectTaskResult[_]): Unit = synchronized { taskSetManager.handleSuccessfulTask(tid, taskResult) }
//TaskSetManager#hadleSuccessfulTask def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { val info = taskInfos(tid) val index = info.index info.markSuccessful() removeRunningTask(tid) //向高层调度器报告结果 5 sched.dagScheduler.taskEnded( tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) //判断该TaskSet中Task是否已全部执行成功 if (!successful(index)) {//该Task还未标记为成功执行 //增加执行成功的Task tasksSuccessful += 1 logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) // 标记执行成功的Task,如果TaskSet中的所有Task执行成功则停止该TaskSetManager successful(index) = true if (tasksSuccessful == numTasks) { isZombie = true } } else { logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } //从执行失败的集合中删去该Task,用于Task失败重试 failedExecutors.remove(index) //判断TaskSet中Task是否已全部执行完成,是则说明该TaskSet已执行完成,相应的对该 //TaskSetManager的调度结束,从调度池中删除该TaskSetManager maybeFinishTaskSet() }
6.TaskScheduler向高层调度器DAGScheduler报告
//DAGScheduler#taskEnd 5处被调用 def taskEnded( task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any], taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { //加入DAGScheduler的消息队列,等待处理 eventProcessLoop.post( CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)) }
//DAGScheduler#doOnReceive case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion)
(1)处理ShuffleMapTask
对于ShuffleMapTask,首先需要将结果保存到Stage,如果当前Stage所有Task都结束了,则将所有的结果注册到MapOutputTrackerMaster;这样下一个Stage的Task就可以通过他来获取Shuffle的结果原数据信息,进而从Shuffle数据所在的节点获取数据了。
//DAGScheduler#handleTaskCompletion event.reason match { case Success => listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,event.reason, event.taskInfo, event.taskMetrics)) //从该stage中等待处理的partition中去除Task对应的partition stage.pendingPartitions -= task.partitionId task match { //如果是ShuffleMapTask case smt: ShuffleMapTask => //实例化一个shuffleStage实例,用来保存TaskSet结果 val shuffleStage = stage.asInstanceOf[ShuffleMapStage] //跟新本地的状态信息 updateAccumulators(event) val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) //忽略在集群中游走的ShuffleMapTask(来自一个失效的节点的Task结果)。 if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { //将结果保存到Stage中。即将Task结果的输出位置放到Stage的数据结构中。 shuffleStage.addOutputLoc(smt.partitionId, status) } //如果当前Stage运行完毕 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { //标记当前Stage为Finished,并将其从运行中Stage列表中删除 markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) logInfo("failed: " + failedStages) //将整体结果注册到MapOutputTrackerMaster; mapOutputTracker.registerMapOutputs( shuffleStage.shuffleDep.shuffleId, shuffleStage.outputLocInMapOutputTrackerFormat(), changeEpoch = true) //清除本地缓存 clearCacheLocs() //如果shuffleMapStage中有一些tasks运行失败,没有结果。 if (!shuffleStage.isAvailable) { //则需要重新提交这个shuffleMapStage,并且需要告知顶层调度器TaskScheduler //进行处理。 logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + shuffleStage.findMissingPartitions().mkString(", ")) submitStage(shuffleStage) } else { // 标记所有等待这个Stage结束的Map-Stage Job为结束状态 //这里会将这个Job记为Finished状态,并统计输出结果,报告给监听器 if (shuffleStage.mapStageJobs.nonEmpty) { val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep) for (job <- shuffleStage.mapStageJobs) { markMapStageJobAsFinished(job, stats) } } }
& 从ActiveJob类的注释可以看出,Job可以有两种类型:result job,这会触发ResultStage执行的action操作,或Map-stage Job,在任何下游Stage提交之前计算出其所需的前一个Stage的结果,并对ShuffleMapStage的结果进行映射。后者用于自适应查询计划,用于在提交后期stage之前可以查看上有Stage输出结果的统计信息(如一些结果位置的元数据信息)。我们使用这个类的finalStage字段来对两种类型的Job进行区分。对于Map-Stage会借助MapOutputTracker来映射上游的Stage的Task输出信息,来实现前一个Stage输出信息的位置等元信息传递给后一个Stage的过程;并直接标记Map-Stage
Job结束,并报告输出统计信息给监听器。
& ActiveJob类中会记录Job所需计算的分片(Partition)数目,以及每个Partition是否计算完成。由于Task与Partition是一一对应的,所以我们从这个类中可以知道有多少个Task,与Task执行完成的个数。
(2)处理ResultTask
首先,MapOutputTracker会把ShuffleMapTask执行结果交给ResultTask,然后,ResultTask会根据前面Stage的执行结果进行Shuffle产生整个Job最后的结果。
case rt: ResultTask[_, _] => 实例化一个ResultStage来存储resultTask val resultStage = stage.asInstanceOf[ResultStage] resultStage.activeJob match { case Some(job) => //如果这个Task未执行完成 if (!job.finished(rt.outputId)) { //更新当前状态 updateAccumulators(event) //将Task所对应的Partition标记为计算完成 job.finished(rt.outputId) = true //当前作业中Partition完成数增加 job.numFinished += 1 // 如果当前的Job所有Partition对已计算完成,就将这个Stage remove掉 if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job) listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) } //在处理SucceedTask时,会调用一些用户定义的函数,可能会产生异常,为 //了确保程序的健壮性,需要进行异常处理。 try { job.listener.taskSucceeded(rt.outputId, event.result)//⑥ } catch { case e: Exception => //当异常发生时,有时需要标记ResultStage失败。 job.listener.jobFailed(new SparkDriverExecutionException(e)) } } case None => //在任务进行推测执行时,可能有多个Task的执行结果,对于对于的结果,系统 //会进行忽略处理。 logInfo("Ignoring result from " + rt + " because its job has finished")
//⑥处被调用 JobListener.scala private[spark] trait JobListener { //对Task执行结果进行处理的核心逻辑 def taskSucceeded(index: Int, result: Any) //⑦ //对Task执行失败进行处理的而核心逻辑。 def jobFailed(exception: Exception) }
//对父类trait JobListener中的抽象方法的具体实现 //JobWaiter#taskSucceed 对⑦抽象方法的实现 override def taskSucceeded(index: Int, result: Any): Unit = synchronized { //如果当前Job处理已完成,说明Task进行了重复处理,则会报错。 if (_jobFinished) { throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter") } //调用用户逻辑,即用户定义的处理方法,来处理Task的结果 resultHandler(index, result.asInstanceOf[T]) //记录当前Job的Task完成数增加 finishedTasks += 1 //如果当前Job的所有Task都已执行完毕,则表明整个Job完成。 if (finishedTasks == totalTasks) { _jobFinished = true jobResult = JobSucceeded //通知所有调用JobWaiter#awaitResult的方法,Job执行完成,可以继续运行了。 this.notifyAll() } }
& 在DAGScheduler#JobSubmit中,会得到JobWaiter类的实例waiter,从而获得Job的执行结果。最终在DAGScheduler#runJob中,调用waiter#awaitResult,对JobSuceeded进行报告,并写入日志。Job执行就结束了。
7.补充:Task出错处理
对于出错或是执行失败的Task,TaskSchedulerImpl#statsUpdate会调用TaskResultGetter#enqueueFailedTask来处理。这个处理过程与执行成功的Task的处理过程是类似的,它们(执行成功和执行失败的Task)会是公用一个线程池来执行处理逻辑。
// TaskResultGetter# enqueueFailedTask定义Task执行失败的处理逻辑,3处被调用 //这部分可以理解为Scheduler的容错功能。 def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) { //记录执行失败的原因 var reason : TaskEndReason = UnknownReason try { //调用线程池的一个线程来处理。 getTaskResultExecutor.execute(new Runnable { //具体处理逻辑 override def run(): Unit = Utils.logUncaughtExceptions { val loader = Utils.getContextOrSparkClassLoader try { //如果是序列化结果为空或是序列化结果大于规定值,则是序列化失败导致Task执行 //失败。 if (serializedData != null && serializedData.limit() > 0) { reason = serializer.get().deserialize[TaskEndReason]( serializedData, loader) } } catch {//序列化过程中抛出异常。 case cnd: ClassNotFoundException => //由于Task执行失败并非致命性错误,所以这里只需将信息记录到日志里之后,仍然 //可以继续执行程序 logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) case ex: Exception => {} } //调用TaskSchulerImpl#handleFailedTask来处理Task失败,该方法中定义了处理 //Task失败的核心逻辑。 scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) //⑧ } }) } catch {//处理SparkContext已关闭异常 case e: RejectedExecutionException if sparkEnv.isStopped => // ignore it } }
//TaskScheduler#hadleFailTask ⑧处调用 def handleFailedTask( taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, reason: TaskEndReason): Unit = synchronized { //调用TaskSetManager处理失败的情况。 taskSetManager.handleFailedTask(tid, taskState, reason) //⑨ if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { //这里需要重新进行资源调度来执行失败的Task,而失败的Task的状态(例如执行失败次数 //等)已由TaskManager进行了更新,来反应该任务是失败后重新执行的Task backend.reviveOffers() } }
TaskSetManager#handlerFailTask方法主要是将任务标记为失败,并将它重新添加到待处理任务列表,同时通知高层调度器DAG Scheduler。
//TaskSetManager#handlerFailTask ⑨处调用 def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) { //省略部分非关键代码,这些代码主要处理一些出错信息,并根据不同的出错信息做一些日志记 //录操作。 //如果Executor failed,则尝试重新加入这些Executor。 failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). put(info.executorId, clock.getTimeMillis()) //调用高层调度器(DAGScheduler)进行容错 //⑩ sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) //将Task加入到待处理任务列表 addPendingTask(index) if (!isZombie && state != TaskState.KILLED && reason.isInstanceOf[TaskFailedReason] //这里的countTowardTaskFailures指,是否允许在Stage被丢弃前,执行最大次数的 //Task失败重试。只有当Task的执行失败与Task本身无关时,才会设置为false(例 //如,执行Task的Executor挂掉了)。 && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) { assert (null != failureReason) numFailures(index) += 1 //如果失败次数大于最大失败次数,则将Task丢弃。 if (numFailures(index) >= maxTaskFailures) { logError("Task %d in stage %s failed %d times; aborting job".format( index, taskSet.id, maxTaskFailures)) abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" .format(index, taskSet.id, maxTaskFailures, failureReason), failureException) return } } //判断TaskSet中Task是否已全部执行完成,是则说明该TaskSet已执行完成,相应的对该 //TaskSetManager的调度结束,从调度池中删除该TaskSetManager maybeFinishTaskSet() }
给高层发送消息,调用高层容错机制。
//DAGScheduler#taskEnd def taskEnded( task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any], taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { eventProcessLoop.post( CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)) }
与执行成功的Task一样,向高层调度器DAGScheduler发送的是由CompletionEvent封装的消息。而DAGScheduler会接收到这个消息,对其进行容错处理。
//DAGScheduler#doOnReceive case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion)
这里是否有似曾相识的感觉,其实步骤6中也有这一过程,再DAGScheduler#handleTaskCompletion中,会根据不同的event#reason,也就是出错信息,进行处理。主要处理的有重复提交Resubmitted和远程读取失败FetchFailed,而其他出错情况则大都采用鸵鸟政策,什么也不做。这边是高层DAGScheduler的容错处理。
& 通过对Driver端执行的过程的观察,我们可以看出底层调度器和高层调度器是紧密合作的,很多时候,在接收到Worker端的StateUpdate信息后,先由TaskSchedulerImpl进行处理,然后同时底层调度器,将这些信息报告给高层调度器,就通信过程来看,真正与Worker联系的是底层调度器,这是在Task层次上的;而底层调度其会将这些信息进行加工,向高层调度器报告,这是联系的内容大都是TaskSetManager,所以这是就是在TaskSetManager层次上进行处理的。所以我们可以看到底层和高层进行处理时,所处的层次是不一样的,这也就是为什么会划分两个调度器的原因了。
& 对于容错,底层调度器和高层调度器也是合作进行的,所以Task在出错时,会进行两个层次上的容错处理,这就大大提交了容错的效率和可靠性。
参考书目:张安站 --Spark技术内幕
说明:
本文是由DT大数据梦工厂的IFM课程第37课为基础上,加入一些参考资料所做的笔记