【Spark Core】TaskScheduler源码与任务提交原理浅析2

引言

上一节《TaskScheduler源码与任务提交原理浅析1》介绍了TaskScheduler的创建过程,在这一节中,我将承接《Stage生成和Stage源码浅析》中的submitMissingTasks函数继续介绍task的创建和分发工作。

DAGScheduler中的submitMissingTasks函数

如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。

submitMissingTasks负责创建新的Task。

Spark将由Executor执行的Task分为ShuffleMapTask和ResultTask两种。

每个Stage生成Task的时候根据Stage中的isShuffleMap标记确定是否为ShuffleMapStage,如果标记为真,则这个Stage输出的结果会经过Shuffle阶段作为下一个Stage的输入,创建ShuffleMapTask;否则是ResultStage,这样会创建ResultTask,Stage的结果会输出到Spark空间;最后,Task是通过taskScheduler.submitTasks来提交的。

计算流程

submitMissingTasks的计算流程如下:

  1. 首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。
  2. 序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。
  3. 为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task。
  4. 确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的。
  5. 通过TaskScheduler提交TaskSet。

部分代码

下面是submitMissingTasks判断是否为ShuffleMapStage的部分代码,其中部分参数说明在注释中:

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        //stage.id:Stage的序号
        //taskBinary:这个在下面具体介绍
        //part:RDD对应的partition
        //locs:最适合的执行位置
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        //p:partition索引,表示从哪个partition读取数据
        //id:输出的分区索引,表示reduceID
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

关于taskBinary参数:这是RDD和ShuffleDependency的广播变量(broadcase version),作为序列化之后的结果。

这里将RDD和其依赖关系进行序列化,在executor运行task之前再进行反序列化。这种方式对不同的task之间提供了较好的隔离。

下面是submitMissingTasks进行任务提交的部分代码:

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }

TaskSchedulerImpl中的submitTasks

submitTasks的流程如下:

  1. 任务(tasks)会被包装成TaskSetManager(由于TaskSetManager不是线程安全的,所以源码中需要进行同步)
  2. TaskSetManager实例通过schedulableBuilder(分为FIFOSchedulableBuilder和FairSchedulableBuilder两种)投入调度池中等待调度
  3. 任务提交同时启动定时器,如果任务还未被执行,定时器会持续发出警告直到任务被执行
  4. 调用backend的reviveOffers函数,向backend的driverActor实例发送ReviveOffers消息,driveerActor收到ReviveOffers消息后,调用makeOffers处理函数
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

TaskSetManager调度

每个Stage一经确认,生成相应的TaskSet(即为一组tasks),其对应一个TaskSetManager通过Stage回溯到最源头缺失的Stage提交到调度池pool中,在调度池中,这些TaskSetMananger又会根据Job ID排序,先提交的Job的TaskSetManager优先调度,然后一个Job内的TaskSetManager ID小的先调度,并且如果有未执行完的父母Stage的TaskSetManager,则不会提交到调度池中。

reviveOffers函数代码

下面是CoarseGrainedSchedulerBackend的reviveOffers函数:

  override def reviveOffers() {
    driverActor ! ReviveOffers
  }

driveerActor收到ReviveOffers消息后,调用makeOffers处理函数。

DriverActor的makeOffers函数

makeOffers函数的处理逻辑是:

  1. 找到空闲的Executor,分发的策略是随机分发的,即尽可能将任务平摊到各个Executor
  2. 如果有空闲的Executor,就将任务列表中的部分任务利用launchTasks发送给指定的Executor

SchedulerBackend(这里实际是CoarseGrainedSchedulerBackend)负责将新创建的Task分发给Executor,从launchTasks代码中可以看出,在发送LauchTasks指令之前需要将TaskDescription序列化。

    // Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

TaskSchedulerImpl中的resourceOffers函数

任务是随机分发给各个Executor的,资源分配的工作由resourceOffers函数处理。

正如上面submitTasks函数提到的,在TaskSchedulerImpl中,这一组Task被交给一个新的TaskSetManager实例进行管理,所有的TaskSetManager经由SchedulableBuilder根据特定的调度策略进行排序,在TaskSchedulerImpl的resourceOffers函数中,当前被选择的TaskSetManager的ResourceOffer函数被调用并返回包含了序列化任务数据的TaskDescription,最后这些TaskDescription再由SchedulerBackend派发到ExecutorBackend去执行

resourceOffers主要做了3件事:

  1. 从Workers里面随机抽出一些来执行任务。
  2. 通过TaskSetManager找出和Worker在一起的Task,最后编译打包成TaskDescription返回。
  3. 将Worker–>Array[TaskDescription]的映射关系返回。
  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    // 遍历worker提供的资源,更新executor相关的映射
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }
    // 从worker当中随机选出一些来,防止任务都堆在一个机器上
    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // getSortedTask函数对taskset进行排序
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer,把本地性最高的Task分给Worker
    // 本地性是根据当前的等待时间来确定的任务本地性的级别。
    // 它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。

    //1. 首先依次遍历 sortedTaskSets, 并对于每个 Taskset, 遍历 TaskLocality
    //2. 越 local 越优先, 找不到(launchedTask 为 false)才会到下个 locality 级别
    //3. (封装在resourceOfferSingleTaskSet函数)在多次遍历offer list,
    //因为一次taskSet.resourceOffer只会占用一个core,
    //而不是一次用光所有的 core, 这样有助于一个 taskset 中的 task 比较均匀的分布在workers上
    //4. 只有在该taskset, 该locality下, 对所有worker offer都找不到合适的task时,
    //才跳到下个 locality 级别
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

TaskDescription代码:

private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task‘s TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}

DriverActor的launchTasks函数

launchTasks函数流程:

  1. launchTasks函数将resourceOffers函数返回的TaskDescription信息进行序列化
  2. 向executorActor发送封装了serializedTask的LaunchTask消息

由于受到Akka Frame Size尺寸的限制,如果发送数据过大,会被截断。

    // Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

参考资料

Spark大数据处理,高彦杰著,机械工业出版社

Spark技术内幕: Task向Executor提交的源码解析

Spark源码系列(三)作业运行过程

转载请注明作者Jason Ding及其出处

GitCafe博客主页(http://jasonding1354.gitcafe.io/)

Github博客主页(http://jasonding1354.github.io/)

CSDN博客(http://blog.csdn.net/jasonding1354)

简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-12 11:11:07

【Spark Core】TaskScheduler源码与任务提交原理浅析2的相关文章

【Spark Core】TaskScheduler源码与任务提交原理浅析1

引言 上一节<Stage生成和Stage源码浅析>中,我介绍了Stage生成划分到提交Stage的过程,分析最终归结到submitStage的递归提交Stage,其中要通过submitMissingTasks函数创建task集合来实现任务的创建和分发. 在接下来的几篇文章中,我将具体介绍一下任务创建和分发的过程,为了让逻辑更加清楚,我将分成几篇文章进行介绍,好保证简明清晰,逻辑连贯,前后统一. TaskScheduler介绍 TaskScheduler的主要任务是提交taskset到集群运算并

【Spark Core】TaskScheduler源代码与任务提交原理浅析2

引言 上一节<TaskScheduler源代码与任务提交原理浅析1>介绍了TaskScheduler的创建过程,在这一节中,我将承接<Stage生成和Stage源代码浅析>中的submitMissingTasks函数继续介绍task的创建和分发工作. DAGScheduler中的submitMissingTasks函数 假设一个Stage的全部的parent stage都已经计算完毕或者存在于cache中.那么他会调用submitMissingTasks来提交该Stage所包括的T

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

Spark on K8S源码解析.md

Spark on K8S源码解析 sparkk8s time: 2019-12-19 Spark on k8s源码解析 1. Spark Submit spark-submit.sh spark-class.sh SparkSubmit 第一步,初始化spark应用配置 第二步,执行spark应用 Spark on k8s源码解析 本文基于spark-3.0.0 preview源码,来分析spark作业基于K8S的提交过程. spark on k8s的代码位置位于: 关于kubernetes目录

【Spark】DAGScheduler源码浅析2

引入 上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的Spark源码分析 – DAGScheduler一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数. 事件处理 在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理.在Spark 1.0源码

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark SQL Catalyst源码分析之Optimizer

前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以

Spark SQL Catalyst源码分析之Physical Plan

前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized L