一个Spark job的生命历程

一个job的生命历程

dagScheduler.runJob //(1)
--> submitJob ( eventProcessLoop.post(JobSubmitted,***) //(2)
    --> eventProcessLoop //(3)
        --> onReceive(event: DAGSchedulerEvent) //(4)
            --> doOnReceive(event: DAGSchedulerEvent) //(5)
                --> case JobSubmitted //(6)
                    --> dagScheduler.handleJobSubmitted //(7)
                        --> finalStage =createResultStage(finalRDD, func, partitions, jobId, callSite) //(8)
                        --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9)
                        --> jobIdToActiveJob(jobId) = job //(10)
                        --> activeJobs += job //(11)
                        --> finalStage.setActiveJob(job) //(12)
                        --> stageIds = jobIdToStageIds(jobId).toArray //(13)
                        --> stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //(14)
                        --> listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //(15)
                        --> submitStage(finalStage) //(16)
                            --> getMissingParentStages(stage).sortBy(_.id) //(17)
                                --> finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18)
                                    --> createShuffleMapStage(dep, firstJobId) //(19)
                                        -->stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
                                --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20)
                                --> submitStage(finalStage)  //(21)//划分和提交stage算法精髓
                                    --> submitMissingTasks(stage, jobId.get) //(22)
                                        --> submitWaitingChildStages(stage) //(23)
                                --> markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))  //(24)

(1)所有的action算子都会触发一个job的调度,经过多次不同的runjob重载后停在这里调度 submitJob

(2)调用eventProcessLoop方法,并发送 JobSubmitted 消息给DAGSchedulerEventProcessLoop(DAGScheduler的循环响应函数体)

(3)eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

(4)onReceive 函数是接受 DAGSchedulerEventProcessLoop DAG调度程序的事件接受函数

(5)doOnReceive 实际是步骤4的事件处理函数

(6)根据步骤2的发送事件,触发 JobSubmitted 这个事件响应

(7)dagScheduler 的核心入口

(8)使用触发的job的最后一个RDD创建一个 finalstage,并且放入内存缓存中 stageIdToStage

(9)使用 finalStage 创建一个job。这个job最后一个stage就是final stage

(10)(11)(12)(13)(14)(15)把 job 加入各种内存缓存中,其实就是各个数据结构

(16)提交fianlStage。总是从最后开始往前推测。

(17)获取当前stage的父stage。stage的划分算法,主要在这里。waitingForVisit = new Stack[RDD[_]]。栈结构,从最后的stage往前的stage 放进栈中,实现先进后出。符合程序调用顺序。

(18)获取最后一个stage,finalstage

(19)生成一个 ShuffleMapStage

(20)利用finalestage 生成一个job

(21)划分和提交stage算法精髓,划分好stage之后全部放在waiting stage 数据结构中

(22)提交所有在 waiting stage 中的stage,从stage0...finalstage

(23)检查等待的阶段,现在有资格重新提交。提交依赖于给定父级阶段的阶段。当父阶段完成时调用成功

(24)所有的stage划分完并提交结束

------------------------------------------------------------------------------

stage划分算法非常重要,精通spark,必须对stage划分算法很清晰,知道自己编写的spark程序被划分为几个job,每个job被划分为几个stage,

每个stage包含了哪些代码,只有知道每个stage包括哪些代码后。在线上,如果发现某个stage执行特别慢,或者某个stage一直报错,才能针对

特定的stage包含的代码排查问题,或性能调优。

stage划分算法总结:

1.从finalstage倒推(通过 栈 数据结构实现)

2.通过宽依赖,进行stage的划分

3.通过递归,优先提交父stage

------------------------------------------------------------------------------

