Spark 资源调度 与 任务调度

  • Spark 资源调度与任务调度的流程(Standalone):

    • 启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master掌握了集群资源状况。
    • 当 Spark 提交一个 Application 后, 根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图。
    • 任务提交后, Spark 会在任务端创建两个对象: DAGSchedular 和 TaskScheduler
    • DAGSchedular 是任务调度的高层调度器, 是一个对象
    • DAGScheduler 的主要作用是 将 DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的Stage, 然后将 stage 以 TaskSet 的形式 提交给 TaskScheduler
      • TaskScheduler 是任务调度的底层调度器
      • TaskSet 其实就是一个集合, 里面封装的就是一个个task任务, 也就是stage中的并行度 task 任务
        package org.apache.spark.scheduler
        
        import java.util.Properties
        
        /**
         * A set of tasks submitted together to the low-level TaskScheduler,
         * usually representing missing partitions of a particular stage.
         * 一同被提交到低等级的任务调度器的 一组任务集, 通常代表了一个特定的 stage(阶
         * 段) 的 缺失的分区
         */
        private[spark] class TaskSet(
            // 任务数组
            val tasks: Array[Task[_]],
            // 阶段Id
            val stageId: Int,
            // 尝试的阶段Id(也就是下级Stage?)
            val stageAttemptId: Int,
            // 优先级
            val priority: Int,
            // 是个封装过的Hashtable
            val properties: Properties) {
            // 拼接 阶段Id 和 尝试的阶段Id
          val id: String = stageId + "." + stageAttemptId
          // 重写 toString
          override def toString: String = "TaskSet " + id
        }
      • TaskScheduler 会遍历 TaskSet 集合, 拿到每个 task 后将 task发送到 Executor 中执行
      • 其实就是发送到Executor中的线程池ThreadPool去执行
      • 当 task 执行失败时, 则由TaskSchedular负责重试, 将 task重新发送给 Executor 去执行, 默认重试 3 次
          //提交task,最后一行  backend.reviveOffers()  调用的是CoarseGrainedSchedulerBackend对象中的方法
          override def submitTasks(taskSet: TaskSet) {
            // 获取任务数组
            val tasks = taskSet.tasks
            logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
            // 加同步锁
            this.synchronized {
               // 创建任务集管理器 参数: 任务集, 最大容忍任务失败次数
              val manager = createTaskSetManager(taskSet, maxTaskFailures)
               // 阶段Id
              val stage = taskSet.stageId
        
                // taskSetsByStageIdAndAttempt 是一个 HashMap[Int, TaskSetManager]
                /* getOrElseUpdate(key: A, op: => B): B=
                * 如果 key 已经在这个 map 中, 就返回其对应的value
                * 否则就根据已知的表达式 'op' 计算其对应的value 并将其存储到 map中, 并返回该 value
                */
        val stageTaskSets =
                taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
                // 将阶段任务集合设置为任务管理器
              stageTaskSets(taskSet.stageAttemptId) = manager
                // 获取冲突的任务集 如果 stageTaskSets 的任务集 不是传入的任务集 并且stageTaskSets的任务集不是僵尸进程 那么它就是冲突的任务集
              val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
                ts.taskSet != taskSet && !ts.isZombie
              }
                // 如果有冲突的任务集
              if (conflictingTaskSet) {
                throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
                  s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
              }
                // 通过可调度的构造器创建一个任务集管理器
              schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
              // 如故不是本地提交 或者 没有接收到任务
              if (!isLocal && !hasReceivedTask) {
                  // 通过饥饿的计时器 来 根据 固定的比例进行调度
                  // scheduleAtFIxedRate 方法的三个参数: 时间任务, 延迟时间, 周期 如果延迟时间或周期值为父会抛出异常
                starvationTimer.scheduleAtFixedRate(new TimerTask() {
                  override def run() {
                      // 如果没有发送任务
                    if (!hasLaunchedTask) {
                      logWarning("Initial job has not accepted any resources; " +
                        "check your cluster UI to ensure that workers are registered " +
                        "and have sufficient resources")
                    } else {
                        // 如果发送了任务, 就取消
                      this.cancel()
                    }
                  }
                  // 默认的饥饿超时临界值: 15s
                }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
              }
              hasReceivedTask = true
            }
            // 调用 CoarseGrainedSchedulerBackend对象中的方法
            backend.reviveOffers()
          }
      • 如果重试 3 次 依然失败, 那么这个 task 所在的 stage 就失败了
      • 如果 stage 失败了则由 DAGScheduler 来负责重试, 重新发送 TaskSet 到 TaskScheduler, Stage 默认重试 4 次。
      • 如果重试 4 次 以后依然失败, 那么 该 job 就失败了。
      • 一个 job 失败了, Application 就失败了。
    • TaskScheduler 不仅能重试失败的 task, 还会重试 straggling(直译是挣扎的, 这边可以意译为缓慢的) task(执行速度比其他task慢太多的task)
      /**
          * TaskScheduler 启动
          */
        override def start() {
          //StandaloneSchedulerBackend 启动
          backend.start()
      
          if (!isLocal && conf.getBoolean("spark.speculation", false)) {
            logInfo("Starting speculative execution thread")
              // 启动定期执行推测任务线程
            speculationScheduler.scheduleWithFixedDelay(new Runnable {
              override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
                  // 检查所有活跃的jon中是否有可推测的任务
                checkSpeculatableTasks()
              }
            }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
          }
        }
      // Check for speculatable tasks in all our active jobs.
      // 检查是否有可推测的任务
        def checkSpeculatableTasks() {
          // 是否应该重新激活
          var shouldRevive = false
          // 加同步锁
          synchronized {
             // 检查是否有可推测的任务(传入执行推测所需的最小时间)
            shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
          }
            // 如果需要重新激活
          if (shouldRevive) {
            // 就尝试运行推测任务
            backend.reviveOffers()
          }
        }
    • Spark 推测执行机制:
      • 如果有运行缓慢的 task, 那么 TaskScheduler 会启动一个新的 task 来与该运行缓慢的 task 执行相同的处理逻辑。
      • 两个 task 哪个先执行完, 就以哪个 task 的执行结果为准。
      • 在 Spark 中推测执行默认是关闭的。
      • 推测执行可以通过 spark.speculation 属性来配置
          /**
           * Return a speculative task for a given executor if any are available
           * 如果有卡壳的进程,就向已知的executor进程返回一个推测任务
           * The task should not have an attempt running on this host, in case
           * the host is slow.
           * 该任务不应有任何尝试任务在该主机上运行, 以防止该主机是有延迟的
           * In addition, the task should meet the given locality constraint.
           * 此外, 该任务需要满足已知的本地约束
           */
          // Labeled as protected to allow tests to override providing speculative tasks if necessary
          // 标注为 protected 以允许测试 来重写 提供的推测任务(如果需要的话)
          protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
            : Option[(Int, TaskLocality.Value)] =
          {
            // 从推测式执行任务列表中移除已经成功完成的task
            speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
        
            // 1.判断 task 是否可以在该executor对应的Host上执行, 判断条件为:
            // 2.没有taskAttempt在该Host上运行
            // 3. 该 executor 没有在 task 的黑名单中(task 在该executor上失败过, 并且仍在‘黑暗’时间中)
            def canRunOnHost(index: Int): Boolean = {
              !hasAttemptOnHost(index, host) &&
                !isTaskBlacklistedOnExecOrNode(index, execId, host)
            }
            // 判断推测执行任务集合是否为空
            if (!speculatableTasks.isEmpty) {
              // Check for process-local tasks;
              // 检查 本地进程任务
              // note that tasks can be process-local on multiple nodes when we replicate cached blocks, as in Spark Streaming
              // 需要注意的是: 当我们备份缓存块时, 任务可以以本地进程 或者 多节点的形式运行 (就像spark流那样)
              for (index <- speculatableTasks if canRunOnHost(index)) {
                val prefs = tasks(index).preferredLocations
                val executors = prefs.flatMap(_ match {
                  case e: ExecutorCacheTaskLocation => Some(e.executorId)
                  case _ => None
                });
                // 如果 executor 进程包含该任务Id
                if (executors.contains(execId)) {
                  // 就不推测该任务
                  speculatableTasks -= index
                  // 返回某个本地进程
                  return Some((index, TaskLocality.PROCESS_LOCAL))
                }
              }
        
              // Check for node-local tasks 检查本地节点的任务
              if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
                for (index <- speculatableTasks if canRunOnHost(index)) {
                  val locations = tasks(index).preferredLocations.map(_.host)
                  if (locations.contains(host)) {
                    speculatableTasks -= index
                    return Some((index, TaskLocality.NODE_LOCAL))
                  }
                }
              }
        
              // Check for no-preference tasks 检查非优先级的任务
              if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
                  // 遍历 speculatableTasks, 如果有任务能够在主机上运行
                for (index <- speculatableTasks if canRunOnHost(index)) {
                  // 获取该task的优先级位置
                  val locations = tasks(index).preferredLocations
                  if (locations.size == 0) {
                    speculatableTasks -= index
                    return Some((index, TaskLocality.PROCESS_LOCAL))
                  }
                }
              }
        
              // Check for rack-local tasks 监察本地构建的任务
              if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
                for (rack <- sched.getRackForHost(host)) {
                  for (index <- speculatableTasks if canRunOnHost(index)) {
                    val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
                    if (racks.contains(rack)) {
                      speculatableTasks -= index
                      return Some((index, TaskLocality.RACK_LOCAL))
                    }
                  }
                }
              }
        
              // Check for non-local tasks 检查非本地性的任务
              if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
                for (index <- speculatableTasks if canRunOnHost(index)) {
                  speculatableTasks -= index
                  return Some((index, TaskLocality.ANY))
                }
              }
            }
        
            None
          }
        • 需要注意的是:

          • 对于 ETL(Extract Transformation Load) 类型的需要导入数据库的业务需要关闭推测执行机制, 否则会有重复的数据导入数据库。
          • 如果遇到数据倾斜的情况, 开启推测执行则有可能导致一直会有 task 重新启动处理相同的逻辑, 任务可能一直处于处理不完的状态。
  • 粗粒度资源申请 和 细粒度资源申请
    • 粗粒度资源申请(Spark)

      • 在 Application 执行之前, 将所有的资源申请完毕, 当资源申请成功后, 才会进行任务的调度, 当所有的 task 执行完成后才会释放这部分资源
      • 优点
        • 在 Application 执行之前, 所有的资源都申请完毕, 每一个 task 直接使用资源就可以了, 不需要 task 在执行前自己去申请资源。
        • task 执行快了 => stage 执行就快了 => job 执行就快了 => application 执行就快了
      • 缺点
        • 直到最后一个 task 执行完成才会释放资源, 集群的资源无法充分利用。(俗称: 占着M坑不拉S)
    • 细粒度资源申请(MR)
      • Application 执行之前不需要先去申请资源, 而是直接执行, 让 job 中的每一个 task 在执行前自己去申请资源, task执行完成就释放资源。
      • 优点
        • 集群的资源可以充分利用
      • 缺点
        • task自己去申请资源,task启动变慢,Application的运行就响应的变慢了。

