【Spark Core】TaskScheduler源码与任务提交原理浅析1

引言

上一节《Stage生成和Stage源码浅析》中,我介绍了Stage生成划分到提交Stage的过程,分析最终归结到submitStage的递归提交Stage,其中要通过submitMissingTasks函数创建task集合来实现任务的创建和分发。

在接下来的几篇文章中,我将具体介绍一下任务创建和分发的过程,为了让逻辑更加清楚,我将分成几篇文章进行介绍,好保证简明清晰,逻辑连贯,前后统一。

TaskScheduler介绍

TaskScheduler的主要任务是提交taskset到集群运算并汇报结果。

具体而言:

* 出现shuffle输出lost要报告fetch failed错误

* 碰到straggle任务需要放到别的节点上重试

* 为每个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)

TaskScheduler创建

《SparkContext源码解读》一文中,我介绍了在SparkContext初始化时创建TaskScheduler和DAGScheduler。这里具体描述一下其创建过程。

SparkContext创建过程中会调用createTaskScheduler函数来启动TaskScheduler任务调度器:

  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

createTaskScheduler函数中,TaskScheduler会根据部署方式而选择不同的SchedulerBackend来处理.

针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合:

  • Local模式:TaskSchedulerImpl + LocalBackend
  • Spark集群模式:TaskSchedulerImpl + SparkDepolySchedulerBackend
  • Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend
  • Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend
  /**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    // Regular expression used for local[N] and local[*] master formats
    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
    // Regular expression for local[N, maxRetries], used in tests with failing tasks
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
    // Regular expression for connecting to Spark deploy clusters
    val SPARK_REGEX = """spark://(.*)""".r
    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
    val MESOS_REGEX = """(mesos|zk)://.*""".r
    // Regular expression for connection to Simr cluster
    val SIMR_REGEX = """simr://(.*)""".r

    // When running locally, don‘t try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        ...

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        ...

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

        .....

以Standalone模式为例,backend根据不同的部署方式实例化,后又作为scheduler对象的一个成员变量对scheduler调用initialize函数:

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的关系

TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Master、Worker通信收集Worker上分配给该应用使用的资源情况。

下图描述了TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的UML关系,其中TaskSchedulerImpl是task schduler的具体实现,其中混入了TaskScheduler特质,而SparkDeploySchedulerBackend等具体的资源收集类继承自CoarseGrainedSchedulerBackend这一父类,而CoarseGrainedSchedulerBackend混入了SchedulerBackend特质:

这里还是以Spark Standalone集群模式为例,分析TaskSchedulerImpl与SparkDepolySchedulerBackend类中的具体操作。

  • 资源信息收集

    SparkDepolySchedulerBackend类就是专门负责收集Worker的资源信息,在它的父类CoarseGrainedSchedulerBackend中的DriverActor就是与Worker通信的Actor。

    Worker启动后会向Driver发送RegisterExecutor消息,此消息中就包含了Executor为Application分配的计算资源信息,而接收该消息的Actor也正是DriverActor。

  • 资源分配

    TaskSchedulerImpl类就是负责为Task分配资源的。在CoarseGrainedSchedulerBackend获取到可用资源后就会通过makeOffers方法通知TaskSchedulerImpl对资源进行分配,TaskSchedulerImpl的resourceOffers方法就是负责为Task分配计算资源的,在为Task分配好资源后又会通过lauchTasks方法发送LaunchTask消息通知Worker上的Executor执行Task。

TaskScheduler创建中函数调用链

SparkContext的createTaskScheduler创建schedulerBackend和taskScheduler–>根据不同的调度方式选择具体的scheduler和backend构造器–>调用TaskSchedulerImpl的initialize方法为scheduler的成员变量backend赋值–>createTaskScheduler返回创建好的(schedulerBackend, taskScheduler)–>调用TaskScheduler.start()启动–>实际上在TaskSchedulerImpl的start方法中调用backend.start()来启动SchedulerBackend。

TaskScheduler是在Application执行过程中,为它进行任务调度的,是属于Driver侧的。对应于一个Application就会有一个TaskScheduler,TaskScheduler和Application是一一对应的。TaskScheduler对资源的控制也比较鲁棒,一个Application申请Worker的计算资源,只要Application不结束就会一直被占有。 

小结

这一篇文章,我们介绍了TaskScheduler的创建过程,TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的关系还有创建过程的调用链,给大家一个初始印象。在下一篇中,我将承接Stage划分完毕后进行task创建和分发流程,进行细致的介绍。

参考资料

Spark源码分析(三)-TaskScheduler创建

转载请注明作者Jason Ding及其出处

GitCafe博客主页(http://jasonding1354.gitcafe.io/)

Github博客主页(http://jasonding1354.github.io/)

CSDN博客(http://blog.csdn.net/jasonding1354)

简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-18 15:47:29

【Spark Core】TaskScheduler源码与任务提交原理浅析1的相关文章

【Spark Core】TaskScheduler源码与任务提交原理浅析2

引言 上一节<TaskScheduler源码与任务提交原理浅析1>介绍了TaskScheduler的创建过程,在这一节中,我将承接<Stage生成和Stage源码浅析>中的submitMissingTasks函数继续介绍task的创建和分发工作. DAGScheduler中的submitMissingTasks函数 如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tas

【Spark Core】TaskScheduler源代码与任务提交原理浅析2

引言 上一节<TaskScheduler源代码与任务提交原理浅析1>介绍了TaskScheduler的创建过程,在这一节中,我将承接<Stage生成和Stage源代码浅析>中的submitMissingTasks函数继续介绍task的创建和分发工作. DAGScheduler中的submitMissingTasks函数 假设一个Stage的全部的parent stage都已经计算完毕或者存在于cache中.那么他会调用submitMissingTasks来提交该Stage所包括的T

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

Spark on K8S源码解析.md

Spark on K8S源码解析 sparkk8s time: 2019-12-19 Spark on k8s源码解析 1. Spark Submit spark-submit.sh spark-class.sh SparkSubmit 第一步,初始化spark应用配置 第二步,执行spark应用 Spark on k8s源码解析 本文基于spark-3.0.0 preview源码,来分析spark作业基于K8S的提交过程. spark on k8s的代码位置位于: 关于kubernetes目录

【Spark】DAGScheduler源码浅析2

引入 上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的Spark源码分析 – DAGScheduler一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数. 事件处理 在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理.在Spark 1.0源码

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark SQL Catalyst源码分析之Optimizer

前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以

Spark SQL Catalyst源码分析之Physical Plan

前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized L