Spark分析之DAGScheduler

DAGScheduler的主要功能
1、接收用户提交的job;
2、将job根据类型划分为不同的stage,并在每一个stage内产生一系列的task,并封装成TaskSet;
3、向TaskScheduler提交TaskSet;

以如下示例描述Job提交过程:

val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
val textFile = sc.textFile("xxx")
val result = textFile.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey(_ + _)
result.collect

RDD.collect

  ==>sc.runJob                  ##########################至此完成了将RDD提交DAGScheduler########################

    ==>dagScheduler.runJob

      ==>dagScheduler.submitJob

        ==>eventProcessActor ! JobSubmitted

def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal...) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal...)
}

//完成job到stage的转换,生成finalStage并提交
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean...){
     //注意:该RDD是final RDD,而不是一系列的RDD,用finalRDD来创建finalStage
     //newStage操作对应会生成新的result stage或者shuffle stage:内部有一个isShuffleMap变量来标识该stage是shuffle or result
     var finalStage: Stage = newStage(rdd, partitions.size, None, jobId, Some(callSite))

    //使用finalStage来构建job
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

    //对于简单的job,没有依赖关系并且只有一个partition,该类job会使用local thread处理而并非提交到TaskScheduler上处理
    if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
        runLocally(job)
    } else {
        submitStage(finalStage)
    }
}

handleJobSubmitted方法完成了job到stage的转换,生成finalStage;每个job都有一个finalStage。

newStage()方法分析:根据finalRDD生成finalStage

private def newStage(
      rdd: RDD[_],  numTasks: Int,     //task个数就是partitions个数
      shuffleDep: Option[ShuffleDependency[_,_]],
      jobId: Int, callSite: Option[String] = None) : Stage = {
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
   ......
}

private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_,_] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              visit(dep.rdd)
          }
        }
      }
    }
    visit(rdd)
    parents.toList
}

private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage
      case None =>
        val stage =
          newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }

newStage()后产生的finalStage中已经包含了该stage的所有依赖的父Stage;通过getParentStages()方法构建该stage的依赖关系;

生成stage实例,stage的id通过nextStageId的值加一得到,task的个数就是partitions的个数;

有两种类型的Stage:ShuffleStage和ResultStage;

Stage内部有一个isShuffleMap变量标识该Stage是shuffle还是result类型;

Spark对stage的划分是按照宽依赖来进行区分的:根据RDD的依赖关系,如果遇到宽依赖则创建ShuffleStage;

submitStage()方法分析:计算stage之间的依赖关系(Stage DAG)并对依赖关系进行处理

private def submitStage(stage: Stage) {
 if (!waiting(stage) && !running(stage) && !failed(stage)) {
  val missing = getMissingParentStages(stage).sortBy(_.id)  //根据final stage发现是否有parent stage
  if (missing == Nil) { // 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
   submitMissingTasks(stage, jobId.get)
   running += stage //设置当前的stage为running,因为当前的stage没有未处理完的依赖的stage
  } else { //如果有parent stage,需要先submit parent, 因为stage之间需要顺序执行
   for (parent <- missing) {
    submitStage(parent)
   }
   waiting += stage   //当前stage放入到waiting列表中,表示该stage需要等待parent先执行完成
  }
 }
}

//根据final stage的parents找出所有的parent stage
private def getMissingParentStages(stage: Stage): List[Stage] = {
 ......
 dep match {
  //如果是ShuffleDependency,则新建一个shuffle map stage,且该stage是可用的话则加入missing中
  case shufDep: ShuffleDependency[_,_] =>  //ShuffleDependecy
   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
   if (!mapStage.isAvailable) {
    missing += mapStage
   }
  case narrowDep: NarrowDependency[_] =>  //NarrowDependecy
   visit(narrowDep.rdd)
 }
}

getMissParentStages(stage)处理步骤:

1、根据该stage得到该stage的parent,也就是RDD的依赖关系,生成parentStage是通过RDD的dependencies;

2、如果依赖关系是宽依赖,则生成一个mapStage来作为finalStage的parent;也就是说对于需要shuffle操作的job,会生成mapStage和finalStage进行处理

3、如果依赖关系是窄依赖,不会生成新的stage。也就是说对于不需要shuffle的job只需要一个finalStage;

注意:getMissParentStages(stage)得到的结果集是按照stageid的降序排列的

submitStage()处理步骤:

1、计算该stage的getMissParentStages(),如果当前stage没有任何依赖或者所有的依赖都已执行完,则提交该stage;

2、如果发现该stage有依赖的stage未执行,则先执行完所有依赖的父stage(根据getMissParentStages()方法得到的结果集降序来执行stage);

submitMissingTasks()方法分析:把stage根据parition拆分成task生成TaskSet,并提交到TaskScheduler