原文地址:https://www.cnblogs.com/ronnieyuan/p/11734721.html

时间: 2024-11-04 23:08:39

Spark 资源调度 与 任务调度的相关文章

Spark资源调度和任务调度

一.资源调度&任务调度 1.启动集群后,Worker节点会周期性的[心跳]向Master节点汇报资源情况,Master掌握集群资源情况. 2.当Spark提交一个Application后,根据RDD之间的依赖关系将Application构建成一个DAG有向无环图. 3.任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler. 4.DAGScheduler是任务调度的高层调度器,是一个对象.DAGScheduler的主要作用就是将DAG根据RDD

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本課主題 Master 资源调度的源码鉴赏 Spark 的 Worker 是基于什么逻辑去启动 Executor 资源调度管理 任務調度與資源是通過 DAGScheduler.TaskScheduler.SchedulerBackend 等進行的作業調度 資源調度是指應用程序如何獲得資源 任務調度是在資源調度的基礎上進行的,沒有資源調度那麼任務調度就成為了無源之水無本之木 Master 资源调度的源码鉴赏 因為 Master 負責資源管理和調度,所以資源調度方法 scheduer 位於 Mast

Spark 资源调度包 stage 类解析

spark 资源调度包 Stage(阶段) 类解析 类注释: /** * A stage is a set of parallel tasks all computing the same function that need to run as part * of a Spark job, where all the tasks have the same shuffle dependencies. * 一个阶段是所有计算相同功能的并行任务集合, 作为spark作业的一部分, 这些任务都有相同

3.算子+PV&amp;UV+submit提交参数+资源调度和任务调度源码分析+二次排序+分组topN+SparkShell

1.补充算子 transformations ?  mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值. ?  repartition 增加或减少分区.会产生shuffle.(多个分区分到一个分区不会产生shuffle) 多用于增多分区. 底层调用的是coalesce ?  coalesce(合并) coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle. true为产生shuffle,false不产生shuff

Spark资源调度

一:任务调度和资源调度的区别: 任务调度是指通过DAGScheduler,TaskScheduler,SchedulerBackend完成的job的调度 资源调度是指应用程序获取资源的调度,他是通过schedule方法完成的 二:资源调度解密 因为master负责资源管理和调度,所以资源调度的方法schedule位于master.scala这个了类中,当注册程序或者资源发生改变的都会导致schedule的调用,例如注册程序的时候(包括worker,driver和application的注册等,注

Spark里面的任务调度:离SparkContext开始

SparkContext这是发达国家Spark入学申请,它负责的相互作用和整个集群,它涉及到创建RDD.accumulators and broadcast variables.理解力Spark架构,我们需要从入口开始.下图是图的官方网站. DriverProgram就是用户提交的程序,这里边定义了SparkContext的实例. SparkContext定义在core/src/main/scala/org/apache/spark/SparkContext.scala. Spark默认的构造函

SPARK 资源调度源码总结

Executor在集群中分散启动,有利于task计算的数据本地化 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项 默认情况下没有设置--total-executor-cores,一个Applicatio

小记--------spark资源调度机制源码分析-----Schedule

Master类位置所在:spark-core_2.11-2.1.0.jar的org.apache.spark.deploy.master下的Master类 /** * driver调度机制原理代码分析Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability change