spark--job和DAGScheduler源码

一个job对应一个action操作,action执行会有先后顺序;

每个job执行会先构建一个DAG路径,一个job会含有多个stage,主要逻辑在DAGScheduler。

spark提交job的源码见(SparkContext.scala的runJob方法):

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

DAGScheduler--job调度的核心入口:

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {//创建finalStage
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.//创建一个stage对象,并且将stage加入到DAGScheduler内存缓存中
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
//创建job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()//将job加入到内存缓存中
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
   //使用submitStage() 方法提交finalStage   submitStage(finalStage)
  }

原文地址:https://www.cnblogs.com/parent-absent-son/p/11747750.html

时间: 2024-10-19 11:28:45

spark--job和DAGScheduler源码的相关文章

【Spark】DAGScheduler源码浅析2

引入 上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的Spark源码分析 – DAGScheduler一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数. 事件处理 在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理.在Spark 1.0源码

精通Spark:Spark内核剖析、源码解读、性能优化和商业案例实战

这是世界上第一个Spark内核高端课程: 1, 该课程在对Spark的13个不同版本源码彻底研究基础之上提炼而成: 2, 课程涵盖Spark所有内核精髓的剖析: 3, 课程中有大量的核心源码解读: 4, 全景展示Spark商业案例下规划.部署.开发.管理技术: 5, 涵盖Spark核心优化技巧 该课程是Spark的高端课程,其前置课程是“18小时内掌握Spark:把云计算大数据速度提高100倍以上!”. 培训对象 1,  系统架构师.系统分析师.高级程序员.资深开发人员: 2, 牵涉到大数据处理

spark streaming task 序列化源码

spark streaming task 序列化源码 1.入口 val kafkaStreams = (1 to recerverNum).map { i => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) )} val unifiedStream = ssc.union(kafkaStreams) val u

Scala 深入浅出实战经典 第48讲:Scala类型约束代码实战及其在Spark中的应用源码解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-64讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2 技术爱好者尤其是大数据爱好者 可以加DT大数据梦工厂的qq群 DT大数据梦工厂① :462923555 DT大数据梦工厂②:437123764 DT大数据梦工厂③

68:Scala并发编程原生线程Actor、Cass Class下的消息传递和偏函数实战解析及其在Spark中的应用源码解析

今天给大家带来的是王家林老师的scala编程讲座的第68讲:Scala并发编程原生线程Actor.Cass Class下的消息传递和偏函数实战解析 昨天讲了Actor的匿名Actor及消息传递,那么我们今天来看一下原生线程Actor及CassClass下的消息传递,让我们从代码出发: case class Person(name:String,age:Int)//定义cass Class class HelloActor extends Actor{//预定义一个Actor  def act()

Scala 深入浅出实战经典 第60讲:Scala中隐式参数实战详解以及在Spark中的应用源码解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/IVN4EuFlmKk/优酷:http://v.youku.com/v_show/id_

Spark 1.0.1源码安装

apache 网站上面已经有了已经构建好了的版本,我这里还是自己利用午休时间重新构建一下(jdk,python,scala的安装就省略了,自己可以去安装) http://www.apache.org/dist/spark/spark-1.0.1/ 具体官网的下载链接可以去这里 我下载的是http://www.apache.org/dist/spark/spark-1.0.1/spark-1.0.1.tgz源码包 下载对应的linux服务器上面,然后解压 wget http://www.apach

Spark MLlib之线性回归源码分析

1.理论基础 线性回归(Linear Regression)问题属于监督学习(Supervised Learning)范畴,又称分类(Classification)或归纳学习(Inductive Learning);这类分析中训练数据集中给出的数据类标是确定的:机器学习的目标是,对于给定的一个训练数据集,通过不断的分析和学习产生一个联系属性集合和类标集合的分类函数(Classification Function)或预测函数(Prediction Function),这个函数称为分类模型(Clas

Scala 深入浅出实战经典 第65讲:Scala中隐式转换内幕揭秘、最佳实践及其在Spark中的应用源码解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/NGgUD5FBQaA/优酷:http://v.youku.com/v_show/id_