Notes: Spark resource scheduling mechanism, source code analysis -- schedule()

The Master class lives in spark-core_2.11-2.1.0.jar, under org.apache.spark.deploy.master.

/**
 * Driver scheduling mechanism, source code analysis.
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {

  // First check: if the master's state is not ALIVE, return immediately.
  // In other words, a standby master never schedules drivers, applications, or any other resources.
  if (state != RecoveryState.ALIVE) {
    return
  }

  // Random.shuffle randomly reorders the elements of the collection it is given.
  // Take all previously registered workers from `workers`, keep only those whose state is ALIVE,
  // and then call Random.shuffle to put the ALIVE workers in random order.
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

  // Number of currently available (ALIVE) workers
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0

  // Drivers end up in waitingDrivers only when an application is submitted to a standalone
  // master in cluster deploy mode; in client mode the driver is started locally in the
  // submitting process, so it is never registered with the Master and never scheduled here.

  // First, iterate over a copy of the waitingDrivers ArrayBuffer
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers

    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0

    // Loop condition: keep going as long as there are still alive workers we have not yet
    // visited (numWorkersVisited < numWorkersAlive) and this driver has not been launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1

      // If this worker's free memory is at least what the driver needs
      // and its free CPU cores are at least what the driver needs
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

        // then launch the driver on this worker
        launchDriver(worker, driver) // see Code 1 below for details

        // and remove the driver from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }

      // Advance the pointer to the next worker (round-robin)
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
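
To make the round-robin driver placement above easier to follow, here is a minimal, self-contained sketch of the same loop over toy data. It is not the Spark source: Worker and Driver are simplified stand-ins for WorkerInfo and DriverInfo, and all ids and numbers are made up. The point is that curPos keeps advancing across drivers, so consecutive drivers tend to land on different workers.

object DriverRoundRobinSketch {
  case class Worker(id: String, var coresFree: Int, var memoryFree: Int)
  case class Driver(id: String, cores: Int, mem: Int)

  def main(args: Array[String]): Unit = {
    val workers = Vector(Worker("w1", 4, 4096), Worker("w2", 2, 2048), Worker("w3", 8, 8192))
    val waitingDrivers =
      scala.collection.mutable.ArrayBuffer(Driver("d1", 2, 1024), Driver("d2", 4, 4096))

    var curPos = 0
    for (driver <- waitingDrivers.toList) {   // iterate over a copy, as the Master does
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < workers.size && !launched) {
        val w = workers(curPos)
        numWorkersVisited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.coresFree -= driver.cores          // stand-in for launchDriver: reserve the resources
          w.memoryFree -= driver.mem
          waitingDrivers -= driver
          launched = true
          println(s"driver ${driver.id} -> worker ${w.id}")
        }
        curPos = (curPos + 1) % workers.size   // the pointer keeps advancing across drivers
      }
    }
  }
}

Running this prints d1 -> w1 and d2 -> w3: the second driver skips w2 because it lacks the cores, which is exactly the round-robin-with-resource-check behaviour of the loop above.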

Code 1
/**
 * Launch a driver on the given worker.
 */
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)

  // Add the driver to the worker's in-memory bookkeeping;
  // this also adds the driver's required memory and cores to the worker's used memory and cores
  worker.addDriver(driver)

  // Also record the worker inside the driver's own bookkeeping, so the two reference each other and can find one another
  driver.worker = Some(worker)

  // Then send a LaunchDriver message to the worker's RPC endpoint, telling the worker to start the driver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))

  // Finally, mark the driver's state as RUNNING
  driver.state = DriverState.RUNNING
}

// Executor scheduling: principle and source code analysis
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {

  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Iterate over the ApplicationInfo entries in waitingApps, keeping only applications that still need cores
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor

    // Filter out workers that don't have enough resources to launch an executor
    // From workers, keep those in ALIVE state with enough free memory and cores for this
    // application's executors, then sort them by free cores in descending order
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // see Code 2 below for details

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    // After each worker's share of CPU cores for this application has been decided, iterate over
    // the workers and pick out those that were actually assigned cores
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {

      // Record the executors in the application's internal bookkeeping and launch them on the worker
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) // see Code 3 below for details
    }
  }
}
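
A quick toy run of the filter-and-sort step above, using made-up worker numbers (an illustrative sketch, not Spark source): only ALIVE workers with enough free memory and at least one executor's worth of free cores survive, and the survivors are ordered by free cores, descending.

object UsableWorkersSketch {
  case class W(id: String, alive: Boolean, coresFree: Int, memoryFree: Int)

  def main(args: Array[String]): Unit = {
    val memoryPerExecutorMB = 1024                 // stand-in for app.desc.memoryPerExecutorMB
    val coresPerExecutor: Option[Int] = Some(2)    // stand-in for app.desc.coresPerExecutor
    val workers = Seq(
      W("w1", alive = true,  coresFree = 4, memoryFree = 2048),
      W("w2", alive = true,  coresFree = 1, memoryFree = 4096),  // too few free cores
      W("w3", alive = false, coresFree = 8, memoryFree = 8192),  // not ALIVE
      W("w4", alive = true,  coresFree = 6, memoryFree = 512))   // too little free memory

    val usableWorkers = workers
      .filter(_.alive)
      .filter(w => w.memoryFree >= memoryPerExecutorMB && w.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse

    println(usableWorkers.map(_.id))   // List(w1)
  }
}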

Code 2
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor

  // Minimum number of cores per executor; defaults to 1 when coresPerExecutor is not set
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB

  // Number of usable workers
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker

  // Cores to assign: the minimum of what the application still needs and the total free cores across the usable workers
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  /** Return whether the specified worker can launch an executor for this app. */
  // Whether an executor for this app can be launched on the worker at index pos
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0

    // Check that this worker's free cores and free memory are sufficient for a new executor
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // We're adding cores to an existing executor, so there is no need
      // to check the memory and executor limits
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        // Take minCoresPerExecutor off the cores that remain to be assigned
        coresToAssign -= minCoresPerExecutor

        // and add them to the cores assigned to this worker
        assignedCores(pos) += minCoresPerExecutor

        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        // With one executor per worker, the executor count stays at 1 and only its cores grow
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        // If spreadOut is disabled, keep assigning cores on this worker until it cannot take any
        // more; in other words, each application is packed onto as few workers as possible. For
        // example, with 10 workers of 10 CPU cores each and an application that needs 20 cores,
        // only two workers end up being used (see the sketch after this method).
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}
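
To see the difference between the two allocation styles, here is a simplified sketch of the loop above. It is not the real method: it assumes coresPerExecutor is unset, so one core is handed out per scheduling step, and it ignores memory and executor limits. It reproduces the example from the comments: 10 workers with 10 free cores each and an application that still needs 20 cores.

object SpreadOutSketch {
  // coresFree: free cores per worker; coresNeeded: cores the app still wants;
  // each scheduling step hands out exactly one core (coresPerExecutor unset).
  def assign(coresFree: Array[Int], coresNeeded: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(coresFree.length)(0)
    var coresToAssign = math.min(coresNeeded, coresFree.sum)

    def canLaunch(pos: Int): Boolean =
      coresToAssign >= 1 && coresFree(pos) - assigned(pos) >= 1

    var freeWorkers = coresFree.indices.filter(canLaunch)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          coresToAssign -= 1
          assigned(pos) += 1
          if (spreadOut) keepScheduling = false   // move to the next worker after one core
        }
      }
      freeWorkers = freeWorkers.filter(canLaunch)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val tenWorkersWithTenCores = Array.fill(10)(10)
    println(assign(tenWorkersWithTenCores, 20, spreadOut = true).toList)   // 2 cores on every worker
    println(assign(tenWorkersWithTenCores, 20, spreadOut = false).toList)  // 10 + 10 on the first two
  }
}

With spreadOut enabled, every worker ends up with 2 cores; with it disabled, the first two workers take 10 cores each and the rest get nothing.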

Code 3
// The algorithm for allocating the assigned CPU cores to executors on one worker
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  // assigned cores / cores per executor gives the number of executors to actually launch on this worker (a worked example of this split follows this method)
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {

    // First record the executor in the application's internal bookkeeping by creating an
    // ExecutorDesc object, which encapsulates how many CPU cores this executor is given.
    // Note on the older Spark 1.3.0 launch mechanism: spark-submit lets you request a number of
    // executors plus cores and memory per executor, but because allocation was driven by the
    // total number of CPU cores, the actual executor count and cores per executor could differ
    // from the request. For example, requesting 3 executors with 3 cores each on 9 workers that
    // each have 1 free core still allocates 9 cores in total, one per worker, so one executor is
    // started on each worker and you end up with 9 executors of 1 core each.
    // The application needs to keep track of its executors.
    val exec = app.addExecutor(worker, coresToAssign)

    // Then launch the executor on the worker
    launchExecutor(worker, exec) // see Code 4 below for details

    // and mark the application's state as RUNNING
    app.state = ApplicationState.RUNNING
  }
}
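
The division above is easiest to see with concrete numbers. The sketch below (illustrative values only, not Spark source) shows how numExecutors and coresToAssign fall out of assignedCores and coresPerExecutor.

object ExecutorSplitSketch {
  // Returns (number of executors to launch, cores per launched executor).
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresToAssign)
  }

  def main(args: Array[String]): Unit = {
    println(split(6, Some(2)))   // (3,2): three executors with 2 cores each
    println(split(6, None))      // (1,6): one executor grabs all 6 assigned cores
  }
}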

Code 4
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

  // Add the executor to the worker's internal bookkeeping (reserving its cores and memory)
  worker.addExecutor(exec)

  // Send a LaunchExecutor message to the worker
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))

  // Notify the driver of this executor's application with an ExecutorAdded message
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

Original article: https://www.cnblogs.com/yzqyxq/p/11968018.html
