Spark资源调度

一:任务调度和资源调度的区别:

任务调度是指通过DAGScheduler,TaskScheduler,SchedulerBackend完成的job的调度

资源调度是指应用程序获取资源的调度,他是通过schedule方法完成的

二:资源调度解密

因为master负责资源管理和调度,所以资源调度的方法schedule位于master.scala这个了类中,当注册程序或者资源发生改变的都会导致schedule的调用,例如注册程序的时候(包括worker,driver和application的注册等,注意executor是向SparkDeploySchedulerBackend注册的)

case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don‘t send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */

每当新的应用程序加入或者可用资源发生改变(比如exccutor或者worker增加或者减少的时候)的时候,该方法都会发生响应

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }//判断Master的状态是否为ALIVE,如果不是,则调度没有任何意义
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers

//将workers随机化,有利于负载均衡
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {//判断worker的状态,只有Alive级别的worker才能参与资源的分配工作
    for (driver <- waitingDrivers) {//循环遍历等待中的driver,当然这里指的是cluster模式,如果是client模式的话,driver就自动启动了。
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

//当worker的free内存和cpu比driver所需要的多的时候,将driver放到workers中随机的一个worker,启动driver
        launchDriver(worker, driver)
        waitingDrivers -= driver//将启动的driver在等待队列中移除。
      }
    }
  }
  startExecutorsOnWorkers()
}

schedule的代码解析(简单的就放在上面的代码注释里了)

Random.shuffle(workers)  将worker在master缓存数据结构中的顺序打乱

def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
  val buf = new ArrayBuffer[T] ++= xs//构建一个临时的缓冲数组

  def swap(i1: Int, i2: Int) {//交换数组中指定下表的两个元素
    val tmp = buf(i1)
    buf(i1) = buf(i2)
    buf(i2) = tmp
  }

  for (n <- buf.length to 2 by -1) {//生成随机数,并不停交换,打乱了数组中元素的顺序
    val k = nextInt(n)
    swap(n - 1, k)
  }

  (bf(xs) ++= buf).result//返回随机化的新集合(这里就是workers的集合了)
}

2 waitingDrivers

private val waitingDrivers = new ArrayBuffer[DriverInfo]

可以看到这里waitingDrivers是一个数据元素为DriverInfo的数组,DriverInfo包含了driver的信息startTime(启动时间),id,desc(driver的描述信息),submitDate(提交日期)

private[deploy] class DriverInfo(
    val startTime: Long,
    val id: String,
    val desc: DriverDescription,
    val submitDate: Date)
  extends Serializable {

其中描述信息包含了一下内容

private[deploy] case class DriverDescription(
    jarUrl: String,//jar包地址
    mem: Int,//内存信息
    cores: Int,//CPU
    supervise: Boolean//当spark-submit指定driver在cluster模式下运行的话如果设定了supervise,driver挂掉的时候回自动重启,
    command: Command) {//一些环境信息

  override def toString: String = s"DriverDescription (${command.mainClass})"
}

3 launchDriver spark只有先启动driver才能进行后面具体的调度

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)//表明driver运行的worker
  driver.worker = Some(worker)//driver和worker的相互引用
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))//master通过远程rpc发指令给worker,让worker启动driver。
  driver.state = DriverState.RUNNING//启动之后将driver的状态转为RUNNING
}

4 startExecutorsOnWorkers 先进先出的队列方式进行简单调度,spark默认启动Executor的方式是FIFO的方式,只有前一个app满足了资源分配的基础上,才会为下一个应用程序分配资源

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {//为应用程序具体分配Executor之前会判断当前应用程序是否还需要cores,如果不需要则不会为应用程序分配Executor
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor//应用程序所需要的cores
    // Filter out workers that don‘t have enough resources to launch an executor

//过滤掉不满足条件的worker,条件为:worker的状态必须是AlIVE的,worker的内存和cpu必须比每一个Executor所需要的大。

//过滤完之后,按照可用cores进行排序,并将大的放到前面,最优的最先使用。
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse

//这里采用spreadOutApps的方式来让应用程序尽可能分散的运行在每一个Node上,这种方式往往能顺便带来更好的数据本地性,通常数据是分散的分布在各台机器上,这种方式通常也是默认的。这方法返回的是每一个分配给每一个worker的cores的数组。具体的在分配cores的时候回尽可能的满足当前所需的
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we‘ve decided how many cores to allocate on each worker, let‘s allocate them

//下面进行真正的分配Executors,Master通过远程通信发指令给Worker来启动ExecutorBackend进程,向driver发送ExecutorAdded通信。
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

原文地址:https://www.cnblogs.com/itboys/p/9966838.html

时间: 2024-10-06 23:34:26

Spark资源调度的相关文章

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本課主題 Master 资源调度的源码鉴赏 Spark 的 Worker 是基于什么逻辑去启动 Executor 资源调度管理 任務調度與資源是通過 DAGScheduler.TaskScheduler.SchedulerBackend 等進行的作業調度 資源調度是指應用程序如何獲得資源 任務調度是在資源調度的基礎上進行的,沒有資源調度那麼任務調度就成為了無源之水無本之木 Master 资源调度的源码鉴赏 因為 Master 負責資源管理和調度,所以資源調度方法 scheduer 位於 Mast

Spark 资源调度包 stage 类解析

spark 资源调度包 Stage(阶段) 类解析 类注释: /** * A stage is a set of parallel tasks all computing the same function that need to run as part * of a Spark job, where all the tasks have the same shuffle dependencies. * 一个阶段是所有计算相同功能的并行任务集合, 作为spark作业的一部分, 这些任务都有相同

Spark 资源调度 与 任务调度

Spark 资源调度与任务调度的流程(Standalone): 启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master掌握了集群资源状况. 当 Spark 提交一个 Application 后, 根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图. 任务提交后, Spark 会在任务端创建两个对象: DAGSchedular 和 TaskScheduler DAGSchedular 是任务调度的高层调度器, 是一个对象 DAGSch

Spark资源调度和任务调度

一.资源调度&任务调度 1.启动集群后,Worker节点会周期性的[心跳]向Master节点汇报资源情况,Master掌握集群资源情况. 2.当Spark提交一个Application后,根据RDD之间的依赖关系将Application构建成一个DAG有向无环图. 3.任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler. 4.DAGScheduler是任务调度的高层调度器,是一个对象.DAGScheduler的主要作用就是将DAG根据RDD

SPARK 资源调度源码总结

Executor在集群中分散启动,有利于task计算的数据本地化 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项 默认情况下没有设置--total-executor-cores,一个Applicatio

小记--------spark资源调度机制源码分析-----Schedule

Master类位置所在:spark-core_2.11-2.1.0.jar的org.apache.spark.deploy.master下的Master类 /** * driver调度机制原理代码分析Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability change

Spark 从入门到精通学习笔记大纲

Spark 传奇行动目录 我把这个部份称之为Spark世界的[九阴真经],是 Spark学习者增强内功的首选地方. 第28课:Spark天堂之门解密 (点击进入博客)从 SparkContext 创建3大核心对象开始到注册给 Master 这个过程中的源码鉴赏 第29课:Master HA彻底解密 (点击进入博客)从 Master 如何基于 ZooKeeper 来做 HA 的源码鉴赏 第30课:Master的注册机制和状态管理解密 (点击进入博客)从 Master 的角度去分析它是如何接收 Wo

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视