Spark 源码解析:TaskScheduler的任务提交和task最佳位置算法

上篇文章《

Spark 源码解析 : DAGScheduler中的DAG划分与提交

》介绍了DAGScheduler的Stage划分算法。

本文继续分析Stage被封装成TaskSet,并将TaskSet提交到集群的Executor执行的过程

在DAGScheduler的submitStage方法中,将Stage划分完成,生成拓扑结构,当一个stage没有父stage时候,会调用DAGScheduler的submitMissingTasks方法来提交该stage包含tasks。

首先来分析一下DAGScheduler的submitMissingTasks方法


1.获取Task的最佳计算位置:

  1. val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  2. stage match {
  3. case s: ShuffleMapStage =>
  4. partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
  5. case s: ResultStage =>
  6. val job = s.activeJob.get
  7. partitionsToCompute.map { id =>
  8. val p = s.partitions(id)
  9. (id, getPreferredLocs(stage.rdd, p))
  10. }.toMap
  11. }
  12. }

核心是其中的getPreferredLocs方法,根据RDD的数据信息得到task的最佳计算位置,从而获取较好的数据本地性。其中的细节这里先跳过,在以后的文章在做分析

2.序列化Task的Binary,并进行广播。Executor端在执行task时会向反序列化Task。


3.根据stage的不同类型创建,为stage的每个分区创建创建task,并封装成TaskSet。Stage分两种类型ShuffleMapStage生成ShuffleMapTask,ResultStage生成ResultTask。

  1. val tasks: Seq[Task[_]] = try {
  2. stage match {
  3. case stage: ShuffleMapStage =>
  4. partitionsToCompute.map { id =>
  5. val locs = taskIdToLocations(id)
  6. val part = stage.rdd.partitions(id)
  7. new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
  8. taskBinary, part, locs, stage.internalAccumulators)
  9. }
  10. case stage: ResultStage =>
  11. val job = stage.activeJob.get
  12. partitionsToCompute.map { id =>
  13. val p: Int = stage.partitions(id)
  14. val part = stage.rdd.partitions(p)
  15. val locs = taskIdToLocations(id)
  16. new ResultTask(stage.id, stage.latestInfo.attemptId,
  17. taskBinary, part, locs, id, stage.internalAccumulators)
  18. }
  19. }

