Spark源码分析之六:Task调度(二)

话说在《Spark源码分析之五:Task调度(一)》一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法。这个方法针对接收到的ReviveOffers事件进行处理。代码如下:

[java] view plain copy

  1. // Make fake resource offers on all executors
  2. // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)
  3. private def makeOffers() {
  4. // Filter out executors under killing
  5. // 过滤掉under killing的executors
  6. val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  7. // 利用activeExecutors中executorData的executorHost、freeCores,构造workOffers,即资源
  8. val workOffers = activeExecutors.map { case (id, executorData) =>
  9. // 创建WorkerOffer对象
  10. new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  11. }.toSeq
  12. // 调用scheduler的resourceOffers()方法,分配资源,并调用launchTasks()方法,启动tasks
  13. // 这个scheduler就是TaskSchedulerImpl
  14. launchTasks(scheduler.resourceOffers(workOffers))
  15. }

代码逻辑很简单,一共分为三步:

第一,从executorDataMap中过滤掉under killing的executors,得到activeExecutors;

第二,利用activeExecutors中executorData的executorHost、freeCores,获取workOffers,即资源;

第三,调用scheduler的resourceOffers()方法,分配资源,并调用launchTasks()方法,启动tasks:这个scheduler就是TaskSchedulerImpl。

我们逐个进行分析,首先看看这个executorDataMap,其定义如下:

[java] view plain copy

  1. private val executorDataMap = new HashMap[String, ExecutorData]

它是CoarseGrainedSchedulerBackend掌握的集群中executor的数据集合,key为String类型的executorId,value为ExecutorData类型的executor详细信息。ExecutorData包含的主要内容如下:

1、executorEndpoint:RpcEndpointRef类型,RPC终端的引用,用于数据通信;

2、executorAddress:RpcAddress类型,RPC地址,用于数据通信;

3、executorHost:String类型,executor的主机;

4、freeCores:Int类型,可用处理器cores;

5、totalCores:Int类型,处理器cores总数;

6、logUrlMap:Map[String, String]类型,日志url映射集合。

这样,通过executorDataMap这个集合我们就能知道集群当前executor的负载情况,方便资源分析并调度任务。那么executorDataMap内的数据是何时及如何更新的呢?go on,继续分析。
        对于第一步中,过滤掉under killing的executors,其实现是对executorDataMap中的所有executor调用executorIsAlive()方法中,判断是否在executorsPendingToRemove和executorsPendingLossReason两个数据结构中,这两个数据结构中的executors,都是即将移除或者已丢失的executor。

第二步,在过滤掉已失效或者马上要失效的executor后,利用activeExecutors中executorData的executorHost、freeCores,构造workOffers,即资源,这个workOffers更简单,是一个WorkerOffer对象,它代表了系统的可利用资源。WorkerOffer代码如下:

[java] view plain copy

  1. /**
  2. * Represents free resources available on an executor.
  3. */
  4. private[spark]
  5. case class WorkerOffer(executorId: String, host: String, cores: Int)

而最重要的第三步,先是调用scheduler.resourceOffers(workOffers),即TaskSchedulerImpl的resourceOffers()方法,然后再调用launchTasks()方法将tasks加载到executor上去执行。

我们先看下TaskSchedulerImpl的resourceOffers()方法。代码如下:

[java] view plain copy

  1. /**
  2. * Called by cluster manager to offer resources on slaves. We respond by asking our active task
  3. * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
  4. * that tasks are balanced across the cluster.
  5. *
  6. * 被集群manager调用以提供slaves上的资源。我们通过按照优先顺序询问活动task集中的task来回应。
  7. * 我们通过循环的方式将task调度到每个节点上以便tasks在集群中可以保持大致的均衡。
  8. */
  9. def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  10. // Mark each slave as alive and remember its hostname
  11. // Also track if new executor is added
  12. // 标记每个slave节点为alive活跃的,并且记住它的主机名
  13. // 同时也追踪是否有executor被加入
  14. var newExecAvail = false
  15. // 循环offers,WorkerOffer为包含executorId、host、cores的结构体,代表集群中的可用executor资源
  16. for (o <- offers) {
  17. // 利用HashMap存储executorId->host映射的集合
  18. executorIdToHost(o.executorId) = o.host
  19. // Number of tasks running on each executor
  20. // 每个executor上运行的task的数目,这里如果之前没有的话,初始化为0
  21. executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
  22. // 每个host上executors的集合
  23. // 这个executorsByHost被用来计算host活跃性,反过来我们用它来决定在给定的主机上何时实现数据本地性
  24. if (!executorsByHost.contains(o.host)) {// 如果executorsByHost中不存在对应的host
  25. // executorsByHost中添加一条记录,key为host,value为new HashSet[String]()
  26. executorsByHost(o.host) = new HashSet[String]()
  27. // 发送一个ExecutorAdded事件,并由DAGScheduler的handleExecutorAdded()方法处理
  28. // eventProcessLoop.post(ExecutorAdded(execId, host))
  29. // 调用DAGScheduler的executorAdded()方法处理
  30. executorAdded(o.executorId, o.host)
  31. // 新的slave加入时,标志位newExecAvail设置为true
  32. newExecAvail = true
  33. }
  34. // 更新hostsByRack
  35. for (rack <- getRackForHost(o.host)) {
  36. hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
  37. }
  38. }
  39. // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  40. // 随机shuffle offers以避免总是把任务放在同一组workers上执行
  41. val shuffledOffers = Random.shuffle(offers)
  42. // Build a list of tasks to assign to each worker.
  43. // 构造一个task列表,以分配到每个worker
  44. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  45. // 可以使用的cpu资源
  46. val availableCpus = shuffledOffers.map(o => o.cores).toArray
  47. // 获得排序好的task集合
  48. // 先调用Pool.getSortedTaskSetQueue()方法
  49. // 还记得这个Pool吗,就是调度器中的调度池啊
  50. val sortedTaskSets = rootPool.getSortedTaskSetQueue
  51. // 循环每个taskSet
  52. for (taskSet <- sortedTaskSets) {
  53. // 记录日志
  54. logDebug("parentName: %s, name: %s, runningTasks: %s".format(
  55. taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  56. // 如果存在新的活跃的executor(新的slave节点被添加时)
  57. if (newExecAvail) {
  58. // 调用executorAdded()方法
  59. taskSet.executorAdded()
  60. }
  61. }
  62. // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  63. // of locality levels so that it gets a chance to launch local tasks on all of them.
  64. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  65. var launchedTask = false
  66. // 按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性
  67. // 位置本地性规则的顺序是:PROCESS_LOCAL(同进程)、NODE_LOCAL(同节点)、NO_PREF、RACK_LOCAL(同机架)、ANY(任何)
  68. for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  69. do {
  70. // 调用resourceOfferSingleTaskSet()方法进行任务集调度
  71. launchedTask = resourceOfferSingleTaskSet(
  72. taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  73. } while (launchedTask)
  74. }
  75. // 设置标志位hasLaunchedTask
  76. if (tasks.size > 0) {
  77. hasLaunchedTask = true
  78. }
  79. return tasks
  80. }

首先来看下它的主体流程。如下:

1、设置标志位newExecAvail为false,这个标志位是在新的slave被添加时被设置的一个标志,下面在计算任务的本地性规则时会用到;

2、循环offers,WorkerOffer为包含executorId、host、cores的结构体,代表集群中的可用executor资源:

2.1、更新executorIdToHost,executorIdToHost为利用HashMap存储executorId->host映射的集合;

2.2、更新executorIdToTaskCount,executorIdToTaskCount为每个executor上运行的task的数目集合,这里如果之前没有的话,初始化为0;

2.3、如果新的slave加入:

2.3.1、executorsByHost中添加一条记录,key为host,value为new HashSet[String]();

2.3.2、发送一个ExecutorAdded事件,并由DAGScheduler的handleExecutorAdded()方法处理;

2.3.3、新的slave加入时,标志位newExecAvail设置为true;

2.4、更新hostsByRack;

3、随机shuffle offers(集群中可用executor资源)以避免总是把任务放在同一组workers上执行;

4、构造一个task列表,以分配到每个worker,针对每个executor按照其上的cores数目构造一个cores数目大小的ArrayBuffer,实现最大程度并行化;

5、获取可以使用的cpu资源availableCpus;

6、调用Pool.getSortedTaskSetQueue()方法获得排序好的task集合,即sortedTaskSets;

7、循环sortedTaskSets中每个taskSet:

7.1、如果存在新加入的slave,则调用taskSet的executorAdded()方法,动态调整位置策略级别,这么做很容易理解,新的slave节点加入了,那么随之而来的是数据有可能存在于它上面,那么这时我们就需要重新调整任务本地性规则;

8、循环sortedTaskSets,按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性:

8.1、对每个taskSet,调用resourceOfferSingleTaskSet()方法进行任务集调度;

9、设置标志位hasLaunchedTask,并返回tasks。

接下来,我们详细解释下其中的每个步骤。

第1步不用讲,只是设置标志位newExecAvail为false,并且记住这个标志位是在新的slave被添加时被设置的一个标志,下面在计算任务的本地性规则时会用到;

第2步是集群中的可用executor资源offers的循环处理,更新一些数据结构,并且,在新的slave加入时,标志位newExecAvail设置为true,并且发送一个ExecutorAdded事件,交由DAGScheduler的handleExecutorAdded()方法处理。我们来看下DAGScheduler的这个方法:

[java] view plain copy

  1. private[scheduler] def handleExecutorAdded(execId: String, host: String) {
  2. // remove from failedEpoch(execId) ?
  3. if (failedEpoch.contains(execId)) {
  4. logInfo("Host added was in lost list earlier: " + host)
  5. failedEpoch -= execId
  6. }
  7. submitWaitingStages()
  8. }

很简单,先将对应host从failedEpoch中移除,failedEpoch存储的是系统探测到的失效节点的集合,存储的是execId->host的对应关系。接下来便是调用submitWaitingStages()方法提交等待的stages。这个方法我们之前分析过,这里不再赘述。但是存在一个疑点,之前stage都已提交了,这里为什么还要提交一遍呢?留待以后再寻找答案吧。

第3步随机shuffle offers以避免总是把任务放在同一组workers上执行,这也没什么特别好讲的,为了避免所谓的热点问题而采取的一种随机策略而已。

第4步也是,构造一个task列表,以分配到每个worker,针对每个executor,创建一个ArrayBuffer,存储的类型为TaskDescription,大小为executor的cores,即最大程度并行化,充分利用executor的cores。

第5步就是获取到上述executor集合中cores集合availableCpus,即可以使用的cpu资源;

下面我们重点分析下第6步,它是调用Pool.getSortedTaskSetQueue()方法,获得排序好的task集合。还记得这个Pool吗?它就是上篇文章《Spark源码分析之五:Task调度(一)》里讲到的调度器的中的调度池啊,我们看下它的getSortedTaskSetQueue()方法。代码如下:

[java] view plain copy

  1. override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  2. // 创建一个ArrayBuffer,存储TaskSetManager
  3. var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  4. // schedulableQueue为Pool中的一个调度队列,里面存储的是TaskSetManager
  5. // 在TaskScheduler的submitTasks()方法中,通过层层调用,最终通过Pool的addSchedulable()方法将之前生成的TaskSetManager加入到schedulableQueue中
  6. // 而TaskSetManager包含具体的tasks
  7. // taskSetSchedulingAlgorithm为调度算法,包括FIFO和FAIR两种
  8. // 这里针对调度队列,<span style="font-family: Arial, Helvetica, sans-serif;">按照调度算法对其排序,</span>生成一个序列sortedSchedulableQueue,
  9. val sortedSchedulableQueue =
  10. schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  11. // 循环sortedSchedulableQueue中所有的TaskSetManager,通过其getSortedTaskSetQueue来填充sortedTaskSetQueue
  12. for (schedulable <- sortedSchedulableQueue) {
  13. sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  14. }
  15. // 返回sortedTaskSetQueue
  16. sortedTaskSetQueue
  17. }

