Spark 资源调度包 stage 类解析

spark 资源调度包 Stage(阶段) 类解析

  • 类注释:

    /**
     * A stage is a set of parallel tasks all computing the same function that need to run as part
     * of a Spark job, where all the tasks have the same shuffle dependencies.
     * 一个阶段是所有计算相同功能的并行任务集合, 作为spark作业的一部分, 这些任务都有相同的 shuffle 依赖
     *
     * Each DAG of tasks run by the scheduler is split up into stages at the boundaries where
     * shuffle occurs, and then the DAGScheduler runs these stages in topological order.
     * 每个由调度器运行的任务的有向无环图在shuffle发生的分界处分化成不同的阶段, 并且这些有向无环图的调度器
     * 将以拓扑排序来运行这些 不同阶段的任务
     *
     * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
     * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
     * (e.g. count(), save(), etc) by running a function on an RDD.
     * 每个阶段都可以是一个 shuffle 匹配阶段, 该任务的结果可以被其他阶段作为导入, 也可以是 一个结果阶段,
     * 它的任务 通过 在RDD上运行 一个公式 来直接计算 一个 spark 行为(计数, 保存等...)
     *
     * For shuffle map stages, we also track the nodes that each output partition is on.
     * 对于 shuffle 匹配阶段, 我们会追踪 每个输出的分区的节点
     *
     * Each Stage also has a firstJobId, identifying the job that first submitted the stage.  When
     * FIFO scheduling is used, this allows Stages from earlier jobs to be computed first or
     * recovered faster on failure.
     * 每个阶段都有一个 首任务Id, 用于辨识第一个提交到该阶段的 任务。 当 先进先出 调度策略被使用时, 这会允
     * 许 更早 的 作业 的阶段先被计算, 或者 在失败之后更早的恢复。
     *
     * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In
     * that case, the Stage object will track multiple StageInfo objects to pass to listeners or  * the web UI.
     * 最终, 由于 容错恢复机制, 单个阶段可以在多次尝试运行中 被重新执行。在这种情况下, 该阶段对象会追踪 许
     * 多 阶段信息对象 并 将信息传递给监听者 或者 web UI 界面
     *
     * The latest one will be accessible through latestInfo.
     * 最新的阶段可以通过 最新的信息获取
     *
     * @param id Unique stage ID    id: 唯一的阶段Id
     * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map
     * tasks on, while for a result stage, it's the target RDD that we ran an action on
     * rdd:该 阶段 所运行在的 RDD:对于shuffle匹配阶段, 该RDD是我们 运行匹配任务 所在的RDD, 但对于结果阶
     * 段,该RDD是我们运行 行为所在的RDD
     * @param numTasks Total number of tasks in stage; 当前阶段的任务数量
     * result stages in particular may not need to compute all partitions, e.g. for first(),
     * lookup(), and take().
     * 结果阶段 实际上不一定需要计算所有分区, 比如使用 first(), lookup() 和 take() 等算子
     * @param parents List of stages that this stage depends on (through shuffle dependencies).
     * parent: 该阶段 所依赖(shuffle依赖) 的 阶段列表
     * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
     * firstJobId: 该阶段的第一个任务Id, 用于先进先出调度
     * @param callSite Location in the user program associated with this stage: either where the
     * target RDD was created, for a shuffle map stage, or where the action for a result stage was
     * called.
     * callSite: 与该阶段相关的用户程序的位置: 对于 shuffle 匹配阶段 是 目标 RDD创建的位置, 对于 结果
     * 阶段, 是 行为被调用的位置 (调用位点)
     */
  • 代码:
    private[scheduler] abstract class Stage(
        val id: Int,
        val rdd: RDD[_],
        val numTasks: Int,
        val parents: List[Stage],
        val firstJobId: Int,
        val callSite: CallSite)
      extends Logging {
      // 根据数组长度确定分区数
      val numPartitions = rdd.partitions.length
    
      /** Set of jobs that this stage belongs to. 该阶段 的任务Id集合*/
      val jobIds = new HashSet[Int]
    
      /** The ID to use for the next new attempt for this stage.
       * 该阶段的下一次新尝试的 Id
       */
      private var nextAttemptId: Int = 0
      // 短型的调用位点
      val name: String = callSite.shortForm
      // 长型的调用位点
      val details: String = callSite.longForm
    
      /**
       * Pointer to the [[StageInfo]] object for the most recent attempt.
       * 指向阶段信息对象的指针, 用于获取最近最多的尝试
       * This needs to be initialized here, before any attempts have actually been created, because
       * the DAGScheduler uses this StageInfo to tell SparkListeners when a job starts (which
       * happens before any stage attempts have been created).
       * 在任何尝试被实际创建之前, 它需要被初始化, 因为 有向无环图的调度器 使用了该阶段信息来告诉spark的监
       * 听者有任务启动了(这发生在任何阶段的尝试被发生之前)
       */
      private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
    
      /**
       * Set of stage attempt IDs that have failed with a FetchFailure.
       * 拉取失败的的阶段尝试Id集
       * We keep track of these failures in order to avoid endless retries if a stage keeps failing
       * with a FetchFailure.
       * 我们追踪这些失败时为了防止 一个阶段一直发生拉取失败后 无尽的重试
       * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
       * multiple tasks from the same stage attempt fail (SPARK-5945).
       * 我们追踪失败的每次尝试的Id以防止 如果许多任务在同一个阶段的尝试中失败 会导致 记录重复的失败
       */
      val fetchFailedAttemptIds = new HashSet[Int]
      // 清除失败Id
      private[scheduler] def clearFailures() : Unit = {
        fetchFailedAttemptIds.clear()
      }
    
      /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID.
       * 通过创建一个新的附带新的尝试Id的状态信息 来为当前阶段创建一个新的尝试
       */
      def makeNewStageAttempt(
          // 需要计算的分区
          numPartitionsToCompute: Int,
          // 本地优先的任务
          taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
          // TaskMetrics是内部累加器的一个包装类
        val metrics = new TaskMetrics
        metrics.register(rdd.sparkContext)
         // 获取最近的阶段信息
        _latestInfo = StageInfo.fromStage(
          this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
        nextAttemptId += 1
      }
    
      /** Returns the StageInfo for the most recent attempt for this stage.
       * 返回最近最多尝试的 当前阶段的 阶段信息
       */
      def latestInfo: StageInfo = _latestInfo
    
      override final def hashCode(): Int = id
    
      override final def equals(other: Any): Boolean = other match {
        case stage: Stage => stage != null && stage.id == id
        case _ => false
      }
    
      /** Returns the sequence of partition ids that are missing (i.e. needs to be computed).
       * 返回缺失的分区 id 的序列
       */
      def findMissingPartitions(): Seq[Int]
    }

原文地址:https://www.cnblogs.com/ronnieyuan/p/11723208.html

时间: 2024-11-02 19:04:39

Spark 资源调度包 stage 类解析的相关文章

第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本課主題 Master 资源调度的源码鉴赏 Spark 的 Worker 是基于什么逻辑去启动 Executor 资源调度管理 任務調度與資源是通過 DAGScheduler.TaskScheduler.SchedulerBackend 等進行的作業調度 資源調度是指應用程序如何獲得資源 任務調度是在資源調度的基礎上進行的,沒有資源調度那麼任務調度就成為了無源之水無本之木 Master 资源调度的源码鉴赏 因為 Master 負責資源管理和調度,所以資源調度方法 scheduer 位於 Mast

Spark Thrift JDBCServer应用场景解析与实战案例

[TOC] Spark Thrift JDBCServer应用场景解析与实战案例 1 前言 这里说的Spark Thrift JDBCServer并不是网上大部分写到的Spark数据结果落地到RDB数据库中所使用的JDBC方式,而是指Spark启动一个名为thriftserver的进程以供客户端提供JDBC连接,进而使用SQL语句进行查询分析. http://spark.apache.org/docs/2.3.3/sql-programming-guide.html#running-the-th

关于org.apache.spark.deploy.yarn.Client类

目录 前言 主要方法 submitApplication createApplicationSubmissionContext getApplicationReport getClientToken verifyClusterResources copyFileToRemote prepareLocalResources distribute setupLaunchEnv createContainerLaunchContext monitorApplication run @(关于org.ap

Thrift compiler代码生成类解析

代码生成类解析: Thrift--facebook RPC框架,介绍就不说了,百度,google一大把,使用也不介绍,直接上结构和分析吧. Hello.thrift文件内容如下: namespace java com.tomsun.thrift.generated.demo service Hello { string helloString(1:string para) } 内容很简单,申明个RPC service(Hello),服务方法helloString,方法参数格式(seq: para

tuple解包给类的构造函数

首先我们的第一步当然是将tuple解包.tuple提供了一个get函数来获取第N个元素.例如: get<1>(make_tuple(...)); 要将一个tuple全部拆解,就可以使用通过多次调用这个函数来进行解析,例如: auto tup = make_tuple(..........); func( get<0>(tup),get<1>(tup),get<2>(tup)......get<n>(tup) ); 而实际上,0,1,...n这个数

学习笔记之rpm程序包管理功能解析

Rpm包管理功能全解 软件包管理的功能:将编译好的程序的各组成文件打包成一个或几个程序包文件,为了方便的实现程序包的安装.升级.卸载.查询.校验.数据库维护. 下面我们来看看RPM包管理的解析 Rpm包在redhat和S.U.S.E中有很大的应用 我们接下来就以centos系统中rpm包的管理做一些详细的功能解析 使用yum(rhel系列)安装时可以自动解决依赖关系d rpm包命名格式: name-VERSION-release.arch.rpm VERSION:major.minor.rele

常见基础包、类、接口、异常

#常用的包 第一个: Java.lang包 该包提供Java语言进行程序设计的基础类. 第二个: Java.util包 该包提供了集合框架,时间模型,日期和时间实施,国际化的实用工具类. 第三个:Java.io包 通过文件系统,数据流和序列化提供系统的输入与输出. 第四个:Java.net包 该包提供实现网络应用与开发的类 第五个:Java.sql包 该包提供了使用java语言访问并处理存储在数据源中的数据API. 第六个:Java.text包 提供了与自然语言无关的方式来处理文本.日期.数字的

Spark 资源调度 与 任务调度

Spark 资源调度与任务调度的流程(Standalone): 启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master掌握了集群资源状况. 当 Spark 提交一个 Application 后, 根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图. 任务提交后, Spark 会在任务端创建两个对象: DAGSchedular 和 TaskScheduler DAGSchedular 是任务调度的高层调度器, 是一个对象 DAGSch

google Guava包的ListenableFuture解析

原文地址  译者:罗立树  校对:方腾飞 并发编程是一个难题,但是一个强大而简单的抽象可以显著的简化并发的编写.出于这样的考虑,Guava 定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口. 我们强烈地建议你在代码中多使用ListenableFuture来代替JDK的 Future, 因为: 大多数Futures 方法中需要它. 转到ListenableFuture 编程比较容易. Guava提供的通用公共类封装了公共的操作方方法,不需要提