spark 各个版本的application 调度算法还是有这明显的不同之处的。从spark1.3.0 到 spark 1.6.1、spark2.x 到 现在最新的spark 3.x ,调度算法有了一定的修改。下面大家一起学习一下,最新的spark 版本spark-3.0的Application 调度机制。

private def startExecutorsOnWorkers(): Unit = {  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app  // in the queue, then the second app, etc.  for (app <- waitingApps) {    //如果在 spark-submmit 脚本中,指定了每个executor 多少个 CPU core,    // 则每个Executor 分配该个数的 core,    // 否则 默认每个executor 只分配 1 个 CPU core    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)    // If the cores left is less than the coresPerExecutor,the cores left will not be allocated    //  当前 APP 还需要分配的  core  数 不能  小于 单个 executor 启动 的 CPU core 数    if (app.coresLeft >= coresPerExecutor) {      // Filter out workers that don‘t have enough resources to launch an executo/*ku*/r      // 过滤出 状态 为 ALIVE,并且还能 发布 Executor 的 worker      // 按照剩余的 CPU core 数  倒序      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)        .filter(canLaunchExecutor(_, app.desc))        .sortBy(_.coresFree).reverse      if (waitingApps.length == 1 && usableWorkers.isEmpty) {        logWarning(s"App ${} requires more resource than any of Workers could have.")      }
    // TODO:  默认采用 spreadOutApps  调度算法, 将 application需要的 executor资源 分派到  多个 worker 上去
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we‘ve decided how many cores to allocate on each worker, let‘s allocate them      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {        allocateWorkerResourceToExecutors(          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))      }    }  }}判断一个 worker 是否可以发布 executor
private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {  canLaunch(    worker,    desc.memoryPerExecutorMB,    desc.coresPerExecutor.getOrElse(1),    desc.resourceReqsPerExecutor)}让我们看一看里面的 canlaunch 方法
private def canLaunch(    worker: WorkerInfo,    memoryReq: Int,    coresReq: Int,    resourceRequirements: Seq[ResourceRequirement])  : Boolean = {  // worker 上 空闲的 内存值  要 大于等于  请求的 内存值  val enoughMem = worker.memoryFree >= memoryReq  // worker 上 空闲的 core 数  要 大于等于  请求的 core数  val enoughCores = worker.coresFree >= coresReq  //  worker 是否满足 executor 请求的资源     val enoughResources = ResourceUtils.resourcesMeetRequirements(    worker.resourcesAmountFree, resourceRequirements)  enoughMem && enoughCores && enoughResources}

回到上面的 scheduleExecutorsOnWorkers
private def scheduleExecutorsOnWorkers(    app: ApplicationInfo,    usableWorkers: Array[WorkerInfo],    spreadOutApps: Boolean): Array[Int] = {  val coresPerExecutor = app.desc.coresPerExecutor  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)  // 默认情况下 是 开启  oneExecutorPerWorker 机制的,也就是默认是在 一个 worker 上  只启动 一个 executor的  //  如果在spark -submit 脚本中设置了coresPerExecutor , 在worker资源充足的时候,则 会在每个worker 上,启动多个executor  val oneExecutorPerWorker = coresPerExecutor.isEmpty  val memoryPerExecutor = app.desc.memoryPerExecutorMB  val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor  val numUsable = usableWorkers.length  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker  var coresToAssign = math.min(app.coresLeft,
// 判断  Worker节点是否能够启动Executor  def canLaunchExecutorForApp(pos: Int): Boolean = {

    val keepScheduling = coresToAssign >= minCoresPerExecutor    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor    val assignedExecutorNum = assignedExecutors(pos)

    // If we allow multiple executors per worker, then we can always launch new executors.    // Otherwise, if there is already an executor on this worker, just give it more cores.

    // 如果spark -submit 脚本中设置了coresPerExecutor值,    // 并且当前 这个worker 还没有为这个 application 分配 过  executor ,    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0      // TODO:  可以启动新的 Executor    if (launchingNewExecutor) {      val assignedMemory = assignedExecutorNum * memoryPerExecutor      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor      val assignedResources = {        req => req.resourceName -> req.amount * assignedExecutorNum      }.toMap      val resourcesFree = usableWorkers(pos) {        case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))      }      val enoughResources = ResourceUtils.resourcesMeetRequirements(        resourcesFree, resourceReqsPerExecutor)      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit      keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit    } else {      // We‘re adding cores to an existing executor, so no need      // to check memory and executor limits      // TODO:  不满足启动新的 Executor条件,则 在 老的 Executor 上 追加  core 数      keepScheduling && enoughCores    }  }

  // Keep launching executors until no more workers can accommodate any  // more executors, or if we have reached this application‘s limits

  var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)  while (freeWorkers.nonEmpty) {    freeWorkers.foreach { pos =>      var keepScheduling = true      while (keepScheduling && canLaunchExecutorForApp(pos)) {        coresToAssign -= minCoresPerExecutor        assignedCores(pos) += minCoresPerExecutor

        // If we are launching one executor per worker, then every iteration assigns 1 core        // to the executor. Otherwise, every iteration assigns cores to a new executor.        if (oneExecutorPerWorker) {          //TODO: 如果该Worker节点不能启动新的 Executor,则在老的executor 上 分配 minCoresPerExecutor 个 CPU core(此时该值默认 为 1 )          assignedExecutors(pos) = 1        } else {          //TODO: 如果该Worker节点可以启动新的 Executor,则在新的executor 上 分配 minCoresPerExecutor 个 CPU core(此时该值为 spark-submit脚本配置的 coresPerExecutor 值)          assignedExecutors(pos) += 1        }

        // Spreading out an application means spreading out its executors across as        // many workers as possible. If we are not spreading out, then we should keep        // scheduling executors on this worker until we use all of its resources.        // Otherwise, just move on to the next worker.        if (spreadOutApps) {          // TODO: 这里传入 keepScheduling = false , 就是每次 worker上只分配 一次 core ,然后 到 下一个 worker 上  再去 分配 core,直到 worker          // TODO:  完成一次遍历          keepScheduling = false        }      }    }    freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)  }  // 返回每个Worker节点分配的CPU核数  assignedCores}

再来分析 allocateWorkerResourceToExecutors
private def allocateWorkerResourceToExecutors(    app: ApplicationInfo,    assignedCores: Int,    coresPerExecutor: Option[Int],    worker: WorkerInfo): Unit = {  // If the number of cores per executor is specified, we divide the cores assigned  // to this worker evenly among the executors with no remainder.  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.  val numExecutors = { assignedCores / _ }.getOrElse(1)  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)  for (i <- 1 to numExecutors) {    val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)    // TODO : 当前 这个 application 追加 一次  Executor    val exec = app.addExecutor(worker, coresToAssign, allocated)    //TODO: 给worker 线程 发送 launchExecutor 命令    launchExecutor(worker, exec)    app.state = ApplicationState.RUNNING  }}ok,至此,spark最新版本 spark-3.0的Application 调度算法分析完毕!!!


