【Spark】DAGScheduler源码浅析

DAGScheduler

DAGScheduler的主要任务是基于Stage构建DAG,决定每个任务的最佳位置

  • 记录哪个RDD或者Stage输出被物化
  • 面向stage的调度层,为job生成以stage组成的DAG,提交TaskSet给TaskScheduler执行
  • 重新提交shuffle输出丢失的stage

每一个Stage内,都是独立的tasks,他们共同执行同一个computefunction,享有相同的shuffledependencies。DAG在切分stage的时候是依照出现shuffle为界限的。

DAGScheduler实例化

下面的代码是SparkContext实例化DAGScheduler的过程:

  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }

下面代码显示了DAGScheduler的构造函数定义中,通过绑定TaskScheduler的方式创建,其中次构造函数去调用主构造函数来将sc的字段填充入参:

private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

作业提交与DAGScheduler操作

Action的大部分操作会进行作业(job)的提交,源码1.0版的job提交过程的大致调用链是:sc.runJob()–>dagScheduler.runJob–>dagScheduler.submitJob—>dagSchedulerEventProcessActor.JobSubmitted–>dagScheduler.handleJobSubmitted–>dagScheduler.submitStage–>dagScheduler.submitMissingTasks–>taskScheduler.submitTasks

具体的作业提交执行期的函数调用为:

  1. sc.runJob->dagScheduler.runJob->submitJob
  2. DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor(在源码1.4中,submitJob函数中,使用DAGSchedulerEventProcessLoop类进行事件的处理)
  3. eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数
  4. job到stage的转换,生成finalStage并提交运行,关键是调用submitStage
  5. 在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖和窄依赖两种
  6. 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
  7. 提交task是调用函数submitMissingTasks来完成
  8. task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks
  9. TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend
  10. LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件
  11. receiveOffers->executor.launchTask->TaskRunner.run

DAGScheduler的runJob函数

DAGScheduler.runjob最后把结果通过resultHandler保存返回。

这里DAGScheduler的runJob函数调用DAGScheduler的submitJob函数来提交任务:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

作业提交的调度

在Spark源码1.4.0中,DAGScheduler的submitJob函数不再使用DAGEventProcessActor进行事件处理和消息通信,而是使用DAGSchedulerEventProcessLoop类实例eventProcessLoop进行JobSubmitted事件的post动作。

下面是submitJob函数代码:

  /**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }

当eventProcessLoop对象投递了JobSubmitted事件之后,对象内的eventThread线程实例对事件进行处理,不断从事件队列中取出事件,调用onReceive函数处理事件,当匹配到JobSubmitted事件后,调用DAGScheduler的handleJobSubmitted函数并传入jobid、rdd等参数来处理Job。

handleJobSubmitted函数

Job处理过程中handleJobSubmitted比较关键,该函数主要负责RDD的依赖性分析,生成finalStage,并根据finalStage来产生ActiveJob。

在handleJobSubmitted函数源码中,给出了部分注释:

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      //错误处理,告诉监听器作业失败,返回....
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // 很短、没有父stage的本地操作,比如 first() or take() 的操作本地执行
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        // collect等操作走的是这个过程,更新相关的关系映射,用监听器监听,然后提交作业
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()
  }

小结

该篇文章介绍了DAGScheduler从SparkContext中进行实例化,到执行Action操作时提交任务调用runJob函数,进而介绍了提交任务的消息调度,和处理Job函数handleJobSubmitted函数。

由于在handleJobSubmitted函数中涉及到依赖性分析和stage的源码内容,于是我计划在下一篇文章里进行介绍和源码分析。

转载请注明作者Jason Ding及其出处

GitCafe博客主页(http://jasonding1354.gitcafe.io/)

Github博客主页(http://jasonding1354.github.io/)

CSDN博客(http://blog.csdn.net/jasonding1354)

简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-07-31 13:02:16

【Spark】DAGScheduler源码浅析的相关文章

【Spark】DAGScheduler源码浅析2

引入 上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的Spark源码分析 – DAGScheduler一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数. 事件处理 在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理.在Spark 1.0源码

【Spark】Stage生成和Stage源码浅析

引入 上一篇文章<DAGScheduler源码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码. Stage生成 Stage的调度是由DAGScheduler完成的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的St

【Spark Core】任务执行机制和Task源码浅析1

引言 上一小节<TaskScheduler源码与任务提交原理浅析2>介绍了Driver侧将Stage进行划分,根据Executor闲置情况分发任务,最终通过DriverActor向executorActor发送任务消息. 我们要了解Executor的执行机制首先要了解Executor在Driver侧的注册过程,这篇文章先了解一下Application和Executor的注册过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor执行的Task分为ShuffleMap

【Spark Core】任务执行机制和Task源码浅析2

引言 上一小节<任务执行机制和Task源码浅析1>介绍了Executor的注册过程. 这一小节,我将从Executor端,就接收LaunchTask消息之后Executor的执行任务过程进行介绍. 1. Executor的launchTasks函数 DriverActor提交任务,发送LaunchTask指令给CoarseGrainedExecutorBackend,接收到指令之后,让它内部的executor来发起任务,即调用空闲的executor的launchTask函数. 下面是Coars

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

Volley框架源码浅析(一)

尊重原创http://blog.csdn.net/yuanzeyao/article/details/25837897 从今天开始,我打算为大家呈现关于Volley框架的源码分析的文章,Volley框架是Google在2013年发布的,主要用于实现频繁而且粒度比较细小的Http请求,在此之前Android中进行Http请求通常是使用HttpUrlConnection和HttpClient进行,但是使用起来非常麻烦,而且效率比较地下,我想谷歌正式基于此种原因发布了Volley框架,其实出了Voll

PM2源码浅析

PM2工作原理 最近在玩一个游戏,<地平线:黎明时分>,最终Boss是一名叫黑底斯的人,所谓为人,也许不对,黑底斯是一段强大的毁灭进程,破坏了盖娅主进程,从而引发的整个大陆机械兽劣化故事. 为什么要讲这么一段呢,是希望大家可以更好地理解pm2的原理,要理解pm2就要理解god和santan的关系,god和santan的关系就相当于盖娅和黑底斯在pm2中的01世界中,每一行代码每一个字节都安静的工作god就是Daemon进程 守护进程,重启进程,守护node程序世界的安宁,santan就是进程的

Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置

Android源码浅析(一)--VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置 最近地方工作,就是接触源码的东西了,所以好东西还是要分享,系列开了这么多,完结 的也没几个,主要还是自己覆盖的太广了,却又不精通,嘿嘿,工作需要,所以写下了本篇博客 一.VMware 12 我选择的虚拟机试VMware,挺好用的感觉,下载VMware就不说了,善用搜索键嘛,这里我提供一个我现在在用的 下载地址:链接:http://pan.baidu.com/s/1k