Spark系列(十)TaskSchedule工作原理

工作原理图

源码分析:

1、submitTasks

在submitTasks方法中最后调用backend.reviveOffers()进行下一步的task调度分配

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // 为taskSet创建TaskSetManager
      // TaskSetManager用于对TaskSet的执行状况进行管理和监控
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      // 将manager加入activeTaskSets缓存中
      activeTaskSets(taskSet.id) = manager
10        // 将manager加入schedulableBuilder中
11        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
12        
13        if (!isLocal && !hasReceivedTask) {
14          starvationTimer.scheduleAtFixedRate(new TimerTask() {
15            override def run() {
16              if (!hasLaunchedTask) {
17                logWarning("Initial job has not accepted any resources; " +
18                  "check your cluster UI to ensure that workers are registered " +
19                  "and have sufficient resources")
20              } else {
21                this.cancel()
22              }
23            }
24          }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
25        }
26        hasReceivedTask = true
27      }
28      backend.reviveOffers()
29    }

 

2、makeOffers

调用过程:收到reviveOffers消息后调用makeOffers方法。

所属包:org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

def makeOffers() {
    // resourceOffers方法用于实现任务分配算法,将各个task分配到executor上
    // launchTasks方法用于将所分配的task发送到对应的executor中执行
    // WorkerOffer封装了Application可用的资源
    launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toSeq))
}

 

3、resourceOffers

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    // 遍历Application可用的资源FreeCores获取节点主机信息
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
10          executorsByHost(o.host) = new HashSet[String]()
11          executorAdded(o.executorId, o.host)
12          newExecAvail = true
13        }
14        for (rack <- getRackForHost(o.host)) {
15          hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
16        }
17      }
18   
19      // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
20      val shuffledOffers = Random.shuffle(offers)
21      // Build a list of tasks to assign to each worker.
22      val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
23      // executor可用的cores序列(每个executor最多可用多少个cores)
24      val availableCpus = shuffledOffers.map(o => o.cores).toArray
25      // rootPool中取出排好序的TaskSet,TaskScheduler初始化时,创建完TaskSchedulerImpl、
26      // SparkDeploySchedulerBackend之后,会执行initialize()方法,在该方法中会创建一个调度池,
27      // 所有提交的TaskSet先会放入该调度池,后面执行task分配分配算法时就从该调度池中取出排好序的TaskSet
28      val sortedTaskSets = rootPool.getSortedTaskSetQueue
29      for (taskSet <- sortedTaskSets) {
30        logDebug("parentName: %s, name: %s, runningTasks: %s".format(
31          taskSet.parent.name, taskSet.name, taskSet.runningTasks))
32        if (newExecAvail) {
33          taskSet.executorAdded()
34        }
35      }
36   
37      // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
38      // of locality levels so that it gets a chance to launch local tasks on all of them.
39      // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
40      
41      // 本地化级别
42      // PROCESS_LOCAL:进程本地化,rdd的partition和task在同一个executor中(最快)
43      // NODE_LOCAL:rdd的partition和task,不在一个同一个executor中,但在同一个节点中
44      // NO_PREF:没有本地化级别
45      // RACK_LOCAL:机架本地化,至少rdd的partition和task在一个机架中
46      // ANY:任意本地化级别
47      
48      // 按照从最小本地化级别到最大本地化级别的顺序,尝试把taskSet中的task在executor上启动,
49      // 直到task在某种本地化级别下task全部启动
50      var launchedTask = false
51      for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
52        do {
53          launchedTask = resourceOfferSingleTaskSet(
54              taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
55        } while (launchedTask)
56      }
57   
58      if (tasks.size > 0) {
59        hasLaunchedTask = true
60      }
61      return tasks
62    }

 

4、resourceOfferSingleTaskSet

private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
10        val host = shuffledOffers(i).host
11        // 当前executor的可用cpu数量至少大于每个task要使用的cpu数量(默认是1)
12        if (availableCpus(i) >= CPUS_PER_TASK) {
13          try {
14            // 查找在executor上用那种本地化级别启动taskSet中的task
15            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
16              // 给指定的executor加上要启动task
17              tasks(i) += task
18              // 更新分配信息
19              val tid = task.taskId
20              taskIdToTaskSetId(tid) = taskSet.taskSet.id
21              taskIdToExecutorId(tid) = execId
22              executorsByHost(host) += execId
23              availableCpus(i) -= CPUS_PER_TASK
24              assert(availableCpus(i) >= 0)
25              launchedTask = true
26            }
27          } catch {
28            case e: TaskNotSerializableException =>
29              logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
30              // Do not offer resources for this task, but don‘t throw an error to allow other
31              // task sets to be submitted.
32              return launchedTask
33          }
34        }
35      }
36      return launchedTask
37    }

 

5、launchTasks