4.调用TaskScheduler的submitTasks,提交TaskSet

  1. logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  2. stage.pendingPartitions ++= tasks.map(_.partitionId)
  3. logDebug("New pending partitions: " + stage.pendingPartitions)
  4. taskScheduler.submitTasks(new TaskSet(
  5. tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  6. stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

submitTasks方法的实现在TaskScheduler的实现类TaskSchedulerImpl中。

4.1 TaskSchedulerImpl的submitTasks方法首先创建TaskSetManager。

  1. val manager = createTaskSetManager(taskSet, maxTaskFailures)
  2. val stage = taskSet.stageId
  3. val stageTaskSets =
  4. taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  5. stageTaskSets(taskSet.stageAttemptId) = manager

TaskSetManager负责管理TaskSchedulerImpl中一个单独TaskSet,跟踪每一个task,如果task失败,负责重试task直到达到task重试次数的最多次数。并且通过延迟调度来执行task的位置感知调度。

  1. private[spark] class TaskSetManager(
  2. sched: TaskSchedulerImpl,//绑定的TaskSchedulerImpl
  3. val taskSet: TaskSet,
  4. val maxTaskFailures: Int, //失败最大重试次数
  5. clock: Clock = new SystemClock())
  6. extends Schedulable with Logging

4.2 将TaskSetManger加入schedulableBuilder

  1. schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) //将TaskSetManager加入rootPool调度池中,由schedulableBuilder决定调度顺序

schedulableBuilder的类型是 SchedulerBuilder,SchedulerBuilder是一个trait,有两个实现FIFOSchedulerBuilder和 FairSchedulerBuilder,并且默认采用的是FIFO方式

  1. // default scheduler is FIFO
  2. private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")

而schedulableBuilder的创建是在SparkContext创建SchedulerBackend和TaskScheduler后调用TaskSchedulerImpl的初始化方法进行创建的。

  1. def initialize(backend: SchedulerBackend) {
  2. this.backend = backend
  3. // temporarily set rootPool name to empty
  4. rootPool = new Pool("", schedulingMode, 0, 0)
  5. schedulableBuilder = {
  6. schedulingMode match {
  7. case SchedulingMode.FIFO =>
  8. new FIFOSchedulableBuilder(rootPool)
  9. case SchedulingMode.FAIR =>
  10. new FairSchedulableBuilder(rootPool, conf)
  11. }
  12. }
  13. schedulableBuilder.buildPools()
  14. }

schedulableBuilder是TaskScheduler中一个重要成员,他根据调度策略决定了TaskSetManager的调度顺序。

4.3 接下来调用SchedulerBackend的riviveOffers方法对Task进行调度,决定task具体运行在哪个Executor中。

调用CoarseGrainedSchedulerBackend的riviveOffers方法,该方法给driverEndpoint发送ReviveOffer消息

  1. override def reviveOffers() {
  2. driverEndpoint.send(ReviveOffers)
  3. }

driverEndpoint收到ReviveOffer消息后调用makeOffers方法

  1. // Make fake resource offers on all executors
  2. private def makeOffers() {
  3. //过滤出活跃状态的Executor
  4. val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  5.      //将Executor封装成WorkerOffer对象
  6. val workOffers = activeExecutors.map { case (id, executorData) =>
  7. new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  8. }.toSeq
  9. launchTasks(scheduler.resourceOffers(workOffers))
  10. }

注意:上面代码中的executorDataMap,在客户的向Master注册Application的时候,Master已经为Application分配并启动好Executor,然后注册给CoarseGrainedSchedulerBackend,注册信息就是存储在executorDataMap数据结构中。

准备好计算资源后,接下来TaskSchedulerImpl基于这些计算资源为task分配Executor。

我们看一下TaskSchedulerImpl的resourceOffers方法:

  1. // 随机打乱offers
  2. val shuffledOffers = Random.shuffle(offers)
  3. // 构建一个二维数组,保存每个Executor上将要分配的那些task
  4. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  5. val availableCpus = shuffledOffers.map(o => o.cores).toArray
  1.  
  2.    //根据SchedulerBuilder的调度算法,给TaskManager排好序
    1. val sortedTaskSets = rootPool.getSortedTaskSetQueue
  3. for (taskSet <- sortedTaskSets) {
  4. logDebug("parentName: %s, name: %s, runningTasks: %s".format(
  5. taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  6. if (newExecAvail) {
  7. taskSet.executorAdded()
  8. }
  9. }
  10. // 使用双重循环,对每一个taskset 依照调度的顺序,依次按照本地性级别顺序尝试启动task
  11. // 数据本地性级别顺序: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  12. var launchedTask = false
  13. for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  14. do {
  15. launchedTask = resourceOfferSingleTaskSet(
  16. taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  17. } while (launchedTask)
  18. }
  19. if (tasks.size > 0) {
  20. hasLaunchedTask = true
  21. }
  22. return tasks

下面看看 resourceOfferSingleTaskSet 方法:

用当前的数据本地性,调用TaskSetManager的resourceOffer方法,在当前executor上分配task
  1. private def resourceOfferSingleTaskSet(
  2. taskSet: TaskSetManager,
  3. maxLocality: TaskLocality,
  4. shuffledOffers: Seq[WorkerOffer],
  5. availableCpus: Array[Int],
  6. tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  7. var launchedTask = false
  8. for (i <- 0 until shuffledOffers.size) {
  9. val execId = shuffledOffers(i).executorId
  10. val host = shuffledOffers(i).host
  11.        //如果executor 的cup数大于 每个task的cup数目(值为1)
  12. if (availableCpus(i) >= CPUS_PER_TASK) {
  13. try {
  14.        //
  15. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
  16. tasks(i) += task
  17. val tid = task.taskId
  18. taskIdToTaskSetManager(tid) = taskSet
  19. taskIdToExecutorId(tid) = execId
  20. executorIdToTaskCount(execId) += 1
  21. executorsByHost(host) += execId
  22. availableCpus(i) -= CPUS_PER_TASK
  23. assert(availableCpus(i) >= 0)
  24. launchedTask = true
  25. }
  26. }

为Task分配好资源之后,DriverEndpint调用launchTask方法将task在Executor上启动运行。task在Executor上的启动运行过程,在后面的文章中会继续分析,敬请关注。

总结一下调用过程:

TaskSchedulerImpl#submitTasks

CoarseGrainedSchedulerBackend#riviveOffers

CoarseGrainedSchedulerBackend$DriverEndpoint#makeOffers

|-TaskSchedulerImpl#resourceOffers(offers) 为offers分配task

|- TaskSchedulerImpl#resourceOfferSingleTaskSet

CoarseGrainedSchedulerBackend$DriverEndpoint#launchTask

来自为知笔记(Wiz)

时间: 2024-10-05 10:16:35

Spark 源码解析:TaskScheduler的任务提交和task最佳位置算法的相关文章

[Spark传奇行动] 第34课:Stage划分和Task最佳位置算法源码彻底解密

本課主題 Job Stage 划分算法解密 Task 最佳位置算法實現解密 引言 作业调度的划分算法以及 Task 的最佳位置的算法,因为 Stage 的划分是DAGScheduler 工作的核心,这也是关系到整个作业有集群中该怎么运行:其次就是数据本地性,Spark 一舨的代码都是链式表达的,这就让一个任务什么时候划分成 Stage,在大数据世界要追求最大化的数据本地性,所有最大化的数据本地性就是在数据计算的时候,数据就在内存中.最后就是 Spark 的实现算法时候的略的怎么样.希望这篇文章能

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

第三十四课 Spark中任务处理的Stage划分和Task最佳位置算法

本节课的内容 1.     Job Stage的划分算法 2.     Task最佳计算位置算法 一.Stage划分算法 由于Spark的算子构建一般都是链式的,这就涉及了要如何进行这些链式计算,Spark的策略是对这些算子,鲜花分Stage,然后在进行计算. 由于数据是分布式的存储在各个节点上的,所以为了减少网络传输的开销,就必须最大化的追求数据本地性,所谓的数据本地性是指,在计算时,数据本身已经在内存中或者利用已有缓存无需计算的方式获取数据. 1.      Stage划分算法思想 (1)一

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

第2课 Scala面向对象彻底精通及Spark源码SparkContext,RDD阅读总结

第2课:Scala面向对象彻底精通及Spark源码阅读本期内容:1 Scala中的类.object实战详解 2 Scala中的抽象类.接口实战详解 3 综合案例及Spark源码解析 一:定义类class HiScala{private var name = "Spark" def sayName(){println(name)}def getName = name} Scala中,变量与类中的方法是同等级的,可以直接赋值给方法. scala中的get与set与Java中的get,set

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Spark on K8S源码解析.md

Spark on K8S源码解析 sparkk8s time: 2019-12-19 Spark on k8s源码解析 1. Spark Submit spark-submit.sh spark-class.sh SparkSubmit 第一步,初始化spark应用配置 第二步,执行spark应用 Spark on k8s源码解析 本文基于spark-3.0.0 preview源码,来分析spark作业基于K8S的提交过程. spark on k8s的代码位置位于: 关于kubernetes目录