spark 笔记 13: 再看DAGScheduler,stage状态更新流程

当某个task完成后,某个shuffle Stage X可能已完成,那么就可能会一些仅依赖Stage X的Stage现在可以执行了,所以要有响应task完成的状态更新流程。

=======================DAG task完成后的更新流程===================

->CoarseGrainedSchedulerBackend::receiveWithLogging  --调度器的事件接收器

->case StatusUpdate(executorId, taskId, state, data) --状态更新事件(来源于CoarseGrainedExecutorBackend)

->scheduler.statusUpdate(taskId, state, data.value) --状态更新

->taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) --将成功的时间封装到一个executor排队执行

->getTaskResultExecutor.execute(new Runnable {override def run(): Unit = Utils.logUncaughtExceptions {

->val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { --反序列化结果

->scheduler.handleSuccessfulTask(taskSetManager, tid, result) --处理成功的task

->taskSetManager.handleSuccessfulTask(tid, taskResult)

-> sched.dagScheduler.taskEnded(tasks(index) ... result.metrics)  --另起一段

->maybeFinishTaskSet()  --判断是否taskSet结束了,更新状态。注意:在DAG里,调度的粒度是taskSet。

->sched.taskSetFinished(this)  --如果taskSet结束了,更新DAG的这个调度单元

->activeTaskSets -= manager.taskSet.id  --从active taskSet中删除tid

->manager.parent.removeSchedulable(manager)

->schedulableQueue.remove(schedulable)  --从调度队列中删除tid

->schedulableNameToSchedulable.remove(schedulable.name)  --删除调度单元。

->makeOffers(executorId) --将这个executorId分配给其他task使用

->DAGScheduler::taskEnded  --任务结束事件处理流程

->eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)

->def receive

->case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics)

->dagScheduler.handleTaskCompletion(completion)  --Responds to a task finishing.

//This is called inside the event loop so it assumes that it can modify the scheduler‘s internal state

->event.reason match => case Success =>  --task结果是成功的

->if (event.accumUpdates != null)   --如果是状态更新

->event.accumUpdates.foreach { case (id, partialValue)  --更新状态

->listenerBus.post(SparkListenerTaskEnd(...)) --通知listener任务结束

->stage.pendingTasks -= task

->task match {

->case rt: ResultTask[_, _] =>  --如果是ResultTask

->if (job.numFinished == job.numPartitions)  --如果所有的分片数据都完成

->markStageAsFinished(stage) --那么这个Stage就是结束了

->runningStages -= stage --从running状态中删除

->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知Stage结束

->cleanupStateForJobAndIndependentStages(job) --清除依赖关系

->val registeredStages = jobIdToStageIds.get(job.jobId) --找到这个job对应的所有Stage(job对应多个stage)

->stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach

//查找所有stage,找出注册了依赖于这个job所在stage的。

->case (stageId, stage) =>

->val jobSet = stage.jobIds

->if (!jobSet.contains(job.jobId)) --这些存在依赖的stage中,应该包含这个job的注册

->logError("Job %d not registered for stage %d even though that stage was registered for the job"

.format(job.jobId, stageId))

->if (jobSet.isEmpty)  // no other job needs this stage 没有其他job了,这个依赖的stage也结束了。

-> removeStage(stageId) --删除stage

->listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) --通知job结束

->job.listener.taskSucceeded(rt.outputId, event.result) --通知task成功

->case smt: ShuffleMapTask =>  --如果是shuffleMapTask

->if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) --如果stage的所有task都完成

->markStageAsFinished(stage) --标志stage完成

->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知stage完成

->logInfo("looking for newly runnable stages") --stage完成了,意味着依赖这个stage的stage可以执行了

->mapOutputTracker.registerMapOutputs --(?用处不明)

->clearCacheLocs()

->if (stage.outputLocs.exists(_ == Nil)) // Some tasks had failed; let‘s resubmit this stage

->submitStage(stage)

->else

->val newlyRunnable = new ArrayBuffer[Stage]

-> for (stage <- waitingStages if getMissingParentStages(stage) == Nil) 如果一个stage没有依赖其他stage

->newlyRunnable += stage --这个没有依赖的stage就可以执行了

->waitingStages --= newlyRunnable

->runningStages ++= newlyRunnable

->for {stage <- newlyRunnable.sortBy(_.id); jobId <- activeJobForStage(stage)}

->submitMissingTasks(stage, jobId) --将这些没有依赖的stage的所有active job提交执行

->submitWaitingStages() --//Check for waiting or failed stages which are now eligible for resubmission.

//Ordinarily run on every iteration of the event loop. 每个事件处理都会触发去检查waiting状态的stage是否能够执行了。

->logTrace("Checking for newly runnable parent stages")

->waitingStages.clear()

->for (stage <- waitingStagesCopy.sortBy(_.jobId))

->submitStage(stage)

========================end================================

来自为知笔记(Wiz)

时间: 2024-08-02 15:59:02

spark 笔记 13: 再看DAGScheduler,stage状态更新流程的相关文章

第13章 MySQL服务器的状态--高性能MySQL学习笔记

13.1 系统变量 -- 服务器配置变量 MySQL通过SHOW VARIABLES  SQL命令显示许多系统变量. 13.2 状态变量--SHOW STATUS SHOW STATUS 命令会在一个由两列(名称/值)组成的表格里显示服务器状态变量.这些变量都是只读的. SHOW STATUS默认显示会话变量,SHOW GLOBAL STATUS显示全局变量. 也可以从INFORMATION_SCHEMA.GLOBAL_STATUS和INFORMATION_SCHEMA.SESSION_STAT

spark 笔记 10: TaskScheduler相关

任务调度器的接口类.应用程序可以定制自己的调度器来执行.当前spark只实现了一个任务调度器TaskSchedulerImpl ===================task scheduler begin==================== -> TaskSchedulerImpl::submitTasks(taskSet: TaskSet)  处理接受task,它做了同步操作. -> new TaskSetManager(this, taskSet, maxTaskFailures)

spark 笔记 2: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf ucb关于spark的论文,对spark中核心组件RDD最原始.本质的理解,没有比这个更好的资料了.必读. Abstract RDDs provide a restricted form of shared memory, based on coarse grained transformations rather than fine-grained updates to s

再看Ajax

再回顾Ajax相关的内容,再次梳理学习还是很有必要的,尤其是实际的开发中,ajax更是必不可少,仔细学习以便避免不必要的错误. 文章导读: --1.使用XMLHttpRequest---------- 1.1 必备知识点 1.2 send()方法 1.3  再看CORS --2.HTTP请求和响应---------------- 2.1 Request Header中的参数 2.2 Response Header中的参数 2.3 GET请求和POST请求的区别 --3.jQuery中的Ajax-

操作系统概念学习笔记 13 死锁(一)

操作系统概念学习笔记 13 死锁(一) 所有申请的资源都被其他等待进程占有,那么该等待进程有可能在无法改变其状态,这种情况称为死锁(deadlock). 系统模型 进程在使用资源之前必须先申请资源,在使用资源之后要释放资源.进程所申请的资源数量不能超过系统所有资源的总量. 在正常操作模式下,进程只能按如下顺序使用资源: ①申请:如果申请不能立即被允许,那么申请进程必须等待,直到它获得该资源为止. ②使用:进程对资源进行操作. ③释放:进程释放资源 资源的申请与释放为系统调用.其他资源的申请与释放

《30天自制操作系统》笔记(13)——总结

<30天自制操作系统>笔记(13)——总结 进度回顾 上一篇介绍了操作系统实现多任务的方法.操作系统利用CPU的far模式的JMP指令.寄存器TR.GDT.TSS和PIT中断这些功能实现了多任务,可见CPU在设计时就考虑到了计算机要具有多任务处理的能力.也就是说,CPU.PIC等硬件支持什么功能,操作系统才能实现什么功能. 至此全书已经读了一半.我发现后半部分读不下去,也没必要再读了.本篇就对所有的笔记做一总结,至此<30天自制操作系统>这本书就暂且不读了. 所学所感 我们可以把C

HTML&CSS基础学习笔记13—无序列表

无序列表 有时我们的工作繁忙,杂事很多,怕忘记,就会把事情一件件列出来,防止忘记. 它们的排列顺序对于我们来说并不重要,可以随意调换,我们将它称为无序列表,HTML里用<ul>标签来表示无序列表,列表里的项目则用<li>标签来表示: 1 2 3 4 5 <ul>     <li></li>     <li></li>     ... </ul> 看一段实例代码: 对于的浏览器显示结果是这样的: 更多内容学习,请

再看GS接包过程

再看GS接包过程 bool GameServer::ProcessLoop(packet& rPkt) { if(false == m_spDataLayer->Recv(rPkt)) return true;//没数据了 if(rPkt.is_data) { if(!rPkt.data) return false; GameChannel* pGC = m_vecChannel[rPkt.channel_id]; if(pGC) pGC->OnReceiveData(rPkt.dat

python 学习笔记 13 -- 常用的时间模块之time

Python 没有包含对应日期和时间的内置类型,不过提供了3个相应的模块,可以采用多种表示管理日期和时间值: *    time 模块由底层C库提供与时间相关的函数.它包含一些函数用于获取时钟时间和处理器的运行时间,还提供了基本解析和字符串格式化工具 *    datetime 模块为日期.时间以及日期时间值提供一个更高层接口.datetime 中的类支持算术.比较和时区配置. *    calendar 模块可以创建周.月和年的格式化表示.它还可以用来计算重复事件.给定日期是星期几,以及其他基