【Spark2.0源码学习】-10.Task执行与回馈

通过上一节内容,DriverEndpoint最终生成多个可执行的TaskDescription对象,并向各个ExecutorEndpoint发送LaunchTask指令,本节内容将关注ExecutorEndpoint如何处理LaunchTask指令,处理完成后如何回馈给DriverEndpoint,以及整个job最终如何多次调度直至结束。

一、Task的执行流程

     承接上一节内容,Executor接受LaunchTask指令后,开启一个新线程TaskRunner解析RDD,并调用RDD的compute方法,归并函数得到最终任务执行结果

  • ExecutorEndpoint接受到LaunchTask指令后,解码出TaskDescription,调用Executor的launchTask方法
  • Executor创建一个TaskRunner线程,并启动线程,同时将改线程添加到Executor的成员对象中,代码如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
runningTasks.put(taskDescription.taskId, taskRunner)
  • TaskRunner
    • 首先向DriverEndpoint发送任务最新状态为RUNNING
    • 从TaskDescription解析出Task,并调用Task的run方法
  • Task
    • 创建TaskContext以及CallerContext(与HDFS交互的上下文对象)
    • 执行Task的runTask方法
      • 如果Task实例为ShuffleMapTask:解析出RDD以及ShuffleDependency信息,调用RDD的compute()方法将结果写Writer中(Writer这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回MapStatus对象
      • 如果Task实例为ResultTask:解析出RDD以及合并函数信息,调用函数将调用后的结果返回
  • TaskRunner将Task执行的结果序列化,再次向DriverEndpoint发送任务最新状态为FINISHED

二、Task的回馈流程

     TaskRunner执行结束后,都将执行状态发送至DriverEndpoint,DriverEndpoint最终反馈指令CompletionEvent至DAGSchedulerEventProcessLoop中

  • DriverEndpoint接受到StatusUpdate消息后,调用TaskScheduler的statusUpdate(taskId, state, result)方法
  • TaskScheduler如果任务结果是完成,那么清除该任务处理中的状态,并调动TaskResultGetter相关方法,关键代码如下:
val taskSet = taskIdToTaskSetManager.get(tid)

taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { executorId =>
  executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
}
taskSet.removeRunningTask(tid)

if (state == TaskState.FINISHED) {
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
  • TaskResultGetter启动线程启动线程【task-result-getter】进行相关处理
    • 通过解析或者远程获取得到Task的TaskResult对象
    • 调用TaskSet的handleSuccessfulTask方法,TaskSet的handleSuccessfulTask方法直接调用TaskSetManager的handleSuccessfulTask方法
  • TaskSetManager
    • 更新内部TaskInfo对象状态,并将该Task从运行中Task的集合删除,代码如下:
val info = taskInfos(tid)
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
    • 调用DAGScheduler的taskEnded方法,关键代码如下:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  • DAGScheduler向DAGSchedulerEventProcessLoop存入CompletionEvent指令,CompletionEvent对象定义如下
private[scheduler] case class CompletionEvent(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskInfo: TaskInfo)
  extends DAGSchedulerEvent

三、Task的迭代流程

DAGSchedulerEventProcessLoop中针对于CompletionEvent指令,调用DAGScheduler进行处理,DAGScheduler更新Stage与该Task的关系状态,如果Stage下Task都返回,则做下一层Stage的任务拆解与运算工作,直至Job被执行完毕
  

  • DAGSchedulerEventProcessLoop接收到CompletionEvent指令后,调用DAGScheduler的handleTaskCompletion方法
  • DAGScheduler根据Task的类型分别处理
  • 如果Task为ShuffleMapTask
    • 待回馈的Partitions减取当前partitionId
    • 如果所有task都返回,则markStageAsFinished(shuffleStage),同时向MapOutputTrackerMaster注册MapOutputs信息,且markMapStageJobAsFinished
    • 调用submitWaitingChildStages(shuffleStage)进行下层Stages的处理,从而迭代处理最终处理到ResultTask,job结束,关键代码如下:
private def submitWaitingChildStages(parent: Stage) {
   ...
  val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
  waitingStages --= childStages
  for (stage <- childStages.sortBy(_.firstJobId)) {
    submitStage(stage)
  }
}
  • 如果Task为ResultTask
    • 改job的partitions都已返回,则markStageAsFinished(resultStage),并cleanupStateForJobAndIndependentStages(job),关键代码如下
for (stage <- stageIdToStage.get(stageId)) {
  if (runningStages.contains(stage)) {
    logDebug("Removing running stage %d".format(stageId))
    runningStages -= stage
  }
  for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
    shuffleIdToMapStage.remove(k)
  }
  if (waitingStages.contains(stage)) {
    logDebug("Removing stage %d from waiting set.".format(stageId))
    waitingStages -= stage
  }
  if (failedStages.contains(stage)) {
    logDebug("Removing stage %d from failed set.".format(stageId))
    failedStages -= stage
  }
}
// data structures based on StageId
stageIdToStage -= stageId
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job

至此,用户编写的代码最终调用Spark分布式计算完毕。

时间: 2024-12-25 04:52:18

【Spark2.0源码学习】-10.Task执行与回馈的相关文章

【Spark2.0源码学习】-6.Client启动

Client作为Endpoint的具体实例,下面我们介绍一下Client启动以及OnStart指令后的额外工作 一.脚本概览 下面是一个举例: /opt/jdk1.7.0_79/bin/java -cp /opt/spark-2.1.0/conf/:/opt/spark-2.1.0/jars/*:/opt/hadoop-2.6.4/etc/hadoop/ -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --maste

【Spark2.0源码学习】-5.Worker启动

Worker作为Endpoint的具体实例,下面我们介绍一下Worker启动以及OnStart指令后的额外工作 一.脚本概览 下面是一个举例: /opt/jdk1.7.0_79/bin/java -cp /opt/spark-2.1.0/conf/:/opt/spark-2.1.0/jars/*:/opt/hadoop-2.6.4/etc/hadoop/ -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --web

Solr4.8.0源码分析(10)之Lucene的索引文件(3)

Solr4.8.0源码分析(10)之Lucene的索引文件(3) 1. .si文件 .si文件存储了段的元数据,主要涉及SegmentInfoFormat.java和Segmentinfo.java这两个文件.由于本文介绍的Solr4.8.0,所以对应的是SegmentInfoFormat的子类Lucene46SegmentInfoFormat. 首先来看下.si文件的格式 头部(header) 版本(SegVersion) doc个数(SegSize) 是否符合文档格式(IsCompoundF

[Android FrameWork 6.0源码学习] View的重绘过程之WindowManager的addView方法

博客首页:http://www.cnblogs.com/kezhuang/p/ 关于Activity的contentView的构建过程,我在我的博客中已经分析过了,不了解的可以去看一下 <[Android FrameWork 6.0源码学习] Window窗口类分析> 本章博客是接着上边那篇博客分析,目的是为了引出分析ViewRootImpl这个类.现在只是分析完了Window和ActivityThread的调用过程 从ActivityThread到WindowManager再到ViewRoo

lodash源码学习(10)

_.delay(func, wait, [args]) 延迟wait毫秒之后调用该函数,添加的参数为函数调用时的参数 //delay.js var baseDelay = require('./_baseDelay'),//baseDelay方法 baseRest = require('./_baseRest'),//创建使用rest参数方法 toNumber = require('./toNumber');//转化为数字 /** * * @param {Function} func 需要延迟执

[Android FrameWork 6.0源码学习] LayoutInflater 类分析

LayoutInflater是用来解析XML布局文件,然后生成对象的ViewTree的工具类.是这个工具类的存在,才能让我们写起Layout来那么省劲. 我们接下来进去刨析,看看里边的奥秘 //调用inflate方法就可以把XML解析成View对象 View contentView = LayoutInflater.from(this).inflate(R.layout.activity_main, null); 我们在使用这个类的时候,通常都是像上面这样写,首先通过from函数获取对象,在调用

[Android FrameWork 6.0源码学习] View的重绘过程之Draw

View绘制的三部曲,测量,布局,绘画现在我们分析绘画部分测量和布局 在前两篇文章中已经分析过了.不了解的可以去我的博客里找一下 下面进入正题,开始分析调用以及函数原理 private void performDraw() { if (mAttachInfo.mDisplayState == Display.STATE_OFF && !mReportNextDraw) { return; } final boolean fullRedrawNeeded = mFullRedrawNeede

spark core源码分析10 Task的运行

这一节介绍具体task的运行以及最终结果的处理 看线程运行的run方法,见代码注释 override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClass

[Android FrameWork 6.0源码学习] ViewGroup的addView函数分析

Android中整个的View的组装是采用组合模式. ViewGroup就相当与树根,各种Layout就相当于枝干,各种子View,就相当于树叶. 至于View类.我们就当它是个种子吧.哈哈! ViewGroup属于树根,可以生长数很多枝干(继承自定义Layout)而枝干上有可以长出很多叶子(TextView,ImageVIew......) 好,闲话少叙,接下来步入正题! 首先,关于View的操作方法,被定义在一个叫做ViewManager的接口中,接口中还有两个方法,分别是移除和更新,这次主