Spark源码分析之四:Stage提交

各位看官,上一篇《Spark源码分析之Stage划分》详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交。

Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:

与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交。在handleJobSubmitted()方法的最后两行代码,便是Stage提交的处理。代码如下:

[java] view plain copy

  1. // 提交最后一个stage
  2. submitStage(finalStage)
  3. // 提交其他正在等待的stage
  4. submitWaitingStages()

从代码我们可以看出,Stage提交的逻辑顺序,是由后往前,即先提交最后一个finalStage,即ResultStage,然后再提交其parent stages,但是实际物理顺序是否如此呢?我们首先看下finalStage的提交,方法submitStage()代码如下:

[java] view plain copy

  1. /** Submits stage, but first recursively submits any missing parents. */
  2. // 提交stage,但是首先要递归的提交所有的missing父stage
  3. private def submitStage(stage: Stage) {
  4. // 根据stage获取jobId
  5. val jobId = activeJobForStage(stage)
  6. if (jobId.isDefined) {// 如果jobId已定义
  7. // 记录Debug日志信息:submitStage(stage)
  8. logDebug("submitStage(" + stage + ")")
  9. // 如果在waitingStages、runningStages或
  10. // failedStages任意一个中,不予处理
  11. // 既不在waitingStages中,也不在runningStages中,还不在failedStages中
  12. // 说明未处理过
  13. if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
  14. // 调用getMissingParentStages()方法,获取stage还没有提交的parent
  15. val missing = getMissingParentStages(stage).sortBy(_.id)
  16. logDebug("missing: " + missing)
  17. if (missing.isEmpty) {
  18. // 如果missing为空,说明是没有parent的stage或者其parent stages已提交,
  19. // 则调用submitMissingTasks()方法,提交tasks
  20. logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
  21. submitMissingTasks(stage, jobId.get)
  22. } else {
  23. // 否则,说明其parent还没有提交,递归,循环missing,提交每个stage
  24. for (parent <- missing) {
  25. submitStage(parent)
  26. }
  27. // 将该stage加入到waitingStages中
  28. waitingStages += stage
  29. }
  30. }
  31. } else {
  32. // 放弃该Stage
  33. abortStage(stage, "No active job for stage " + stage.id, None)
  34. }
  35. }

代码逻辑比较简单。根据stage获取到jobId,如果jobId未定义,说明该stage不属于明确的Job,则调用abortStage()方法放弃该stage。如果jobId已定义的话,则需要判断该stage属于waitingStages、runningStages、failedStages中任意一个,则该stage忽略,不被处理。顾名思义,waitingStages为等待处理的stages,spark采取由后往前的顺序处理stage提交,即先处理child stage,然后再处理parent stage,所以位于waitingStages中的stage,由于其child stage尚未处理,所以必须等待,runningStages为正在运行的stages,正在运行意味着已经提交了,所以无需再提交,而最后的failedStages就是失败的stages,既然已经失败了,再提交也还是会失败,徒劳无益啊~

此时,如果stage不位于上述三个数据结构中,则可以继续执行提交流程。接下来该怎么做呢?

首先调用getMissingParentStages()方法,获取stage还没有提交的parent,即missing;如果missing为空,说明该stage要么没有parent stage,要么其parent stages都已被提交,此时该stage就可以被提交,用于提交的方法submitMissingTasks()我们稍后分析。

如果missing不为空,则说明该stage还存在尚未被提交的parent stages,那么,我们就需要遍历missing,循环提交每个stage,并将该stage添加到waitingStages中,等待其parent stages都被提交后再被提交。

我们先看下这个missing是如何获取的。进入getMissingParentStages()方法,代码如下:

[java] view plain copy

  1. private def getMissingParentStages(stage: Stage): List[Stage] = {
  2. // 存储尚未提交的parent stages,用于最后结果的返回
  3. val missing = new HashSet[Stage]
  4. // 已被处理的RDD集合
  5. val visited = new HashSet[RDD[_]]
  6. // We are manually maintaining a stack here to prevent StackOverflowError
  7. // caused by recursively visiting
  8. // 待处理RDD栈,后入先出
  9. val waitingForVisit = new Stack[RDD[_]]
  10. // 定义函数visit
  11. def visit(rdd: RDD[_]) {
  12. // 通过visited判断rdd是否已处理
  13. if (!visited(rdd)) {
  14. // 添加到visited,下次不会再处理
  15. visited += rdd
  16. val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
  17. if (rddHasUncachedPartitions) {
  18. // 循环rdd的dependencies
  19. for (dep <- rdd.dependencies) {
  20. dep match {
  21. // 宽依赖
  22. case shufDep: ShuffleDependency[_, _, _] =>
  23. // 调用getShuffleMapStage,获取ShuffleMapStage
  24. val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
  25. if (!mapStage.isAvailable) {
  26. missing += mapStage
  27. }
  28. // 窄依赖,直接将RDD压入waitingForVisit栈
  29. case narrowDep: NarrowDependency[_] =>
  30. waitingForVisit.push(narrowDep.rdd)
  31. }
  32. }
  33. }
  34. }
  35. }
  36. // 将stage的rdd压入到waitingForVisit顶部
  37. waitingForVisit.push(stage.rdd)
  38. // 循环处理waitingForVisit,对弹出的每个rdd调用函数visit
  39. while (waitingForVisit.nonEmpty) {
  40. visit(waitingForVisit.pop())
  41. }
  42. // 返回stage列表
  43. missing.toList
  44. }

有没有些似曾相识的感觉呢?对了,和《Spark源码分析之Stage划分》一文中getParentStages()方法、getAncestorShuffleDependencies()方法结构类似,也是定义了三个数据结构和一个visit()方法。三个数据结构分别是:

1、missing:HashSet[Stage]类型,存储尚未提交的parent stages,用于最后结果的返回;

2、visited:HashSet[RDD[_]]类型,已被处理的RDD集合,位于其中的RDD不会被重复处理;

3、waitingForVisit:Stack[RDD[_]]类型,等待被处理的RDD栈,后入先出。

visit()方法的处理逻辑也比较简单,大致如下:

通过RDD是否在visited中判断RDD是否已处理,若未被处理,添加到visited中,然后循环rdd的dependencies,如果是宽依赖ShuffleDependency,调用getShuffleMapStage(),获取ShuffleMapStage(此次调用则是直接取出已生成的stage,因为划分阶段已将stage全部生成,拿来主义即可),判断该stage的isAvailable标志位,若为false,则说明该stage未被提交过,加入到missing集合,如果是窄依赖NarrowDependency,直接将RDD压入waitingForVisit栈,等待后续处理,因为窄依赖的RDD同属于同一个stage,加入waitingForVisit只是为了后续继续沿着DAG图继续往上处理。

那么,整个missing的获取就一目了然,将final stage即ResultStage的RDD压入到waitingForVisit顶部,循环处理即可得到missing。

至此,各位可能有个疑问,这个ShuffleMapStage的isAvailable为什么能决定该stage是否已被提交呢?卖个关子,后续再分析。

submitStage()方法已分析完毕,go on,我们再回归到handleJobSubmitted()方法,在调用submitStage()方法提交finalStage之后,实际上只是将最原始的parent stage提交,其它child stage均存储在了waitingStages中,那么,接下来,我们就要调用submitWaitingStages()方法提交其中的stage。代码如下:

[java] view plain copy

  1. /**
  2. * Check for waiting or failed stages which are now eligible for resubmission.
  3. * Ordinarily run on every iteration of the event loop.
  4. */
  5. private def submitWaitingStages() {
  6. // TODO: We might want to run this less often, when we are sure that something has become
  7. // runnable that wasn‘t before.
  8. logTrace("Checking for newly runnable parent stages")
  9. logTrace("running: " + runningStages)
  10. logTrace("waiting: " + waitingStages)
  11. logTrace("failed: " + failedStages)
  12. // 将waitingStages转换为数组
  13. val waitingStagesCopy = waitingStages.toArray
  14. // 清空waitingStages
  15. waitingStages.clear()
  16. // 循环waitingStagesCopy,挨个调用submitStage()方法进行提交
  17. for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
  18. submitStage(stage)
  19. }
  20. }

很简单,既然stages的顺序已经梳理正确,将waitingStages转换为数组waitingStagesCopy,针对每个stage挨个调用submitStage()方法进行提交即可。

还记得我卖的那个关子吗?ShuffleMapStage的isAvailable为什么能决定该stage是否已被提交呢?现在来解开这个谜团。首先,看下ShuffleMapStage的isAvailable是如何定义的,在ShuffleMapStage中,代码如下:

[java] view plain copy

  1. /**
  2. * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
  3. * This should be the same as `outputLocs.contains(Nil)`.
  4. * 如果map stage已就绪的话返回true,即所有分区均有shuffle输出。这个将会和outputLocs.contains保持一致。
  5. */
  6. def isAvailable: Boolean = _numAvailableOutputs == numPartitions

它是通过判断_numAvailableOutputs和numPartitions是否相等来确定stage是否已被提交(或者说准备就绪可以提交is ready)的,而numPartitions很好理解,就是stage中的全部分区数目,那么_numAvailableOutputs是什么呢?

[java] view plain copy

  1. private[this] var _numAvailableOutputs: Int = 0
  2. /**
  3. * Number of partitions that have shuffle outputs.
  4. * When this reaches [[numPartitions]], this map stage is ready.
  5. * This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.
  6. *
  7. * 拥有shuffle的分区数量。
  8. * 当这个numAvailableOutputs达到numPartitions时,这个map stage也就准备好了。
  9. * 这个应与outputLocs.filter(!_.isEmpty).size保持一致
  10. */
  11. def numAvailableOutputs: Int = _numAvailableOutputs

可以看出,_numAvailableOutputs就是拥有shuffle outputs的分区数量,当这个numAvailableOutputs达到numPartitions时,这个map stage也就准备好了。

那么这个_numAvailableOutputs开始时默认为0,它是在何时被赋值的呢?通篇看完ShuffleMapStage的源码,只有两个方法对_numAvailableOutputs的值做修改,代码如下:

[java] view plain copy

  1. def addOutputLoc(partition: Int, status: MapStatus): Unit = {
  2. val prevList = outputLocs(partition)
  3. outputLocs(partition) = status :: prevList
  4. if (prevList == Nil) {
  5. _numAvailableOutputs += 1
  6. }
  7. }
  8. def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
  9. val prevList = outputLocs(partition)
  10. val newList = prevList.filterNot(_.location == bmAddress)
  11. outputLocs(partition) = newList
  12. if (prevList != Nil && newList == Nil) {
  13. _numAvailableOutputs -= 1
  14. }
  15. }

什么时候调用的这个addOutputLoc()方法呢?答案就在DAGScheduler的newOrUsedShuffleStage()方法中。方法主要逻辑如下:

[java] view plain copy

  1. if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
  2. // 如果mapOutputTracker中存在
  3. // 根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象
  4. val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
  5. // 反序列化
  6. val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
  7. // 循环
  8. (0 until locs.length).foreach { i =>
  9. if (locs(i) ne null) {
  10. // locs(i) will be null if missing
  11. // 将
  12. stage.addOutputLoc(i, locs(i))
  13. }
  14. }
  15. } else {
  16. // 如果mapOutputTracker中不存在,注册一个
  17. // Kind of ugly: need to register RDDs with the cache and map output tracker here
  18. // since we can‘t do it in the RDD constructor because # of partitions is unknown
  19. logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
  20. // 注册的内容为
  21. // 1、根据shuffleDep获取的shuffleId;
  22. // 2、rdd中分区的个数
  23. mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  24. }

这个方法在stage划分过程中,第一轮被调用,此时mapOutputTracker中并没有注册shuffle相关信息,所以走的是else分支,调用mapOutputTracker的registerShuffle()方法注册shuffle,而在stage提交过程中,第二轮被调用,此时shuffle已在mapOutputTracker中注册,则会根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象,反序列化并循环调用stage的addOutputLoc()方法,更新stage的outputLocs,并累加_numAvailableOutputs,至此,关子卖完,再有疑问,后续再慢慢分析吧。

到了这里,就不得不分析下真正提交stage的方法submitMissingTasks()了。莫慌,慢慢看,代码如下:

[java] view plain copy

  1. /** Called when stage‘s parents are available and we can now do its task. */
  2. private def submitMissingTasks(stage: Stage, jobId: Int) {
  3. logDebug("submitMissingTasks(" + stage + ")")
  4. // Get our pending tasks and remember them in our pendingTasks entry
  5. // 清空stage的pendingPartitions
  6. stage.pendingPartitions.clear()
  7. // First figure out the indexes of partition ids to compute.
  8. // 首先确定该stage需要计算的分区ID索引
  9. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  10. // Create internal accumulators if the stage has no accumulators initialized.
  11. // Reset internal accumulators only if this stage is not partially submitted
  12. // Otherwise, we may override existing accumulator values from some tasks
  13. if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
  14. stage.resetInternalAccumulators()
  15. }
  16. // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  17. // with this Stage
  18. val properties = jobIdToActiveJob(jobId).properties
  19. // 将stage加入到runningStages中
  20. runningStages += stage
  21. // SparkListenerStageSubmitted should be posted before testing whether tasks are
  22. // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  23. // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  24. // event.
  25. // 开启一个stage时,需要调用outputCommitCoordinator的stageStart()方法,
  26. stage match {
  27. // 如果为ShuffleMapStage
  28. case s: ShuffleMapStage =>
  29. outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
  30. // 如果为ResultStage
  31. case s: ResultStage =>
  32. outputCommitCoordinator.stageStart(
  33. stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  34. }
  35. // 创建一个Map:taskIdToLocations,存储的是id->Seq[TaskLocation]的映射关系
  36. // 对stage中指定RDD的每个分区获取位置信息,映射成id->Seq[TaskLocation]的关系
  37. val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  38. stage match {
  39. // 如果是ShuffleMapStage
  40. case s: ShuffleMapStage =>
  41. partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
  42. // 如果是ResultStage
  43. case s: ResultStage =>
  44. val job = s.activeJob.get
  45. partitionsToCompute.map { id =>
  46. val p = s.partitions(id)
  47. (id, getPreferredLocs(stage.rdd, p))
  48. }.toMap
  49. }
  50. } catch {
  51. case NonFatal(e) =>
  52. stage.makeNewStageAttempt(partitionsToCompute.size)
  53. listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  54. abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
  55. runningStages -= stage
  56. return
  57. }
  58. // 标记新的stage attempt
  59. stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  60. // 发送一个SparkListenerStageSubmitted事件
  61. listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  62. // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  63. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  64. // the serialized copy of the RDD and for each task we will deserialize it, which means each
  65. // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  66. // might modify state of objects referenced in their closures. This is necessary in Hadoop
  67. // where the JobConf/Configuration object is not thread-safe.
  68. // 对stage进行序列化,如果是ShuffleMapStage,序列化rdd和shuffleDep,如果是ResultStage,序列化rdd和func
  69. var taskBinary: Broadcast[Array[Byte]] = null
  70. try {
  71. // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  72. // 对于ShuffleMapTask,序列化并广播,广播的是rdd和shuffleDep
  73. // For ResultTask, serialize and broadcast (rdd, func).
  74. // 对于ResultTask,序列化并广播,广播的是rdd和func
  75. val taskBinaryBytes: Array[Byte] = stage match {
  76. case stage: ShuffleMapStage =>
  77. // 序列化ShuffleMapStage
  78. closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
  79. case stage: ResultStage =>
  80. // 序列化ResultStage
  81. closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
  82. }
  83. // 通过sc广播序列化的task
  84. taskBinary = sc.broadcast(taskBinaryBytes)
  85. } catch {
  86. // In the case of a failure during serialization, abort the stage.
  87. case e: NotSerializableException =>
  88. abortStage(stage, "Task not serializable: " + e.toString, Some(e))
  89. runningStages -= stage
  90. // Abort execution
  91. return
  92. case NonFatal(e) =>
  93. abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
  94. runningStages -= stage
  95. return
  96. }
  97. // 针对stage的每个分区构造task,形成tasks:ShuffleMapStage生成ShuffleMapTasks,ResultStage生成ResultTasks
  98. val tasks: Seq[Task[_]] = try {
  99. stage match {
  100. // 如果是ShuffleMapStage
  101. case stage: ShuffleMapStage =>
  102. partitionsToCompute.map { id =>
  103. // 位置信息
  104. val locs = taskIdToLocations(id)
  105. val part = stage.rdd.partitions(id)
  106. // 创建ShuffleMapTask,其中包括位置信息
  107. new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
  108. taskBinary, part, locs, stage.internalAccumulators)
  109. }
  110. // 如果是ResultStage
  111. case stage: ResultStage =>
  112. val job = stage.activeJob.get
  113. partitionsToCompute.map { id =>
  114. val p: Int = stage.partitions(id)
  115. val part = stage.rdd.partitions(p)
  116. val locs = taskIdToLocations(id)
  117. // 创建ResultTask
  118. new ResultTask(stage.id, stage.latestInfo.attemptId,
  119. taskBinary, part, locs, id, stage.internalAccumulators)
  120. }
  121. }
  122. } catch {
  123. case NonFatal(e) =>
  124. abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
  125. runningStages -= stage
  126. return
  127. }
  128. // 如果存在tasks,则利用taskScheduler.submitTasks()提交task,否则标记stage已完成
  129. if (tasks.size > 0) {
  130. logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  131. // 赋值pendingPartitions
  132. stage.pendingPartitions ++= tasks.map(_.partitionId)
  133. logDebug("New pending partitions: " + stage.pendingPartitions)
  134. // 利用taskScheduler.submitTasks()提交task
  135. taskScheduler.submitTasks(new TaskSet(
  136. tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  137. // 记录提交时间
  138. stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  139. } else {
  140. // Because we posted SparkListenerStageSubmitted earlier, we should mark
  141. // the stage as completed here in case there are no tasks to run
  142. // 标记stage已完成
  143. markStageAsFinished(stage, None)
  144. val debugString = stage match {
  145. case stage: ShuffleMapStage =>
  146. s"Stage ${stage} is actually done; " +
  147. s"(available: ${stage.isAvailable}," +
  148. s"available outputs: ${stage.numAvailableOutputs}," +
  149. s"partitions: ${stage.numPartitions})"
  150. case stage : ResultStage =>
  151. s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
  152. }
  153. logDebug(debugString)
  154. }
  155. }

submitMissingTasks()方法,最主要的就是针对每个stage生成一组Tasks,即TaskSet,并调用TaskScheduler的submitTasks()方法提交tasks。它主要做了以下几件事情:

1、清空stage的pendingPartitions;

2、首先确定该stage需要计算的分区ID索引,保存至partitionsToCompute;

3、将stage加入到runningStages中,标记stage正在运行,与上面的阐述对应;

4、开启一个stage时,需要调用outputCommitCoordinator的stageStart()方法;

5、创建一个Map:taskIdToLocations,存储的是id->Seq[TaskLocation]的映射关系,并对stage中指定RDD的每个分区获取位置信息,映射成id->Seq[TaskLocation]的关系;

6、标记新的stage attempt,并发送一个SparkListenerStageSubmitted事件;

7、对stage进行序列化并广播,如果是ShuffleMapStage,序列化rdd和shuffleDep,如果是ResultStage,序列化rdd和func;

8、最重要的,针对stage的每个分区构造task,形成tasks:ShuffleMapStage生成ShuffleMapTasks,ResultStage生成ResultTasks;

9、如果存在tasks,则利用taskScheduler.submitTasks()提交task,否则标记stage已完成。

至此,stage提交的主体流程已全部分析完毕,后续的Task调度与执行留待以后分析,而stage提交部分细节或者遗漏之处,特别是task生成时的部分细节,也留待以后再细细琢磨吧~

晚安!

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50679842

时间: 2024-08-05 01:11:15

Spark源码分析之四:Stage提交的相关文章

Apache Spark源码分析-- Job的提交与运行

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建. 实验环境搭建 在进行后续操作前,确保下列条件已满足. 1. 下载spark binary 0.9.1 2. 安装scala 3. 安装sbt 4. 安装java 启动spark-shell单机模式运行,即local模式 local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME MASTER=local bin/spark-shell "MASTER=local&quo

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

Spark 源码分析系列

如下,是 spark 源码分析系列的一些文章汇总,持续更新中...... Spark RPC spark 源码分析之五--Spark RPC剖析之创建NettyRpcEnv spark 源码分析之六--Spark RPC剖析之Dispatcher和Inbox.Outbox剖析 spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析 spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClie

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Accuracy(准确率), Precision(精确率), 和F1-Measure, 结合Spark源码分析

例子 某大学一个系,总共100人,其中男90人,女10人,现在根据每个人的特征,预测性别 Accuracy(准确率) Accuracy=预测正确的数量需要预测的总数 计算 由于我知道男生远多于女生,所以我完全无视特征,直接预测所有人都是男生 我预测所的人都是男生,而实际有90个男生,所以 预测正确的数量 = 90 需要预测的总数 = 100 Accuracy = 90 / 100 = 90% 问题 在男女比例严重不均匀的情况下,我只要预测全是男生,就能获得极高的Accuracy. 所以在正负样本