/**
* 获取某个stage的父stage
* 对于一个stage,如果它的最后一个RDD的所有依赖都是窄依赖,将不会创建新的stage
* 如果其RDD会依赖某个RDD,用宽依赖的RDD创建一个新的stage,并立即返回这个stage
* @type {[type]}
*/
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]

    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
            //遍历RDD的依赖,对于每种具有shuffle的操作,如reduceByKey,groupByKey,countByKey,底层对应了3个RDD:
            //Map
          for (dep <- rdd.dependencies) {
            dep match {
                //如果是宽依赖
              case shufDep: ShuffleDependency[_, _, _] =>
                  //使用宽依赖的RDD创建一个 ShuffleMapStage,并且将isShuffleMap 设置为true,
                  //默认最后一个stage不是shuffle不是ShuffleMapStage,但是finalstage之前所有的stage都是ShuffleMapStage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }

                //如果是窄依赖
              case narrowDep: NarrowDependency[_] =>
              //将依赖的RDD放入栈中
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    //
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
    //
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

-------------------------------------------------------------------------------------------------------------------------------

taskScheduler

-->taskSchedulerImpl (standalone模式)

-->SparkDeploySchedulerBackend (负责创建AppClient, 向master注册Application)

在TaskSchedulerImpl中,对一个单独的taskset的任务进行调度.这个类负责追踪每一个taskset,如果task失败的话

会负责重试spark,直到超过重试次数,并且会通知延迟调度,为这个taskSet处理本地化机制.它的主要接口是

resourceOffer,在这个接口中,taskset会希望在一个节点上运行一个任务,并且接受任务的状态改变消息,

来知道它负责的task的状态改变了.

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks //获取ttaskSet的task列表
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      //每个taskSet都会创建一个manager,用于管理每个taskSet,并设定最大失败次数 maxTaskFailures
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      //尝试连接task,如果task失败,会负责重试spark,直到超过重试次数,并且会通知延迟调度
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      //通过 manager 获得活着的taskSet
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      //利用已选择的调度器schedulableBuilder,把一个taskSet的manager加入调度管理池中
      /*
      def initialize(backend: SchedulerBackend) {
        this.backend = backend
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
            case _ =>
              throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
              s"$schedulingMode")
          }
        }
        schedulableBuilder.buildPools()
      }*/
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    /**
      * 创建 taskScheduler 的时候,就是为 taskSchedulerImpl 创建一个 SparkDeploySchedulerBackend .
      * 它负责创建AppClient,向master注册Application
      */
    backend.reviveOffers()
  }

原文地址:https://www.cnblogs.com/liangjf/p/8322420.html

时间: 2024-10-30 23:49:45

一个Spark job的生命历程的相关文章

一个MFC程序的生命历程

[程序的诞生] Application object 产生,内存于是获得配置,初值也设立了. AfxWinMain 执行 AfxWinInit,后者又调用AfxInitThread,把消息队列尽量加大到96. AfxWinMain 执行 InitApplication.这是CWinApp的虚函数,但我们通常不改写它. AfxWinMain 执行 InitInstance.这是CWinApp的虚函数,我们必须改写它. CMyWinApp::InitInstance 'new' 了一个 CMyFra

连载《一个程序猿的生命周期》-《发展篇》- 12.向生活妥协的选择之路,你也面临吗?

本篇文章的主角是第二个加入我们团队的,暂且称他为G兄.是我第二家公司的同事,但是当时并没有交集,后来经过其他同事说起,被我招过来的.关于第二家公司的情况,请参见<而立之年,第一次跳槽,寻求转型> 在加入我们团队之前,G兄在一个不大不小的公司做内部OA系统,众所周知不会有什么太大发展,他当时也不太满意.在和他交流的过程中,我说的很直接:1.开发公司内部OA,并非公司实际产品,无法直接创造利润,就算是公司的产品,现在做OA的多了去了.2.OA开发完成后,只剩运维人员,假设裁掉一部分人员的话,你怎么

连载《一个程序猿的生命周期》-《发展篇》- 14.天要下雨,娘要嫁人,由他去吧。

1971年,林彪乘坐军用飞机向中蒙边境飞去,周恩来将情况汇报给毛主席,请示是否用导弹将飞机打下来.毛主席没有说话,过了很久,长叹一口气,说:"天要下雨,娘要嫁人,由他去吧".随马云一起创业不在少数,最终离开的人也不在少数,马云说聪明的人都离开了阿里. 我在忙的时候,G兄又发来一条消息,说:强哥,我打算辞职了,想跟您谈谈.其实有什么可谈的,俗话说事不过三,前两次已经挽留了,也表示了诚意,即使我这再困难也不会有任何谈的意义了.出于礼貌象征性的交流了一下,他表示:**公司来电话了,月薪2万,

连载《一个程序猿的生命周期》-《发展篇》- 11.在麻木中寻找“源动力”

公司全体人员在南戴河召开了半年会,原董事长在大会上一再强调"求生存"(尽管取得了不错的成绩).对此,我有很强共鸣,这10多年走过来始终有一种危机感,直到现在也一刻不敢放松.强烈的求生欲望,不断的在思考发展方向,所以<一个程序猿的生命周期>第一册的命名为<生存篇>(下载). 我时常在问自己一个问题:我的源动力是什么?是否就是董事长说的"求生存"?有答案嘛?也许有吧!!! 我差不多将近一个月的时间就回山里老家一次,尽管人丁越来越少,但是仍然那么亲

连载《一个程序猿的生命周期》-《发展篇》 - 10.欠薪的高薪和稳定的相对低薪,你会选择哪个?

注:看本篇文章前,请先看<发展的路上,艰难做出抉择> 在现在的公司做工业(大)数据平台,刚开始来就我一个人,算是总体牵头人或是负责人吧.肯定是有压力,但是经过一年的努力,基础框架已经基本建好.数据链路已经打通.现在6个人的团体,总体来讲比较满意,2个80后(其中一个是89年).4个90后,已经度过了磨合期,开始走向正轨,从长远来看仍然需要扩充人员.尽管有工作压力,领导也表示放开招人,但是在招聘的过程中也是本着符合价值的基本原则,并没有一味的高薪招揽人员. 团队里只有一个人是通过社会招聘进来的,

连载《一个程序猿的生命周期》-《发展篇》 - 9.赌局结束了,一个时代的结束

注:看本篇文章前,先看<中层管理危机>和<用"厚道"向对"操蛋"的社会> 第一家公司分裂成了三个公司,从那走后一直与R总(原直接领导)和W总(原总经理)保持着联系,在他们之间也保持着利益平衡(也是生活压力所迫),相互之间保持着一种微妙的状态和关系,大家也清楚是怎么回事.我深知这种状态不会维持太长时间,现在只是偶尔联系,没有了利益关系. 前些天采摘西瓜,回来给R总送去些(他自己来家拉的),每年如此.路旁短暂交流最近的个人的情况.公司的情况.项目

连载《一个程序猿的生命周期》-《发展篇》 - 5.奶奶终于“自杀”了

打我记事起,奶奶裹过的小脚有点外八字,走起路来颤颤巍巍,但是又很利索,身体干瘦如柴,面部棱角清晰可见,头发黑白相间.妈妈说起爷爷.奶奶来,一顿牢骚,叙述各种遭遇.各种不好.慢慢的长大后,妈妈说的都被被一一印证了,面对这种事实,我心理多少有些埋怨.但是上初中后,我的思想转变了,爷爷奶奶还能活多久呢,过去的就让它过去吧. 正常的去看往他们,有时劝劝父母,多少起了一些作用,母亲对他们还说的过去,一直到他们离开这个世界.上班之后,每次回老家都去他们那转一圈,有了孩子之后,带孩子一起去.不知道从什么时候,

连载《一个程序猿的生命周期》- 第1册《生存》篇全集 下载

    有网友问是不是以后就不更新了?还会继续更新的,平时在不断的学习和实践,想写的东西还很多.只是现在这个阶段“没办法”写出来,写的太客套.理论的话,显得有点心灵鸡汤,谁喝多了也会腻:可是又没有办法写的太具体.有事件.有感悟的文章是最理想的,所谓的有血有肉. 对未来充满信心,所有还会继续写下去.希望能够影响更多的人!!!! 目       录 前言 3 第一章 猿人出山,坎坷前行 5 第1节 一贫如洗的家境 5 第2节 从大山走出的程序猿 7 第3节 城市校园生活 8 第4节 父亲下岗和我的电

读《一个程序猿的生命周期》有感

<一个程序猿的生命周期>中的作者来自贫困家庭,可以说是一个从大山里走出来的人.他深知,只有刻苦学习才能改变现状,所以他学习非常努力. 但人生总不是这样一帆风顺的,总会在我们的人生路上给与我们一些打击.意想不到的事情发生了,作者的母亲生病了失去了劳动能力.更让人意想不到的 事情也发生了,作者的父亲下岗了失去了工作岗位.也就是说,作者一下子成为了家里的支撑. 当时父亲下岗后领到了6000元的血汗钱,作者顶着巨大的压力拿了4500去买了一台电脑.因为作者想的还是比较长远的,没有电脑他又怎么能学好自