TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解

  1. TaskSchedulerBackend与SchedulerBackend
  2. FIFO与FAIR两种调度模式
  3. Task数据本地性资源的分配

一、TaskScheduler运行过程(Spark-shell角度)

1.启动Spark-shell

当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend。这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master,并从集群中获得ExecutorBackend的计算资源;(这就是为什么启动时日志没有DriverEndpoint信息的原因,因为此时应用程序内部还未发生具体计算资源的调度)

2.TaskScheduler运行时机

DAGScheduler划分好Stage后,会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有的任务TaskSet,TaskSetManager会根据locality aware来为Task奉陪计算资源,监控Task的执行状态。(例如重试、慢任务以及进行推测式执行等)

二、TaskScheduler与SchedulerBackend

1.底层调度的总流程

(1)TaskScheduler提交Tasks

TaskScheduler.submitTasks方法主要作用是将TaskSet加入到TaskSetManager中进行管理。

//TaskScheduler里面只是定义了submitTasks方法,具体实现是在TaskSchedulerImpl
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
    //创建TaskSetManager,并设置最大失败重试次数
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
    //记录Stage中提交的TaskSetManager
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
    //如果重复提交同一个TaskSet或者Tasks不在当前的TaskSet中则会报错
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
    //添加TaskManager到调度队列中,schedulableBuilder是应用程序级别的调度器
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)<span style="white-space:pre">   </span>//1
    //为慢任务启动备份任务
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }<pre name="code" class="plain"> // default scheduler is FIFO
?     private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")  (TaskSchedulerImpl)
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true }//调用SparkDeploySchedulerBackend分配具体计算资源 backend.reviveOffers() //2 }

(2)添加TaskSetManager

SchedulerBuilder.addTaskSetManger(根据SchedulerMode的不同,FIFO与FAIR实现不同)方法会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在那个ExecutorBackend中。

默认的调度顺序为FIFO;Spark应用程序目前支持两种调度模式FIFO和FAIR可以通过Spark-env.sh中的Spark.Scheduler.mode来进行具体的设置

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }

并且默认情况下是FIFO的方式:

// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")

schedulableBuilder是一个接口,里面定义了addTaskSetManager方法。

private[spark] trait SchedulableBuilder {
  def rootPool: Pool

  def buildPools()

  def addTaskSetManager(manager: Schedulable, properties: Properties)
}

schedulableBuilder确定了TaskSetManager调度顺序。

知道了schedulableBuilder是咋回事之后,那么真正的调用就开始啦!

然后按照TaskSetManager的locality aware来确定每个Task具体运行在哪个ExecutorBackend中;

CoarseGrainedSchedulerBackend.reviveOffers:给DriverEndpoint发送ReviveOffers。backend.reviveOffers()

而scheduleBackend只是定义了reviveOffers方法。def reviveOffers(): Unit

reviveOffers方法的具体实现是在:在CoarseGrainedSchedulerBackend实现,给DriverEndpoint发送ReviveOffers消息。

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

ReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候会发送ReviveOffers这个消息作为触发器。

// Internal messages in driver
case object ReviveOffers extends CoarseGrainedClusterMessage

此时DriverEndpoint收到ReviveOffers后,路由到makeOffers中。

case ReviveOffers =>
  makeOffers()

首先会准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息),因为之前的资源已经分配好了,现在只需要关系有哪些cores可以用于Task计算。

// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
//产生集合,里面包含executor的ID,freeCores
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

将可用的计算资源准备好后,下面就可以为每个Task分配计算资源了

TaskSchedulerImpl.resourceOffers为每一个Task具体分配计算资源。输入是workOffers代表可用的资源,实质上是ExecutorBackend的列表。

launchTasks(scheduler.resourceOffers(workOffers))

输出值是:TaskDescription的二维数组

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

TaskDescription源码:

被TaskSetManager.resourceOffer创建的。而TaskDescription是用来描述哪些要发送到executorbackend上计算的Task。也就是说TaskDescription此时描述的这个Task,是已经确定好了在哪个ExecutorBackend上运行。而确定Task具体运行在哪个ExecutorBackend上的算法是由TaskSetManager的resourceOffers方法来定的。

/**
 * Description of a task that gets passed onto executors to be executed, usually created by
 * [[TaskSetManager.resourceOffer]].
 */
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task‘s TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

resourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?算法的实现具体如下:

具体到resourceOffers查看源码如下:

1. 通过Random.shuffle打散的是executorBackend的计算资源,防止Task集中分布到某些机器上,为了负载均衡。

// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)

2.根据每个ExecutorBackend的cores的个数声明类型为TaskDecription的ArrayBuffer数组。

// Build a list of tasks to assign to each worker.
//为每个worker创建了一个ArrayBuffer实例,
//每个executor上能放多少个TaskDescription就可以运行多少个Task。
//tasks的数组长度是由cores的多少决定的,cores也决定了worker上可以运行多少//个任务。
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// getSortedTaskSetQueue对TaskSetManager按照调度策略进行排序,将排序好的结//果赋值给sortedTaskSets
val sortedTaskSets = rootPool.getSortedTaskSetQueue

3.如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdd来获取最新的完整的可用计算的计算资源,因为在执行中集群中的资源可能会动态的改变的。

for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) { //如果有可用的新的executor
    taskSet.executorAdded()
  }

4.下面的增强for循环执行是这样的,每取出一个taskSet,maxLocality就会依次从PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY遍历。从优先级高到低来遍历。追求最高级别的优先级本地性。maxLocality会传入resourceOfferSingleTaskSet.

 // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

下面具体看一下resourceOfferSingleTaskSet源码

5. 此时的maxLocality就传入到了resourceOffer,通过调用TastSetManager的resourceOffer来确定Task应该运行在哪个ExecutorBackend的具体的Locality Level;

for (i <- 0 until shuffledOffers.size) {//循环遍历当前存在的executor
  val execId = shuffledOffers(i).executorId //获取executor的ID
  val host = shuffledOffers(i).host //executor的host名字
  if (availableCpus(i) >= CPUS_PER_TASK) {  //每台机器可用的计算资源
    try {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager(tid) = taskSet
        taskIdToExecutorId(tid) = execId
        executorIdToTaskCount(execId) += 1
        executorsByHost(host) += execId
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }

6.确定好Task具体在哪个ExecutorBackend执行之后,通过luanchTasks把任务发送给ExecutorBackend去执行。

launchTasks(scheduler.resourceOffers(workOffers))

补讲:

1.Task默认的最大重试次数是4次:

def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))

2.Spark应用程序目前支持两种调度器:FIFO、FAIR,可以通过spark-env.sh中spark.scheduler.mode进行具体的设置,默认情况下是FIFO的方式:

private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)

3.TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中;

4.数据本地优先级从高到底以此为:优先级高低排: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,其中NO_PREF是指机器本地性

5.每个Task默认分配的core数为1

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

6.TaskSet类详解TaskSet包含了一系列高层调度器交给底层调度器的任务的集合。

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],//任意类型的Task
    val stageId: Int,   //Task属于哪个Stage
    val stageAttemptId: Int, //尝试的Id
    val priority: Int,  //优先级
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}

调度的时候,底层是有一个pool调度池,这个调度池会规定Stage提交之后具体执行的优先级。

TaskSetManager(TaskSet的管理者)

实例化的时候要完成TaskSchedulerImpl工作的。

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet, //接收提交的任务的集合
    val maxTaskFailures: Int,//最大失败提交次数
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {

  val conf = sched.sc.conf

7.DAGScheduler是从数据层面考虑preferedLocation的,确定数据在哪,而TaskScheduler是从具体计算Task角度考虑计算的本地性,在哪计算,优先考虑在内存中。

8.Task进行广播时候的AKKAFrameSize大小为128MB,如果任务大于128MB-200K的时候,则Task会直接被丢弃掉。

/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
  val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)