private def submitMissingTasks(stage: Stage, jobId: Int) {
 //首先根据stage所依赖的RDD的partition的分布,会产生出与partition数量相等的task
 var tasks = ArrayBuffer[Task[_]]()

 //对于finalStage或是mapStage会产生不同的task。
 //检查该stage时是否ShuffleMap,如果是则生成ShuffleMapTask
 if (stage.isShuffleMap) { //mapStage:表示还有其他stage依赖此stage
  for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
   //task根据partition的locality进行分布
   val locs = getPreferredLocs(stage.rdd, p)
   tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
  }
 } else { //finalStage:该类型stage直接输出结果生成ResultTask
  val job = resultStageToJob(stage)
  for (id <- 0 until job.numPartitions if !job.finished(id)) {
   val partition = job.partitions(id)
   val locs = getPreferredLocs(stage.rdd, partition)
   //由于是ResultTask,因此需要传入定义的func,也就是如果处理结果返回
   tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
  }
 }
 //向TaskSchuduler提交任务,以stage为单位,一个stage对应一个TaskSet
 taskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
}

submitMissingTask()方法的处理步骤:

1、通过stage.isShuffleMap来决定生成的是ShuffleMapTask还是ResultTask;

2、如果是ShuffleMapTask则根据stage所依赖的RDD的partition分布,产生和partition数量相同的task,这些task根据partition的locality进行分布’

3、把stage对应生成所有的task封装到一个TaskSet中,提交给TaskScheduler的submitTasks()方法进行调度;

##########################至此完成了DAGScheduler提交TaskSet到TaskSchuduler########################

Spark分析之DAGScheduler,布布扣,bubuko.com

时间: 2024-10-13 09:42:54

Spark分析之DAGScheduler的相关文章

Spark分析之Job Scheduling Process

经过前面文章的SparkContext.DAGScheduler.TaskScheduler分析,再从总体上了解Spark Job的调度流程 1.SparkContext将job的RDD DAG图提交给DAGScheduler: 2.DAGScheduler将job分解成Stage DAG,将每个Stage的Task封装成TaskSet提交给TaskScheduler:窄依赖以pipeline方式执行,效率高: 3.TaskScheduler将TaskSet中的一个个Task提交到集群中去运行:

Spark分析之Master

override def preStart() { logInfo("Starting Spark master at " + masterUrl) webUi.bind() //绑定WEBUI masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIM

Spark分析之Master、Worker以及Application三者之间如何建立连接

Master.preStart(){ webUi.bind() context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除 case CheckForWorkerTimeOut => { timeOutDeadWorkers() } /** Check for, and remove, any timed-out

Spark分析之Worker

override def preStart() { webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.bind() //创建并绑定UI registerWithMaster() //注册到Master } def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + master

Spark分析之Standalone运行过程分析

一.集群启动过程--启动Master $SPARK_HOME/sbin/start-master.sh start-master.sh脚本关键内容: spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT 日志信息:$SPARK_HOME/logs/ 14/0

Spark分析之Dependency

在Spark中,每一个RDD是对于数据集在某一状态下的表现形式,比如说:map.filter.group by等都算一次操作,这个状态有可能是从前一状态转换而来的: 因此换句话说一个RDD可能与之前的RDD(s)有依赖关系:RDD之间存在依赖关系: 根据依赖关系的不同,可以将RDD分成两种不同的类型:宽依赖和窄依赖. 窄依赖:一个父RDD的partition至多被子RDD的某个partition使用一次: 宽依赖:一个父RDD的partition会被子RDD的partition使用多次,需要sh

[大数据从入门到放弃系列教程]第一个spark分析程序

文章施工中,由于部分网站会在我还没有写完就抓取到这篇文章,导致你看到的内容不完整,请点击这里: 或者复制访问 http://www.cnblogs.com/blog5277/p/8580007.html 来查看更完整的内容 [大数据从入门到放弃系列教程]第一个spark分析程序 原文链接:http://www.cnblogs.com/blog5277/p/8580007.html 原文作者:博客园--曲高终和寡 *********************分割线******************

spark 笔记 7: DAGScheduler

在前面的sparkContex和RDD都可以看到,真正的计算工作都是同过调用DAGScheduler的runjob方法来实现的.这是一个很重要的类.在看这个类实现之前,需要对actor模式有一点了解:http://en.wikipedia.org/wiki/Actor_model http://www.slideshare.net/YungLinHo/introduction-to-actor-model-and-akka 粗略知道actor模式怎么实现就可以了.另外,应该先看看DAG相关的概念

Spark系列(九)DAGScheduler工作原理

以wordcount为示例进行深入分析 1  object wordcount { 2    3    def main(args: Array[String]) { 4      val conf = new SparkConf() 5      conf.setAppName("wordcount").setMaster("local") 6    7      val sc = new SparkContext(conf) 8      // 产生Hadoop