Spark源码分析之二:Job的调度模型与运行反馈

《Spark源码分析之Job提交运行总流程概述》一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段:

1、Job的调度模型与运行反馈;

2、Stage划分;

3、Stage提交:对应TaskSet的生成。

今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈。

首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行。入口方法为DAGScheduler的runJon()方法。代码如下:

[java] view plain copy

  1. /**
  2. * Run an action job on the given RDD and pass all the results to the resultHandler function as
  3. * they arrive.
  4. *
  5. * @param rdd target RDD to run tasks on
  6. * @param func a function to run on each partition of the RDD
  7. * @param partitions set of partitions to run on; some jobs may not want to compute on all
  8. *   partitions of the target RDD, e.g. for operations like first()
  9. * @param callSite where in the user program this job was called
  10. * @param resultHandler callback to pass each result to
  11. * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
  12. *
  13. * @throws Exception when the job fails
  14. */
  15. def runJob[T, U](
  16. rdd: RDD[T],
  17. func: (TaskContext, Iterator[T]) => U,
  18. partitions: Seq[Int],
  19. callSite: CallSite,
  20. resultHandler: (Int, U) => Unit,
  21. properties: Properties): Unit = {
  22. // 开始时间
  23. val start = System.nanoTime
  24. // 调用submitJob()方法,提交Job,返回JobWaiter
  25. // rdd为最后一个rdd,即target RDD to run tasks on
  26. // func为该rdd上每个分区需要执行的函数,a function to run on each partition of the RDD
  27. // partitions为该rdd上需要执行操作的分区集合,set of partitions to run on
  28. // callSite为用户程序job被调用的地方,where in the user program this job was called
  29. val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  30. // JobWaiter调用awaitResult()方法等待结果
  31. waiter.awaitResult() match {
  32. case JobSucceeded => // Job运行成功
  33. logInfo("Job %d finished: %s, took %f s".format
  34. (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  35. case JobFailed(exception: Exception) =>// Job运行失败
  36. logInfo("Job %d failed: %s, took %f s".format
  37. (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  38. // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
  39. val callerStackTrace = Thread.currentThread().getStackTrace.tail
  40. exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
  41. throw exception
  42. }
  43. }

runJob()方法就做了三件事:

首先,获取开始时间,方便最后计算Job执行时间;

其次,调用submitJob()方法,提交Job,返回JobWaiter类型的对象waiter;

最后,waiter调用JobWaiter的awaitResult()方法等待Job运行结果,这个运行结果就俩:JobSucceeded代表成功,JobFailed代表失败。

awaitResult()方法通过轮询标志位_jobFinished,如果为false,则调用this.wait()继续等待,否则说明Job运行完成,返回JobResult,其代码如下:

[java] view plain copy

  1. def awaitResult(): JobResult = synchronized {
  2. // 循环,如果标志位_jobFinished为false,则一直循环,否则退出,返回JobResult
  3. while (!_jobFinished) {
  4. this.wait()
  5. }
  6. return jobResult
  7. }

而这个标志位_jobFinished是在Task运行完成后,如果已完成Task数目等于总Task数目时,或者整个Job运行失败时设置的,随着标志位的设置,Job运行结果jobResult也同步进行设置,代码如下:

[java] view plain copy

  1. // 任务运行完成
  2. override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  3. if (_jobFinished) {
  4. throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  5. }
  6. resultHandler(index, result.asInstanceOf[T])
  7. finishedTasks += 1
  8. // 已完成Task数目是否等于总Task数目
  9. if (finishedTasks == totalTasks) {
  10. // 设置标志位_jobFinished为ture
  11. _jobFinished = true
  12. // 作业运行结果为成功
  13. jobResult = JobSucceeded
  14. this.notifyAll()
  15. }
  16. }
  17. // 作业失败
  18. override def jobFailed(exception: Exception): Unit = synchronized {
  19. // 设置标志位_jobFinished为ture
  20. _jobFinished = true
  21. // 作业运行结果为失败
  22. jobResult = JobFailed(exception)
  23. this.notifyAll()
  24. }

接下来,看看submitJob()方法,代码定义如下:

[java] view plain copy

  1. /**
  2. * Submit an action job to the scheduler.
  3. *
  4. * @param rdd target RDD to run tasks on
  5. * @param func a function to run on each partition of the RDD
  6. * @param partitions set of partitions to run on; some jobs may not want to compute on all
  7. *   partitions of the target RDD, e.g. for operations like first()
  8. * @param callSite where in the user program this job was called
  9. * @param resultHandler callback to pass each result to
  10. * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
  11. *
  12. * @return a JobWaiter object that can be used to block until the job finishes executing
  13. *         or can be used to cancel the job.
  14. *
  15. * @throws IllegalArgumentException when partitions ids are illegal
  16. */
  17. def submitJob[T, U](
  18. rdd: RDD[T],
  19. func: (TaskContext, Iterator[T]) => U,
  20. partitions: Seq[Int],
  21. callSite: CallSite,
  22. resultHandler: (Int, U) => Unit,
  23. properties: Properties): JobWaiter[U] = {
  24. // Check to make sure we are not launching a task on a partition that does not exist.
  25. // 检测rdd分区以确保我们不会在一个不存在的partition上launch一个task
  26. val maxPartitions = rdd.partitions.length
  27. partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
  28. throw new IllegalArgumentException(
  29. "Attempting to access a non-existent partition: " + p + ". " +
  30. "Total number of partitions: " + maxPartitions)
  31. }
  32. // 为Job生成一个jobId,jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增
  33. val jobId = nextJobId.getAndIncrement()
  34. // 如果partitions大小为0,即没有需要执行任务的分区,快速返回
  35. if (partitions.size == 0) {
  36. // Return immediately if the job is running 0 tasks
  37. return new JobWaiter[U](this, jobId, 0, resultHandler)
  38. }
  39. assert(partitions.size > 0)
  40. // func转化下,否则JobSubmitted无法接受这个func参数,T转变为_
  41. val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  42. // 创建一个JobWaiter对象
  43. val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  44. // eventProcessLoop加入一个JobSubmitted事件到事件队列中
  45. eventProcessLoop.post(JobSubmitted(
  46. jobId, rdd, func2, partitions.toArray, callSite, waiter,
  47. SerializationUtils.clone(properties)))
  48. // 返回JobWaiter
  49. waiter
  50. }

submitJob()方法一共做了5件事情:

第一,数据检测,检测rdd分区以确保我们不会在一个不存在的partition上launch一个task,并且,如果partitions大小为0,即没有需要执行任务的分区,快速返回;

第二,为Job生成一个jobId,该jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增;

第三,将func转化下,否则JobSubmitted无法接受这个func参数,T转变为_;

第四,创建一个JobWaiter对象waiter,该对象会在方法结束时返回给上层方法,以用来监测Job运行结果;

第五,将一个JobSubmitted事件加入到事件队列eventProcessLoop中,等待工作线程轮询调度(速度很快)。

这里,我们有必要研究下事件队列eventProcessLoop,eventProcessLoop为DAGSchedulerEventProcessLoop类型的,在DAGScheduler初始化时被定义并赋值,代码如下:

[java] view plain copy

  1. // 创建DAGSchedulerEventProcessLoop类型的成员变量eventProcessLoop
  2. private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

DAGSchedulerEventProcessLoop继承自EventLoop,我们先来看看这个EventLoop的定义。

[java] view plain copy

  1. /**
  2. * An event loop to receive events from the caller and process all events in the event thread. It
  3. * will start an exclusive event thread to process all events.
  4. * EventLoop用来接收来自调用者的事件并在event thread中除了所有的事件。它将开启一个专门的事件处理线程处理所有的事件。
  5. *
  6. * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
  7. * handle events in time to avoid the potential OOM.
  8. */
  9. private[spark] abstract class EventLoop[E](name: String) extends Logging {
  10. // LinkedBlockingDeque类型的事件队列,队列元素为E类型
  11. private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  12. // 标志位
  13. private val stopped = new AtomicBoolean(false)
  14. // 事件处理线程
  15. private val eventThread = new Thread(name) {
  16. // 设置为后台线程
  17. setDaemon(true)
  18. override def run(): Unit = {
  19. try {
  20. // 如果标志位stopped没有被设置为true,一直循环
  21. while (!stopped.get) {
  22. // 从事件队列中take一条事件
  23. val event = eventQueue.take()
  24. try {
  25. // 调用onReceive()方法进行处理
  26. onReceive(event)
  27. } catch {
  28. case NonFatal(e) => {
  29. try {
  30. onError(e)
  31. } catch {
  32. case NonFatal(e) => logError("Unexpected error in " + name, e)
  33. }
  34. }
  35. }
  36. }
  37. } catch {
  38. case ie: InterruptedException => // exit even if eventQueue is not empty
  39. case NonFatal(e) => logError("Unexpected error in " + name, e)
  40. }
  41. }
  42. }
  43. def start(): Unit = {
  44. if (stopped.get) {
  45. throw new IllegalStateException(name + " has already been stopped")
  46. }
  47. // Call onStart before starting the event thread to make sure it happens before onReceive
  48. onStart()
  49. eventThread.start()
  50. }
  51. def stop(): Unit = {
  52. if (stopped.compareAndSet(false, true)) {
  53. eventThread.interrupt()
  54. var onStopCalled = false
  55. try {
  56. eventThread.join()
  57. // Call onStop after the event thread exits to make sure onReceive happens before onStop
  58. onStopCalled = true
  59. onStop()
  60. } catch {
  61. case ie: InterruptedException =>
  62. Thread.currentThread().interrupt()
  63. if (!onStopCalled) {
  64. // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
  65. // it‘s already called.
  66. onStop()
  67. }
  68. }
  69. } else {
  70. // Keep quiet to allow calling `stop` multiple times.
  71. }
  72. }
  73. /**
  74. * Put the event into the event queue. The event thread will process it later.
  75. * 将事件加入到时间队列。事件线程过会会处理它。
  76. */
  77. def post(event: E): Unit = {
  78. // 将事件加入到待处理队列
  79. eventQueue.put(event)
  80. }
  81. /**
  82. * Return if the event thread has already been started but not yet stopped.
  83. */
  84. def isActive: Boolean = eventThread.isAlive
  85. /**
  86. * Invoked when `start()` is called but before the event thread starts.
  87. */
  88. protected def onStart(): Unit = {}
  89. /**
  90. * Invoked when `stop()` is called and the event thread exits.
  91. */
  92. protected def onStop(): Unit = {}
  93. /**
  94. * Invoked in the event thread when polling events from the event queue.
  95. *
  96. * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
  97. * and cannot process events in time. If you want to call some blocking actions, run them in
  98. * another thread.
  99. */
  100. protected def onReceive(event: E): Unit
  101. /**
  102. * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
  103. * will be ignored.
  104. */
  105. protected def onError(e: Throwable): Unit
  106. }

我们可以看到,EventLoop实际上就是一个任务队列及其对该队列一系列操作的封装。在它内部,首先定义了一个LinkedBlockingDeque类型的事件队列,队列元素为E类型,其中DAGSchedulerEventProcessLoop存储的则是DAGSchedulerEvent类型的事件,代码如下:

[java] view plain copy

  1. // LinkedBlockingDeque类型的事件队列,队列元素为E类型
  2. private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

并提供了一个后台线程,专门对事件队列里的事件进行监控,并调用onReceive()方法进行处理,代码如下:

[java] view plain copy

  1. // 事件处理线程
  2. private val eventThread = new Thread(name) {
  3. // 设置为后台线程
  4. setDaemon(true)
  5. override def run(): Unit = {
  6. try {
  7. // 如果标志位stopped没有被设置为true,一直循环
  8. while (!stopped.get) {
  9. // 从事件队列中take一条事件
  10. val event = eventQueue.take()
  11. try {
  12. // 调用onReceive()方法进行处理
  13. onReceive(event)
  14. } catch {
  15. case NonFatal(e) => {
  16. try {
  17. onError(e)
  18. } catch {
  19. case NonFatal(e) => logError("Unexpected error in " + name, e)
  20. }
  21. }
  22. }
  23. }
  24. } catch {
  25. case ie: InterruptedException => // exit even if eventQueue is not empty
  26. case NonFatal(e) => logError("Unexpected error in " + name, e)
  27. }
  28. }
  29. }

那么如何向队列中添加事件呢?调用其post()方法,传入事件即可。如下:

[java] view plain copy

  1. /**
  2. * Put the event into the event queue. The event thread will process it later.
  3. * 将事件加入到时间队列。事件线程过会会处理它。
  4. */
  5. def post(event: E): Unit = {
  6. // 将事件加入到待处理队列
  7. eventQueue.put(event)
  8. }

言归正传,上面提到,submitJob()方法利用eventProcessLoop的post()方法加入一个JobSubmitted事件到事件队列中,那么DAGSchedulerEventProcessLoop对于JobSubmitted事件是如何处理的呢?我们看它的onReceive()方法,源码如下:

[java] view plain copy

  1. /**
  2. * The main event loop of the DAG scheduler.
  3. * DAGScheduler中事件主循环
  4. */
  5. override def onReceive(event: DAGSchedulerEvent): Unit = {
  6. val timerContext = timer.time()
  7. try {
  8. // 调用doOnReceive()方法,将DAGSchedulerEvent类型的event传递进去
  9. doOnReceive(event)
  10. } finally {
  11. timerContext.stop()
  12. }
  13. }

继续看doOnReceive()方法,代码如下:

[java] view plain copy

  1. // 事件处理调度函数
  2. private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  3. // 如果是JobSubmitted事件,调用dagScheduler.handleJobSubmitted()方法处理
  4. case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
  5. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  6. // 如果是MapStageSubmitted事件,调用dagScheduler.handleMapStageSubmitted()方法处理
  7. case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
  8. dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  9. case StageCancelled(stageId) =>
  10. dagScheduler.handleStageCancellation(stageId)
  11. case JobCancelled(jobId) =>
  12. dagScheduler.handleJobCancellation(jobId)
  13. case JobGroupCancelled(groupId) =>
  14. dagScheduler.handleJobGroupCancelled(groupId)
  15. case AllJobsCancelled =>
  16. dagScheduler.doCancelAllJobs()
  17. case ExecutorAdded(execId, host) =>
  18. dagScheduler.handleExecutorAdded(execId, host)
  19. case ExecutorLost(execId) =>
  20. dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  21. case BeginEvent(task, taskInfo) =>
  22. dagScheduler.handleBeginEvent(task, taskInfo)
  23. case GettingResultEvent(taskInfo) =>
  24. dagScheduler.handleGetTaskResult(taskInfo)
  25. case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  26. dagScheduler.handleTaskCompletion(completion)
  27. case TaskSetFailed(taskSet, reason, exception) =>
  28. dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  29. case ResubmitFailedStages =>
  30. dagScheduler.resubmitFailedStages()
  31. }

对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。

好了,到这里,第一阶段Job的调度模型与运行反馈大体已经分析完了,至于后面的第二、第三阶段,留待后续博文继续分析吧~

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50667966

时间: 2024-08-05 01:11:14

Spark源码分析之二:Job的调度模型与运行反馈的相关文章

spark 源码分析之二 -- SparkContext 的初始化过程

创建或使用现有Session 从Spark 2.0 开始,引入了 SparkSession的概念,创建或使用已有的session 代码如下: 1 val spark = SparkSession 2 .builder 3 .appName("SparkTC") 4 .getOrCreate() 首先,使用了 builder 模式来创建或使用已存在的SparkSession,org.apache.spark.sql.SparkSession.Builder#getOrCreate 代码如

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

Spark 源码分析系列

如下,是 spark 源码分析系列的一些文章汇总,持续更新中...... Spark RPC spark 源码分析之五--Spark RPC剖析之创建NettyRpcEnv spark 源码分析之六--Spark RPC剖析之Dispatcher和Inbox.Outbox剖析 spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析 spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClie

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Unity时钟定时器插件——Vision Timer源码分析之二

Unity时钟定时器插件--Vision Timer源码分析之二 By D.S.Qiu 尊重他人的劳动,支持原创,转载请注明出处:http.dsqiu.iteye.com 前面的已经介绍了vp_Timer(点击前往查看),vp_TimeUtility相对简单很多,vp_TimeUtility定义了个表示时间的结构Units: C#代码   /// <summary> /// represents a time measured in standard units /// </summar