如果小于128 MB-200K的话会通过CoarseGrainedSchedulerBackend去luanch到具体的ExecutorBackend上。executorEndpoint就会把当前的Task发送到要运行的executorBackend上。通过LaunchTask实现。

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
时间: 2024-10-05 18:25:50

TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解的相关文章

第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详

</pre></h2><div><p>本节课内容:</p><p>1.     TaskSchedulerBackend与SchedulerBackend</p><p>2.     FIFO与FAIR两种调度模式</p><p>3.     Task数据本地性资源的分配</p></div><h3>一.Scheduler运行过程(Spark-shell角度)

第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

Spark 是分布式计算框架,多台机器之间必然存在着通信.Spark在早期版本采用Akka实现.现在在Akka的上层抽象出了一个RpcEnv.RpcEnv负责管理机器之间的通信. RpcEnv包含了如下三大核心: RpcEndpoint 消息循环体,负责接收并处理消息.Spark中的Master.Worker都是RpcEndpoint . RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必须获取它的RpcEndpointRef,通过RpcEn

深入理解spark-两种调度模式FIFO,FAIR模式

前面我们应知道了一个任务提交会由DAG拆分为job,stage,task,最后提交给TaskScheduler,在提交taskscheduler中会根据master初始化taskscheduler和schedulerbackend两个类,并且初始化一个调度池: 1.调度池比较 根据mode初始化调度池pool def initialize(backend: SchedulerBackend) { this.backend = backend // temporarily set rootPool

Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等(DT大数据梦工厂)

内容: 1.Spark 1.6 RPC解析: 2.RPCEnv源码解析: 3.RPCEndpoint等源码解析: 以前和现在的RPC都是采用Akka,以前和现在的不同就在于RPCEnv,现在就是基于RPCEnv去做RPC通信的 ==========Spark 1.6 RPC解析============ 1.Spark 1.6推出了以RPCEnv.RPCEndpoint.RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka: 2.Akka是基于Sc

【Spark Core】TaskScheduler源码与任务提交原理浅析2

引言 上一节<TaskScheduler源码与任务提交原理浅析1>介绍了TaskScheduler的创建过程,在这一节中,我将承接<Stage生成和Stage源码浅析>中的submitMissingTasks函数继续介绍task的创建和分发工作. DAGScheduler中的submitMissingTasks函数 如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tas

【Spark Core】TaskScheduler源代码与任务提交原理浅析2

引言 上一节<TaskScheduler源代码与任务提交原理浅析1>介绍了TaskScheduler的创建过程,在这一节中,我将承接<Stage生成和Stage源代码浅析>中的submitMissingTasks函数继续介绍task的创建和分发工作. DAGScheduler中的submitMissingTasks函数 假设一个Stage的全部的parent stage都已经计算完毕或者存在于cache中.那么他会调用submitMissingTasks来提交该Stage所包括的T

机器学习Spark Mllib算法源码及实战详解进阶与提高视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

Spark 性能相关参数配置详解-任务调度篇

随着Spark的逐渐成熟完善, 越来越多的可配置参数被添加到Spark中来, 本文试图通过阐述这其中部分参数的工作原理和配置思路, 和大家一起探讨一下如何根据实际场合对Spark进行配置优化. 由于篇幅较长,所以在这里分篇组织,如果要看最新完整的网页版内容,可以戳这里:http://spark-config.readthedocs.org/,主要是便于更新内容 schedule调度相关 调度相关的参数设置,大多数内容都很直白,其实无须过多的额外解释,不过基于这些参数的常用性(大概会是你针对自己的

第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本課主題 Master 资源调度的源码鉴赏 Spark 的 Worker 是基于什么逻辑去启动 Executor 资源调度管理 任務調度與資源是通過 DAGScheduler.TaskScheduler.SchedulerBackend 等進行的作業調度 資源調度是指應用程序如何獲得資源 任務調度是在資源調度的基礎上進行的,沒有資源調度那麼任務調度就成為了無源之水無本之木 Master 资源调度的源码鉴赏 因為 Master 負責資源管理和調度,所以資源調度方法 scheduer 位於 Mast