首先,创建一个ArrayBuffer,用来存储TaskSetManager,然后,对Pool中已经存储好的TaskSetManager,即schedulableQueue队列,按照taskSetSchedulingAlgorithm调度规则或算法来排序,得到sortedSchedulableQueue,并循环其内的TaskSetManager,通过其getSortedTaskSetQueue()方法来填充sortedTaskSetQueue,最后返回。TaskSetManager的getSortedTaskSetQueue()方法也很简单,追加ArrayBuffer[TaskSetManager]即可,如下:

[java] view plain copy

  1. override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
  2. var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
  3. sortedTaskSetQueue += this
  4. sortedTaskSetQueue
  5. }

我们着重来讲解下这个调度准则或算法taskSetSchedulingAlgorithm,其定义如下:

[java] view plain copy

  1. // 调度准则,包括FAIR和FIFO两种
  2. var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  3. schedulingMode match {
  4. case SchedulingMode.FAIR =>
  5. new FairSchedulingAlgorithm()
  6. case SchedulingMode.FIFO =>
  7. new FIFOSchedulingAlgorithm()
  8. }
  9. }

它包括两种,FAIR和FIFO,下面我们以FIFO为例来讲解。代码在SchedulingAlgorithm.scala中,如下:

[java] view plain copy

  1. private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  2. // 比较函数
  3. override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
  4. val priority1 = s1.priority
  5. val priority2 = s2.priority
  6. // 先比较priority,即优先级
  7. // priority相同的话,再比较stageId
  8. // 前者小于后者的话,返回true,否则为false
  9. var res = math.signum(priority1 - priority2)
  10. if (res == 0) {
  11. val stageId1 = s1.stageId
  12. val stageId2 = s2.stageId
  13. res = math.signum(stageId1 - stageId2)
  14. }
  15. if (res < 0) {
  16. true
  17. } else {
  18. false
  19. }
  20. }
  21. }

很简单,就是先比较两个TaskSetManagerder的优先级priority,优先级相同再比较stageId。而这个priority在TaskSet生成时,就是jobId,也就是FIFO是先按照Job的顺序再按照Stage的顺序进行顺序调度,一个Job完了再调度另一个Job,Job内是按照Stage的顺序进行调度。关于priority生成的代码如下所示:

[java] view plain copy

  1. // 利用taskScheduler.submitTasks()提交task
  2. // jobId即为TaskSet的priority
  3. taskScheduler.submitTasks(new TaskSet(
  4. tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

比较复杂的是FairSchedulingAlgorithm,代码如下:

[java] view plain copy

  1. private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  2. override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
  3. val minShare1 = s1.minShare
  4. val minShare2 = s2.minShare
  5. val runningTasks1 = s1.runningTasks
  6. val runningTasks2 = s2.runningTasks
  7. val s1Needy = runningTasks1 < minShare1
  8. val s2Needy = runningTasks2 < minShare2
  9. val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
  10. val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
  11. val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
  12. val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
  13. var compare: Int = 0
  14. // 前者的runningTasks<minShare而后者相反的的话,返回true;
  15. // runningTasks为正在运行的tasks数目,minShare为最小共享cores数;
  16. // 前面两个if判断的意思是两个TaskSetManager中,如果其中一个正在运行的tasks数目小于最小共享cores数,则优先调度该TaskSetManager
  17. if (s1Needy && !s2Needy) {
  18. return true
  19. } else if (!s1Needy && s2Needy) {// 前者的runningTasks>=minShare而后者相反的的话,返回true
  20. return false
  21. } else if (s1Needy && s2Needy) {
  22. // 如果两者的正在运行的tasks数目都比最小共享cores数小的话,再比较minShareRatio
  23. // minShareRatio为正在运行的tasks数目与最小共享cores数的比率
  24. compare = minShareRatio1.compareTo(minShareRatio2)
  25. } else {
  26. // 最后比较taskToWeightRatio,即权重使用率,weight代表调度池对资源获取的权重,越大需要越多的资源
  27. compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
  28. }
  29. if (compare < 0) {
  30. true
  31. } else if (compare > 0) {
  32. false
  33. } else {
  34. s1.name < s2.name
  35. }
  36. }
  37. }

它的调度逻辑主要如下:

1、优先看正在运行的tasks数目是否小于最小共享cores数,如果两者只有一个小于,则优先调度小于的那个,原因是既然正在运行的Tasks数目小于共享cores数,说明该节点资源比较充足,应该优先利用;

2、如果不是只有一个的正在运行的tasks数目是否小于最小共享cores数的话,则再判断正在运行的tasks数目与最小共享cores数的比率;

3、最后再比较权重使用率,即正在运行的tasks数目与该TaskSetManager的权重weight的比,weight代表调度池对资源获取的权重,越大需要越多的资源。

到此为止,获得了排序好的task集合,我们来到了第7步:如果存在新加入的slave,则调用taskSet的executorAdded()方法,即TaskSetManager的executorAdded()方法,代码如下:

[java] view plain copy

  1. def executorAdded() {
  2. recomputeLocality()
  3. }

没说的,继续追踪,看recomputeLocality()方法。代码如下:

[java] view plain copy

  1. // 重新计算位置
  2. def recomputeLocality() {
  3. // 首先获取之前的位置Level
  4. // currentLocalityIndex为有效位置策略级别中的索引,默认为0
  5. val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  6. // 计算有效的位置Level
  7. myLocalityLevels = computeValidLocalityLevels()
  8. // 获得位置策略级别的等待时间
  9. localityWaits = myLocalityLevels.map(getLocalityWait)
  10. // 设置当前使用的位置策略级别的索引
  11. currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
  12. }

首先说下这个currentLocalityIndex,它的定义为:

[java] view plain copy

  1. var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels

它是有效位置策略级别中的索引,指示当前的位置信息。也就是我们上一个task被launched所使用的Locality Level。

接下来看下myLocalityLevels,它是任务集TaskSet中应该使用哪种位置Level的数组,在TaskSetManager对象实例化时即被初始化,变量定义如下:

[java] view plain copy

  1. // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
  2. // 确定在我们的任务集TaskSet中应该使用哪种位置Level,以便我们做延迟调度
  3. var myLocalityLevels = computeValidLocalityLevels()

computeValidLocalityLevels()方法为计算该TaskSet使用的位置策略的方法,代码如下:

[java] view plain copy

  1. /**
  2. * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
  3. * added to queues using addPendingTask.
  4. * 计算该TaskSet使用的位置策略。假设所有的任务已经通过addPendingTask()被添加入队列
  5. */
  6. private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  7. // 引入任务位置策略
  8. import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  9. // 创建ArrayBuffer类型的levels,存储TaskLocality
  10. val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  11. // 如果pendingTasksForExecutor不为空,且PROCESS_LOCAL级别中TaskSetManager等待分配下一个任务的时间不为零,且
  12. // 如果pendingTasksForExecutor中每个executorId在sched的executorIdToTaskCount中存在
  13. // executorIdToTaskCount为每个executor上运行的task的数目集合
  14. if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
  15. pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
  16. levels += PROCESS_LOCAL
  17. }
  18. // 如果pendingTasksForHost不为空,且NODE_LOCAL级别中TaskSetManager等待分配下一个任务的时间不为零,且
  19. // 如果pendingTasksForHost中每个host在sched的executorsByHost中存在
  20. // executorsByHost为每个host上executors的集合
  21. if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
  22. pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
  23. levels += NODE_LOCAL
  24. }
  25. // 如果存在没有位置信息的task,则添加NO_PREF级别
  26. if (!pendingTasksWithNoPrefs.isEmpty) {
  27. levels += NO_PREF
  28. }
  29. // 同样处理RACK_LOCAL级别
  30. if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
  31. pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
  32. levels += RACK_LOCAL
  33. }
  34. // 最后加上一个ANY级别
  35. levels += ANY
  36. logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  37. // 返回
  38. levels.toArray
  39. }

这里,我们先看下其中几个比较重要的数据结构。在TaskSetManager中,存在如下几个数据结构:

[java] view plain copy

  1. // 每个executor上即将被执行的tasks的映射集合
  2. private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  3. // 每个host上即将被执行的tasks的映射集合
  4. private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  5. // 每个rack上即将被执行的tasks的映射集合
  6. private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
  7. // Set containing pending tasks with no locality preferences.
  8. // 存储所有没有位置信息的即将运行tasks的index索引的集合
  9. var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  10. // Set containing all pending tasks (also used as a stack, as above).
  11. // 存储所有即将运行tasks的index索引的集合
  12. val allPendingTasks = new ArrayBuffer[Int]

这些数据结构,存储了task与不同位置的载体的对应关系。在TaskSetManager对象被构造时,有如下代码被执行:

[java] view plain copy

  1. // Add all our tasks to the pending lists. We do this in reverse order
  2. // of task index so that tasks with low indices get launched first.
  3. // 将所有的tasks添加到pending列表。我们用倒序的任务索引一遍较低索引的任务可以被优先加载
  4. for (i <- (0 until numTasks).reverse) {
  5. addPendingTask(i)
  6. }

它对TaskSetManager中的tasks的索引倒序处理。addPendingTask()方法如下:

[java] view plain copy

  1. /** Add a task to all the pending-task lists that it should be on. */
  2. // 添加一个任务的索引到所有相关的pending-task索引列表
  3. private def addPendingTask(index: Int) {
  4. // Utility method that adds `index` to a list only if it‘s not already there
  5. // 定义了一个如果索引不存在添加索引至列表的工具方法
  6. def addTo(list: ArrayBuffer[Int]) {
  7. if (!list.contains(index)) {
  8. list += index
  9. }
  10. }
  11. // 遍历task的优先位置
  12. for (loc <- tasks(index).preferredLocations) {
  13. loc match {
  14. case e: ExecutorCacheTaskLocation => // 如果为ExecutorCacheTaskLocation
  15. // 添加任务索引index至pendingTasksForExecutor列表
  16. addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
  17. case e: HDFSCacheTaskLocation => {// 如果为HDFSCacheTaskLocation
  18. // 调用sched(即TaskSchedulerImpl)的getExecutorsAliveOnHost()方法,获得指定Host上的Alive Executors
  19. val exe = sched.getExecutorsAliveOnHost(loc.host)
  20. exe match {
  21. case Some(set) => {
  22. // 循环host上的每个Alive Executor,添加任务索引index至pendingTasksForExecutor列表
  23. for (e <- set) {
  24. addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
  25. }
  26. logInfo(s"Pending task $index has a cached location at ${e.host} " +
  27. ", where there are executors " + set.mkString(","))
  28. }
  29. case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
  30. ", but there are no executors alive there.")
  31. }
  32. }
  33. case _ => Unit
  34. }
  35. // 添加任务索引index至pendingTasksForHost列表
  36. addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
  37. // 根据获得任务优先位置host获得机架rack,循环,添加任务索引index至pendingTasksForRack列表
  38. for (rack <- sched.getRackForHost(loc.host)) {
  39. addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
  40. }
  41. }
  42. // 如果task没有位置属性,则将任务的索引index添加到pendingTasksWithNoPrefs,pendingTasksWithNoPrefs为存储所有没有位置信息的即将运行tasks的index索引的集合
  43. if (tasks(index).preferredLocations == Nil) {
  44. addTo(pendingTasksWithNoPrefs)
  45. }
  46. // 将任务的索引index加入到allPendingTasks,allPendingTasks为存储所有即将运行tasks的index索引的集合
  47. allPendingTasks += index  // No point scanning this whole list to find the old task there
  48. }

鉴于上面注释很清晰,这里,我们只说下重点,它是根据task的preferredLocations,来决定该往哪个数据结构存储的。最终,将task的位置信息,存储到不同的数据结构中,方便后续任务调度的处理。

同时,在TaskSetManager中TaskSchedulerImpl类型的变量中,还存在着如下几个数据结构:

[java] view plain copy

  1. // Number of tasks running on each executor
  2. // 每个executor上正在运行的tasks的数目
  3. private val executorIdToTaskCount = new HashMap[String, Int]
  4. // The set of executors we have on each host; this is used to compute hostsAlive, which
  5. // in turn is used to decide when we can attain data locality on a given host
  6. // 每个host上executors的集合
  7. // 这个executorsByHost被用来计算host活跃性,反过来我们用它来决定在给定的主机上何时实现数据本地性
  8. protected val executorsByHost = new HashMap[String, HashSet[String]]
  9. // 每个rack上hosts的映射关系
  10. protected val hostsByRack = new HashMap[String, HashSet[String]]

它反映了当前集群中executor、host、rack的对应关系。而在computeValidLocalityLevels()方法中,根据task的位置属性和当前集群中executor、host、rack的对应关系,依靠上面这两组数据结构,就能很方便的确定该TaskSet的TaskLocality Level,详细流程不再赘述,读者可自行阅读代码。

这里,我们只说下getLocalityWait()方法,它是获取Locality级别对应TaskSetManager等待分配下一个任务的时间,代码如下:

[java] view plain copy

  1. // 获取Locality级别对应TaskSetManager等待分配下一个任务的时间
  2. private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
  3. // 默认等待时间,取自参数spark.locality.wait,默认为3s
  4. val defaultWait = conf.get("spark.locality.wait", "3s")
  5. // 根据不同的TaskLocality,取不同的参数,设置TaskLocality等待时间
  6. // PROCESS_LOCAL取参数spark.locality.wait.process
  7. // NODE_LOCAL取参数spark.locality.wait.node
  8. // RACK_LOCAL取参数spark.locality.wait.rack
  9. val localityWaitKey = level match {
  10. case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
  11. case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
  12. case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
  13. case _ => null
  14. }
  15. if (localityWaitKey != null) {
  16. conf.getTimeAsMs(localityWaitKey, defaultWait)
  17. } else {
  18. 0L
  19. }
  20. }

不同的Locality级别对应取不同的参数。为什么要有这个Locality级别对应TaskSetManager等待分配下一个任务的时间呢?我们先留个小小的疑问。

回到recomputeLocality()方法,接下来便是调用computeValidLocalityLevels()这个方法,计算当前最新的有效的位置策略Level,为什么要再次计算呢?主要就是新的slave节点加入,我们需要重新评估下集群中task位置偏好与当前集群executor、host、rack等整体资源的关系,起到了一个位置策略级别动态调整的一个效果。

然后,便是获得位置策略级别的等待时间localityWaits、设置当前使用的位置策略级别的索引currentLocalityIndex,不再赘述。

好了,第7步就分析完了,有些细节留到以后再归纳整理吧。

接着分析第8步,循环sortedTaskSets,按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性,也就是对每个taskSet,调用resourceOfferSingleTaskSet()方法进行任务集调度。显然,我们需要首先看下resourceOfferSingleTaskSet()这个方法。代码如下:

[java] view plain copy

  1. private def resourceOfferSingleTaskSet(
  2. taskSet: TaskSetManager,
  3. maxLocality: TaskLocality,
  4. shuffledOffers: Seq[WorkerOffer],
  5. availableCpus: Array[Int],
  6. tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  7. // 标志位launchedTask初始化为false,用它来标记是否有task被成功分配或者launched
  8. var launchedTask = false
  9. // 循环shuffledOffers,即每个可用executor
  10. for (i <- 0 until shuffledOffers.size) {
  11. // 获取其executorId和host
  12. val execId = shuffledOffers(i).executorId
  13. val host = shuffledOffers(i).host
  14. // 如果executor上可利用cpu数目大于每个task需要的数目,则继续task分配
  15. // CPUS_PER_TASK为参数spark.task.cpus配置的值,未配置的话默认为1
  16. if (availableCpus(i) >= CPUS_PER_TASK) {
  17. try {
  18. // 调用TaskSetManager的resourceOffer()方法,处理返回的每个TaskDescription
  19. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
  20. // 分配task成功
  21. // 将task加入到tasks对应位置
  22. // 注意,tasks为一个空的,根据shuffledOffers和其可用cores生成的有一定结构的列表
  23. tasks(i) += task
  24. // 更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、
  25. // executorsByHost、availableCpus等数据结构
  26. val tid = task.taskId
  27. taskIdToTaskSetManager(tid) = taskSet // taskId与TaskSetManager的映射关系
  28. taskIdToExecutorId(tid) = execId // taskId与ExecutorId的映射关系
  29. executorIdToTaskCount(execId) += 1// executor上正在运行的task数目加1
  30. executorsByHost(host) += execId// host上对应的executor的映射关系
  31. availableCpus(i) -= CPUS_PER_TASK// 可以Cpu cores减少相应数目
  32. // 确保availableCpus(i)不小于0
  33. assert(availableCpus(i) >= 0)
  34. // 标志位launchedTask设置为true
  35. launchedTask = true
  36. }
  37. } catch {
  38. case e: TaskNotSerializableException =>
  39. logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
  40. // Do not offer resources for this task, but don‘t throw an error to allow other
  41. // task sets to be submitted.
  42. return launchedTask
  43. }
  44. }
  45. }
  46. return launchedTask
  47. }

该方法的主体流程如下:

1、标志位launchedTask初始化为false,用它来标记是否有task被成功分配或者launched;

2、循环shuffledOffers,即每个可用executor:

2.1、获取其executorId和host;

2.2、如果executor上可利用cpu数目大于每个task需要的数目,则继续task分配;

2.3、调用TaskSetManager的resourceOffer()方法,处理返回的每个TaskDescription:

2.3.1、分配task成功,将task加入到tasks对应位置(注意,tasks为一个空的,根据shuffledOffers和其可用cores生成的有一定结构的列表);

2.3.2、更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、executorsByHost、availableCpus等数据结构;

2.3.3、确保availableCpus(i)不小于0;

2.3.4、标志位launchedTask设置为true;

3、返回launchedTask。

其他都好说,我们只看下TaskSetManager的resourceOffer()方法。代码如下:

[java] view plain copy

  1. /**
  2. * Respond to an offer of a single executor from the scheduler by finding a task
  3. *
  4. * NOTE: this function is either called with a maxLocality which
  5. * would be adjusted by delay scheduling algorithm or it will be with a special
  6. * NO_PREF locality which will be not modified
  7. *
  8. * @param execId the executor Id of the offered resource
  9. * @param host  the host Id of the offered resource
  10. * @param maxLocality the maximum locality we want to schedule the tasks at
  11. */
  12. @throws[TaskNotSerializableException]
  13. def resourceOffer(
  14. execId: String,
  15. host: String,
  16. maxLocality: TaskLocality.TaskLocality)
  17. : Option[TaskDescription] =
  18. {
  19. if (!isZombie) {
  20. // 当前时间
  21. val curTime = clock.getTimeMillis()
  22. // 确定可以被允许的位置策略:allowedLocality
  23. var allowedLocality = maxLocality
  24. // 如果maxLocality不为TaskLocality.NO_PREF
  25. if (maxLocality != TaskLocality.NO_PREF) {
  26. // 获取被允许的Locality,主要是看等待时间
  27. allowedLocality = getAllowedLocalityLevel(curTime)
  28. // 如果allowedLocality大于maxLocality,将maxLocality赋值给allowedLocality
  29. if (allowedLocality > maxLocality) {
  30. // We‘re not allowed to search for farther-away tasks
  31. allowedLocality = maxLocality
  32. }
  33. }
  34. // 出列task,即分配task
  35. dequeueTask(execId, host, allowedLocality) match {
  36. case Some((index, taskLocality, speculative)) => {
  37. // 找到对应的task
  38. // Found a task; do some bookkeeping and return a task description
  39. val task = tasks(index)
  40. val taskId = sched.newTaskId()
  41. // Do various bookkeeping
  42. // 更新copiesRunning
  43. copiesRunning(index) += 1
  44. val attemptNum = taskAttempts(index).size
  45. // 创建TaskInfo
  46. val info = new TaskInfo(taskId, index, attemptNum, curTime,
  47. execId, host, taskLocality, speculative)
  48. // 更新taskInfos
  49. taskInfos(taskId) = info
  50. // 更新taskAttempts
  51. taskAttempts(index) = info :: taskAttempts(index)
  52. // Update our locality level for delay scheduling
  53. // NO_PREF will not affect the variables related to delay scheduling
  54. // 设置currentLocalityIndex、lastLaunchTime
  55. if (maxLocality != TaskLocality.NO_PREF) {
  56. currentLocalityIndex = getLocalityIndex(taskLocality)
  57. lastLaunchTime = curTime
  58. }
  59. // Serialize and return the task
  60. // 开始时间
  61. val startTime = clock.getTimeMillis()
  62. // 序列化task,得到serializedTask
  63. val serializedTask: ByteBuffer = try {
  64. Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
  65. } catch {
  66. // If the task cannot be serialized, then there‘s no point to re-attempt the task,
  67. // as it will always fail. So just abort the whole task-set.
  68. case NonFatal(e) =>
  69. val msg = s"Failed to serialize task $taskId, not attempting to retry it."
  70. logError(msg, e)
  71. abort(s"$msg Exception during serialization: $e")
  72. throw new TaskNotSerializableException(e)
  73. }
  74. if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
  75. !emittedTaskSizeWarning) {
  76. emittedTaskSizeWarning = true
  77. logWarning(s"Stage ${task.stageId} contains a task of very large size " +
  78. s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
  79. s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
  80. }
  81. // 添加running task
  82. addRunningTask(taskId)
  83. // We used to log the time it takes to serialize the task, but task size is already
  84. // a good proxy to task serialization time.
  85. // val timeTaken = clock.getTime() - startTime
  86. val taskName = s"task ${info.id} in stage ${taskSet.id}"
  87. logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
  88. s"$taskLocality, ${serializedTask.limit} bytes)")
  89. // 调用DagScheduler的taskStarted()方法,标记Task已启动
  90. sched.dagScheduler.taskStarted(task, info)
  91. // 返回TaskDescription,其中包含taskId、attemptNumber、execId、index、serializedTask等重要信息
  92. // attemptNumber是推测执行原理必须使用的,即拖后腿的任务可以执行多份,谁先完成用谁的结果
  93. return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
  94. taskName, index, serializedTask))
  95. }
  96. case _ =>
  97. }
  98. }
  99. None
  100. }

resourceOffer()方法的处理流程大体如下:

1、记录当前时间;

2、 确定可以被允许的位置策略:allowedLocality;

3、出列task,即分配task;

3.1、如果找到对应的task,即task可以被分配:

3.1.1、完成获得taskId、更新copiesRunning、获得attemptNum、创建TaskInfo、更新taskInfos、更新taskAttempts、设置currentLocalityIndex、lastLaunchTime等基础数据结构的更新;

3.1.2、序列化task,得到serializedTask;

3.1.3、添加running task;

3.1.4、调用DagScheduler的taskStarted()方法,标记Task已启动;

3.1.5、返回TaskDescription,其中包含taskId、attemptNumber、execId、index、serializedTask等重要信息,attemptNumber是推测执行原理必须使用的,即拖后腿的任务可以执行多份,谁先完成用谁的结果。

首先说下这个allowedLocality,如果maxLocality不为TaskLocality.NO_PREF,我们需要调用getAllowedLocalityLevel(),传入当前时间,得到allowedLocality,getAllowedLocalityLevel()方法逻辑比较简单,代码如下:

[java] view plain copy

  1. /**
  2. * Get the level we can launch tasks according to delay scheduling, based on current wait time.
  3. * 基于当前的等待是时间,得到我们可以调度task的级别
  4. */
  5. private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  6. // Remove the scheduled or finished tasks lazily
  7. // 判断task是否可以被调度
  8. def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
  9. var indexOffset = pendingTaskIds.size
  10. // 循环
  11. while (indexOffset > 0) {
  12. // 索引递减
  13. indexOffset -= 1
  14. // 获得task索引
  15. val index = pendingTaskIds(indexOffset)
  16. // 如果对应task不存在任何运行实例,且未执行成功,可以调度,返回true
  17. if (copiesRunning(index) == 0 && !successful(index)) {
  18. return true
  19. } else {
  20. // 从pendingTaskIds中移除
  21. pendingTaskIds.remove(indexOffset)
  22. }
  23. }
  24. false
  25. }
  26. // Walk through the list of tasks that can be scheduled at each location and returns true
  27. // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
  28. // already been scheduled.
  29. def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
  30. val emptyKeys = new ArrayBuffer[String]
  31. // 循环pendingTasks
  32. val hasTasks = pendingTasks.exists {
  33. case (id: String, tasks: ArrayBuffer[Int]) =>
  34. // 判断task是否可以被调度
  35. if (tasksNeedToBeScheduledFrom(tasks)) {
  36. true
  37. } else {
  38. emptyKeys += id
  39. false
  40. }
  41. }
  42. // The key could be executorId, host or rackId
  43. // 移除数据
  44. emptyKeys.foreach(id => pendingTasks.remove(id))
  45. hasTasks
  46. }
  47. // 从当前索引currentLocalityIndex开始,循环myLocalityLevels
  48. while (currentLocalityIndex < myLocalityLevels.length - 1) {
  49. // 是否存在待调度task,根据不同的Locality Level,调用moreTasksToRunIn()方法从不同的数据结构中获取,
  50. // NO_PREF直接看pendingTasksWithNoPrefs是否为空
  51. val moreTasks = myLocalityLevels(currentLocalityIndex) match {
  52. case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
  53. case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
  54. case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
  55. case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
  56. }
  57. if (!moreTasks) {// 不存在可以被调度的task
  58. // This is a performance optimization: if there are no more tasks that can
  59. // be scheduled at a particular locality level, there is no point in waiting
  60. // for the locality wait timeout (SPARK-4939).
  61. // 记录lastLaunchTime
  62. lastLaunchTime = curTime
  63. logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
  64. s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
  65. // 位置策略索引加1
  66. currentLocalityIndex += 1
  67. } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
  68. // Jump to the next locality level, and reset lastLaunchTime so that the next locality
  69. // wait timer doesn‘t immediately expire
  70. // 更新localityWaits
  71. lastLaunchTime += localityWaits(currentLocalityIndex)
  72. // 位置策略索引加1
  73. currentLocalityIndex += 1
  74. logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
  75. s"${localityWaits(currentLocalityIndex)}ms")
  76. } else {
  77. // 返回当前位置策略级别
  78. return myLocalityLevels(currentLocalityIndex)
  79. }
  80. }
  81. // 返回当前位置策略级别
  82. myLocalityLevels(currentLocalityIndex)
  83. }

在确定allowedLocality后,我们就需要调用dequeueTask()方法,出列task,进行调度。代码如下:

[java] view plain copy

  1. /**
  2. * Dequeue a pending task for a given node and return its index and locality level.
  3. * Only search for tasks matching the given locality constraint.
  4. *
  5. * @return An option containing (task index within the task set, locality, is speculative?)
  6. */
  7. private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  8. : Option[(Int, TaskLocality.Value, Boolean)] =
  9. {
  10. // 首先调用dequeueTaskFromList()方法,对PROCESS_LOCAL级别的task进行调度
  11. for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
  12. return Some((index, TaskLocality.PROCESS_LOCAL, false))
  13. }
  14. // PROCESS_LOCAL未调度到task的话,再调度NODE_LOCAL级别
  15. if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
  16. for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
  17. return Some((index, TaskLocality.NODE_LOCAL, false))
  18. }
  19. }
  20. // NODE_LOCAL未调度到task的话,再调度NO_PREF级别
  21. if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
  22. // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
  23. for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
  24. return Some((index, TaskLocality.PROCESS_LOCAL, false))
  25. }
  26. }
  27. // NO_PREF未调度到task的话,再调度RACK_LOCAL级别
  28. if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
  29. for {
  30. rack <- sched.getRackForHost(host)
  31. index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
  32. } {
  33. return Some((index, TaskLocality.RACK_LOCAL, false))
  34. }
  35. }
  36. // 最好是ANY级别的调度
  37. if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
  38. for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
  39. return Some((index, TaskLocality.ANY, false))
  40. }
  41. }
  42. // find a speculative task if all others tasks have been scheduled
  43. // 如果所有的class都被调度的话,寻找一个speculative task,同MapReduce的推测执行原理的思想
  44. dequeueSpeculativeTask(execId, host, maxLocality).map {
  45. case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  46. }

很简单,按照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY的顺序进行调度。最后,如果所有的class都被调度的话,寻找一个speculative task,同MapReduce的推测执行原理的思想。

至此,我们得到了TaskDescription,也就知道了哪个Task需要在哪个节点上执行,而Task调度也就全讲完了。

题外话:

要透彻的、清晰的讲解一个复杂的流程,是很费力的,短短几篇文章也是远远不够的。Task调度这两篇文章,重在叙述一个完整的流程,同时讲解部分细节。在这两篇文章的叙述中,肯定会有很多细节没讲清楚、讲透彻,甚至会有些理解错误的地方,希望高手不吝赐教,以免继续误导大家。

针对部分细节,和对流程的深入理解,我以后还会陆续推出博文,进行详细讲解,并归纳总结,谢谢大家!

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50699939

时间: 2025-01-04 21:59:50

Spark源码分析之六:Task调度(二)的相关文章

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

Spark 源码分析系列

如下,是 spark 源码分析系列的一些文章汇总,持续更新中...... Spark RPC spark 源码分析之五--Spark RPC剖析之创建NettyRpcEnv spark 源码分析之六--Spark RPC剖析之Dispatcher和Inbox.Outbox剖析 spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析 spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClie

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Accuracy(准确率), Precision(精确率), 和F1-Measure, 结合Spark源码分析

例子 某大学一个系,总共100人,其中男90人,女10人,现在根据每个人的特征,预测性别 Accuracy(准确率) Accuracy=预测正确的数量需要预测的总数 计算 由于我知道男生远多于女生,所以我完全无视特征,直接预测所有人都是男生 我预测所的人都是男生,而实际有90个男生,所以 预测正确的数量 = 90 需要预测的总数 = 100 Accuracy = 90 / 100 = 90% 问题 在男女比例严重不均匀的情况下,我只要预测全是男生,就能获得极高的Accuracy. 所以在正负样本

手机自动化测试:appium源码分析之bootstrap十二

手机自动化测试:appium源码分析之bootstrap十二 poptest是国内唯一一家培养测试开发工程师的培训机构,以学员能胜任自动化测试,性能测试,测试工具开发等工作为目标.如果对课程感兴趣,请大家咨询qq:908821478. ScrollTo package io.appium.android.bootstrap.handler; import com.android.uiautomator.core.UiObject; import com.android.uiautomator.c