Working principle diagram
Source code analysis:
1. submitTasks
At the end of the submitTasks method, backend.reviveOffers() is called to move on to the next step: assigning tasks to the available resources.
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager for this TaskSet; it manages and monitors
    // the execution of the TaskSet.
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // Cache the manager in activeTaskSets.
    activeTaskSets(taskSet.id) = manager
    // Register the manager with the schedulableBuilder (the scheduling pool).
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
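The starvation warning above is plain java.util.Timer usage. Below is a minimal, self-contained sketch of the same pattern; the 1-second timeout and the Thread.sleep calls are arbitrary stand-ins for STARVATION_TIMEOUT and real cluster events, not Spark code:

import java.util.{Timer, TimerTask}

object StarvationTimerSketch {
  @volatile private var hasLaunchedTask = false

  def main(args: Array[String]): Unit = {
    val timer = new Timer("starvation-timer", true)
    val timeoutMs = 1000L                       // stand-in for STARVATION_TIMEOUT
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          println("Initial job has not accepted any resources; still waiting ...")
        } else {
          this.cancel()                         // once a task has launched, stop warning
        }
      }
    }, timeoutMs, timeoutMs)

    Thread.sleep(2500)                          // simulate waiting for resources
    hasLaunchedTask = true                      // simulate the first task being launched
    Thread.sleep(1500)
    timer.cancel()
  }
}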
2. makeOffers
Call path: after the ReviveOffers message is received, the makeOffers method is invoked.
Defined in: org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
def makeOffers() {
  // WorkerOffer wraps the resources currently available to the Application
  // (one offer per executor, with its free cores).
  // resourceOffers implements the task-assignment algorithm, mapping tasks to executors;
  // launchTasks then sends the assigned tasks to their executors for execution.
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
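To make that mapping concrete, here is a small stand-alone sketch of the same transformation. The ExecutorData and WorkerOffer case classes are simplified stand-ins for the real Spark classes, and the map contents are made up:

object MakeOffersSketch {
  case class ExecutorData(executorHost: String, freeCores: Int)
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  def main(args: Array[String]): Unit = {
    val executorDataMap = Map(
      "exec-1" -> ExecutorData("host-a", 4),
      "exec-2" -> ExecutorData("host-b", 2))

    // Same shape as makeOffers: one offer per registered executor, carrying
    // only the cores that are currently free on it.
    val offers: Seq[WorkerOffer] = executorDataMap.map { case (id, data) =>
      WorkerOffer(id, data.executorHost, data.freeCores)
    }.toSeq

    offers.foreach(println)
  }
}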
3. resourceOffers
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  // Walk through the offers (the Application's free resources) and record host information.
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // Number of cores currently available on each executor.
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Take the sorted TaskSets out of rootPool. When the TaskScheduler is initialized,
  // after TaskSchedulerImpl and SparkDeploySchedulerBackend are created, initialize()
  // builds a scheduling pool; every submitted TaskSet is first placed into that pool,
  // and the assignment algorithm later pulls the TaskSets out of it in sorted order.
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

  // Locality levels:
  // PROCESS_LOCAL: the RDD partition and the task are in the same executor (fastest)
  // NODE_LOCAL: the partition and the task are not in the same executor, but on the same node
  // NO_PREF: no locality preference
  // RACK_LOCAL: the partition and the task are at least on the same rack
  // ANY: any locality level

  // Going from the best (most local) level to the worst, try to launch the tasks of each
  // TaskSet on the executors, and keep offering at a level until a pass launches no more tasks.
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
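The heart of the method is the nested loop at the end: for each TaskSet and each locality level, keep offering the (shuffled) executors until a full pass places nothing. The toy sketch below mimics that shape with made-up ToyOffer/ToyTask types and only two "locality levels" (host-local, then any); it is meant to show the structure of the algorithm, not Spark's actual classes:

import scala.collection.mutable

object AssignmentLoopSketch {
  case class ToyOffer(executorId: String, host: String, cores: Int)
  case class ToyTask(id: Int, preferredHost: Option[String])

  def main(args: Array[String]): Unit = {
    val CPUS_PER_TASK = 1
    val offers = Seq(ToyOffer("exec-1", "host-a", 2), ToyOffer("exec-2", "host-b", 1))
    val availableCpus = offers.map(_.cores).toArray
    val pending = mutable.Queue(
      ToyTask(0, Some("host-a")), ToyTask(1, Some("host-b")), ToyTask(2, None))
    val assigned = offers.map(_ => mutable.ArrayBuffer.empty[Int])

    // Two levels only: "host local" (host must match) first, then "any".
    for (requireLocalHost <- Seq(true, false)) {
      var launchedTask = true
      while (launchedTask) {                    // same role as the do/while in resourceOffers
        launchedTask = false
        for (i <- offers.indices if availableCpus(i) >= CPUS_PER_TASK) {
          pending.dequeueFirst { t =>
            !requireLocalHost || t.preferredHost.contains(offers(i).host)
          }.foreach { t =>
            assigned(i) += t.id                 // place the task on this executor's offer
            availableCpus(i) -= CPUS_PER_TASK   // charge the cores it will use
            launchedTask = true
          }
        }
      }
    }
    offers.zip(assigned).foreach { case (o, ts) => println(s"${o.executorId} -> $ts") }
  }
}

Running it prints exec-1 -> ArrayBuffer(0, 2) and exec-2 -> ArrayBuffer(1): the host-local tasks are placed in the first pass, and the unconstrained task fills the remaining core in the second.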
4. resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // The executor must have at least CPUS_PER_TASK free cores (1 per task by default).
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Ask the TaskSetManager which task, if any, it can launch on this executor
        // at the given locality level.
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Record the task against this executor's offer.
          tasks(i) += task
          // Update the assignment bookkeeping.
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskIdToExecutorId(tid) = execId
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
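For context on the TaskNotSerializableException branch, the sketch below reproduces the underlying failure mode with plain Java serialization rather than Spark's closureSerializer; DbConnection and the closure are made up purely for illustration:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationFailureSketch {
  class DbConnection                            // deliberately not Serializable

  def main(args: Array[String]): Unit = {
    val conn = new DbConnection
    val closure: () => Int = () => conn.hashCode  // the closure captures conn
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(closure)
      println("closure serialized")
    } catch {
      case e: NotSerializableException =>
        println(s"cannot serialize the closure: $e")
    }
  }
}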
5. launchTasks
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the task information to be sent to its executor.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      // Deduct the CPUs this task will use from the executor's free cores.
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // Send a LaunchTask message to the executor to start the task.
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}
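The frame-size guard can be illustrated in isolation. In the sketch below, plain Java serialization and an arbitrary 128 KB limit stand in for Spark's closureSerializer and the spark.akka.frameSize minus reserved-bytes calculation:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FrameSizeCheckSketch {
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val maxFrameBytes = 128 * 1024                           // arbitrary frame limit
    val bigTaskPayload = Array.fill(256 * 1024)(1.toByte)    // a deliberately large "task"
    val size = serializedSize(bigTaskPayload)
    if (size >= maxFrameBytes) {
      println(s"Serialized task is $size bytes, exceeds $maxFrameBytes; would abort the task set")
    } else {
      println(s"Serialized task is $size bytes; would send LaunchTask to the executor")
    }
  }
}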
Notes:
1. resourceOffer: checks whether the wait time at a given locality level is still within its allowed limit; if it is, the task is considered launchable on that executor at that locality level (see the delay-scheduling sketch after this list).
2. TaskSetManager: schedules the tasks within a single TaskSet. It tracks each task, retries failed tasks until the retry limit is exceeded, and handles locality-aware scheduling for the TaskSet via delay scheduling. Its main interface is resourceOffer, through which it is asked whether it wants to run a task on a given node, and it receives task status-change updates so it knows when one of its tasks has changed state.
3. Locality levels:
PROCESS_LOCAL: process-local; the RDD partition and the task are in the same executor (fastest)
NODE_LOCAL: the partition and the task are not in the same executor, but on the same node
NO_PREF: no locality preference
RACK_LOCAL: rack-local; the partition and the task are at least on the same rack
ANY: any locality level
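As referenced in note 1, the sketch below illustrates the delay-scheduling idea in isolation: scheduling stays at the best locality level until that level's wait time has been used up, then falls back to the next level. The level names follow Spark's, but the 3-second waits and the helper function are illustrative, not TaskSetManager's actual code:

object DelaySchedulingSketch {
  val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaitMs = Map(                     // stand-in for spark.locality.wait.*
    "PROCESS_LOCAL" -> 3000L, "NODE_LOCAL" -> 3000L, "RACK_LOCAL" -> 3000L, "ANY" -> 0L)

  // Given how long we have waited since the last task launch,
  // which locality level are we currently allowed to use?
  def allowedLevel(waitedMs: Long): String = {
    var idx = 0
    var remaining = waitedMs
    while (idx < levels.size - 1 && remaining >= localityWaitMs(levels(idx))) {
      remaining -= localityWaitMs(levels(idx))  // each level's wait budget is consumed in turn
      idx += 1
    }
    levels(idx)
  }

  def main(args: Array[String]): Unit =
    Seq(0L, 2000L, 4000L, 7000L).foreach(w => println(s"waited ${w}ms -> ${allowedLevel(w)}"))
}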