Spark源码阅读(1): Stage划分

Spark中job由action动作生成,那么stage是如何划分的呢?一般的解答是根据宽窄依赖划分。那么我们深入源码看看吧

一个action 例如count,会在多次runJob中传递,最终会到一个函数

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
dagScheduler是DAGScheduler的一个实例,因此,后面的工作都交给DAGScheduler进行。

在dagScheduler.runJob主要是调用submitJob函数

submitJob的主要工作是向job scheduler 提交一个job并且创建一个JobWaiter对象。 这个JobWaiter对象一直阻塞到job完成或者取消。
其中提交动作实际上是将Job提交给一个DAGSchedulerEventProcessLoop,在这个里面处理job的运行。主要代码如下那么DAGSchedulerEventProcessLoop是如何处理的呢?

val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))

  

DAGSchedulerEventProcessLoop中回去接受,然后去查看是哪种情况。从这里可以看出,

还有很多操作会被塞进这个Loop中。这么做的原因呢?(解耦?)

override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}

  

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)

case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)

...
}

  

DAGSchedulerEventProcessLoop的父类EventLoop有开线程,因此上述处理工作会在另一个线程中进行。

private val eventThread = new Thread(name) {
setDaemon(true)

override def run(): Unit = {
try {
while (!stopped.get) {
...
}

  

另外,其实所有的处理工作还是在DAGScheduler中进行。接下来深入handleJobSubmitted
这个给出全部代码。主要逻辑是利用最后一个RDD去生成ResultStage。生成之后创建ActiveJob并记录相关信息,并通过submitStage(finalStage)处理

private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
if (finalStage != null) {
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
submitWaitingStages()
}

  

接下来分两个脉络进行(1)newResultStage 和 (2)submitStage

(1) newResultStage
在这里要生成一个ResultStage,这个stage的创建是需要其父stage的信息的,所以通过getParentStagesAndId获取

private def newResultStage(
rdd: RDD[_],
numTasks: Int,
jobId: Int,
callSite: CallSite): ResultStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

  

这个里面由getParentStages进行。利用一个栈来处理未访问的rdd,首先是末尾的rdd,然后看其依赖。
如果是一个ShuffleDependency

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can‘t do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
parents += getShuffleMapStage(shufDep, firstJobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents.toList
}

  

这里面通过判断一个RDD的依赖是不是Shuffle的来进行。像reduceByKey这样的操作。RDD1是转换前的,RDD2为转换后的,
那么RDD2的依赖就是ShuffleDependency, 这个ShuffleDependency对象中也有RDD,其RDD就是RDD1.
如果是ShuffleDependency的话就通过getShuffleDependency来获得。
那么这段代码的大致原理就是,先把末尾的RDD加入stack中,也就是waitingForVisit, 然后获取是ShuffleDependency的stage。
所以根据ShuffleDependency来划分stage。但是好像还没有看到如何将中间的那些RDD放到一个stage中。

继续深入getShuffleMapStage,

private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, firstJobId)
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage

stage
}
}

  

如果这个ShuffleDependency已经被生成过ShuffleMapStage, 那么直接获取,如果没有则需要注册。

深入registerShuffleDependencies这个函数,

private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) # 获取这个ShuffleDependency前面的还没在
# shuffleToMapStage中注册的ShuffleDependency
while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}

  

那么newOrUsedShuffleStage这个函数是干嘛的呢?用于生成一个stage,并注册到shuffleToMapStage

(2) submitStage

时间: 2024-08-04 21:30:20

Spark源码阅读(1): Stage划分的相关文章

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp

【Spark】配置Spark源码阅读环境

Scala构建工具(SBT)的使用 SBT介绍 SBT是Simple Build Tool的简称,如果读者使用过Maven,那么可以简单将SBT看做是Scala世界的Maven,虽然二者各有优劣,但完成的工作基本是类似的. 虽然Maven同样可以管理Scala项目的依赖并进行构建,但SBT的某些特性却让人如此着迷,比如: 使用Scala作为DSL来定义build文件(one language rules them all); 通过触发执行(trigger execution)特性支持持续的编译与

Spark源码阅读笔记之Broadcast(一)

Spark源码阅读笔记之Broadcast(一) Spark会序列化在各个任务上使用到的变量,然后传递到Executor中,由于Executor中得到的只是变量的拷贝,因此对变量的改变只在该Executor有效.序列化后的任务的大小是有限制的(由spark.akka.frameSize决定,值为其减去200K,默认为10M-200K),spark会进行检查,超出该限制的任务会被抛弃.因此,对于需要共享比较大的数据时,需要使用Broadcast. Spark实现了两种传输Broadcast的机制:

第3课 Scala函数式编程彻底精通及Spark源码阅读笔记

本课内容: 1:scala中函数式编程彻底详解 2:Spark源码中的scala函数式编程 3:案例和作业 函数式编程开始: def fun1(name: String){ println(name) } //将函数名赋值给一个变量,那么这个变量就是一个函数了. val fun1_v = fun1_ 访问 fun1_v("Scala") 结果:Scala 匿名函数:参数名称用 => 指向函数体 val fun2=(content: String) => println(co

Spark修炼之道(高级篇)——Spark源码阅读:第三节 Spark Job的提交

前一我们分析了SparkContext的创建,这一节,我们介绍在RDD执行的时候,如何提交job进行分析,同样是下面的源码: import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount{ def main(args: Array[String]) { if (args.length == 0) { System.err.println("Usage: SparkWordCount <inputfile>

spark源码阅读 RDDs

RDDs弹性分布式数据集 spark就是实现了RDDs编程模型的集群计算平台.有很多RDDs的介绍,这里就不仔细说了,这儿主要看源码. abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { 相关类 Dependency 宽依赖和窄依

spark源码阅读--shuffle过程分析

ShuffleManager(一) 本篇,我们来看一下spark内核中另一个重要的模块,Shuffle管理器ShuffleManager.shuffle可以说是分布式计算中最重要的一个概念了,数据的join,聚合去重等操作都需要这个步骤.另一方面,spark之所以比mapReduce的性能高其中一个主要的原因就是对shuffle过程的优化,一方面spark的shuffle过程更好地利用内存(也就是我们前面在分析内存管理时所说的执行内存),另一方面对于shuffle过程中溢写的磁盘文件归并排序和引

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

Spark修炼之道(高级篇)——Spark源码阅读:第十节 Standalone运行模式解析

Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括: 类:org.apache.spark.deploy.master.Master 说明:负责整个集群的资源调度及Application的管理. 消息类型: 接收Worker发送的消息 1. RegisterWorker 2. ExecutorStateChanged 3. WorkerSchedulerStateResponse 4. Heartbeat 向Worker发送的消息 1. Registered