def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        // 将每个executor要执行的task信息进行序列化
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
10                var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
11                  "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
12                  "spark.akka.frameSize or using broadcast variables for large values."
13                msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
14                  AkkaUtils.reservedSizeBytes)
15                taskSet.abort(msg)
16              } catch {
17                case e: Exception => logError("Exception in error callback", e)
18              }
19            }
20          }
21          else {
22            val executorData = executorDataMap(task.executorId)
23            // 在对应的executor的资源中减去要使用的cpu资源
24            executorData.freeCores -= scheduler.CPUS_PER_TASK
25            // 向executor发送launchTask消息来启动task
26            executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
27          }
28        }
29      }

 

说明:

1、resourceOffer方法功能:判断executor本地化级别的等待时间是否在一定范围内,如果在就认为task使用本地化级别可以在executor上启动。

2、TaskSetManager功能:对一个单独的TaskSet的任务进行调度,该类负责追踪每个task,如果task失败会负责重试,知道超过重试次数的限制,且会通过延迟调度为该TaskSet处理本地化调度机制,它主要接口是resourceOffer,在这个接口中,TaskSet会希望在一个节点上运行一个任务,并接受任务的状态改变消息,来知道它负责的task的状态改变了。

3、本地化级别种类:
PROCESS_LOCAL:进程本地化,rdd的partition和task在同一个executor中(最快)
NODE_LOCAL:rdd的partition和task,不在一个同一个executor中,但在同一个节点中
NO_PREF:没有本地化级别
RACK_LOCAL:机架本地化,至少rdd的partition和task在一个机架中
ANY:任意本地化级别

时间: 2024-07-31 14:55:54

Spark系列(十)TaskSchedule工作原理的相关文章

Spark系列(八)Worker工作原理

工作原理图   源代码分析 包名:org.apache.spark.deploy.worker 启动driver入口点:registerWithMaster方法中的case LaunchDriver 1  case LaunchDriver(driverId, driverDesc) => { 2      logInfo(s"Asked to launch driver $driverId") 3      // 创建DriverRunner对象启动Driver 4     

Spark系列(九)DAGScheduler工作原理

以wordcount为示例进行深入分析 1  object wordcount { 2    3    def main(args: Array[String]) { 4      val conf = new SparkConf() 5      conf.setAppName("wordcount").setMaster("local") 6    7      val sc = new SparkContext(conf) 8      // 产生Hadoop

Nginx知多少系列之(四)工作原理

原文:Nginx知多少系列之(四)工作原理 目录 1.前言 2.安装 3.配置文件详解 4.工作原理 5.Linux下托管.NET Core项目 6.Linux下.NET Core项目负载均衡 7.Linux下.NET Core项目Nginx+Keepalived高可用(主从模式) 8.Linux下.NET Core项目Nginx+Keepalived高可用(双主模式) 9.Linux下.NET Core项目LVS+Keepalived+Nginx高可用集群 10.构建静态服务器 11.日志分析

line-height系列——定义和工作原理总结

一.line-height的定义和工作原理总结 line-height的属性值: normal    默认  设置合理的行间距. number  设置数字,此数字会与当前的字体尺寸相乘来设置行间距line-height:1.5; length    设置固定的行间距.  例如:line-height:18px; %          基于当前字体尺寸的百分比行间距. 例如:line-height:150%; line-height 的定义: 首先认识下文字的四条线 从上到下四条线分别是顶线.中线

Spark基本工作原理与RDD

Spark基本工作原理 1.分布式 2.主要基于内存(少数情况基于磁盘) 3.迭代式计算 RDD以及其特点 1.RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集. 2.RDD在抽象上来说是一种元素集合,包含了数据.它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作.(分布式数据集) 3.RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建:有时也可以通过应

第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手机.

【SEO系列教程】第一部分:SEO概念-搜索引擎工作原理

从今天开始,二毛为大家更新SEO系列教程,今天更新第一部分:SEO概念,主要讲述如何学习SEO这门课程以及搜索引擎工作原理. 本次课程不会一五一十的讲解SEO的全部细微概念,在讲到一些我们已经熟知的SEO概念,我会一句带过,如果大家有不理解的地方,请在下面跟帖,我会第一时间恢复大家,还望谅解. 另外说明一下,SEO系列教程在每周三更新,其余时间大家可以在下面自由交流,希望这套SEO教程能够对大家的SEO学习有所帮助,请大家随时关注二毛博客最新动态,谢谢. 好了,下面我们进入正题: 今天的知识点:

JAVA之旅(二十五)——文件复制,字符流的缓冲区,BufferedWriter,BufferedReader,通过缓冲区复制文件,readLine工作原理,自定义readLine

JAVA之旅(二十五)--文件复制,字符流的缓冲区,BufferedWriter,BufferedReader,通过缓冲区复制文件,readLine工作原理,自定义readLine 我们继续IO上个篇幅讲 一.文本复制 读写都说了,我们来看下其他的操作,我们首先来看复制 复制原理:其实就是将C盘下的文件数据存储到D盘的一个文件中 实现的步骤: 1.在D盘创建一个文件,用于存储文件中的数据 2.定义读取流和文件关联 3.通过不断的读写完成数据的存储 关闭资源 package com.lgl.hel

第82讲:Spark Streaming第一讲:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 3.案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手