spark1.3.x与spark2.x启动executor不同的cpu core分配方式

***这里的executor在worker上分配策略以spreadOut 为例***


for (app <- waitingApps if app.coresLeft > 0) { //对还未被完全分配资源的apps处理
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse //根据core Free对可用Worker进行降序排序。
        val numUsable = usableWorkers.length //可用worker的个数 eg:可用5个worker
        val assigned = new Array[Int](numUsable) //候选Worker,每个Worker一个下标,是一个数组,初始化默认都是0
        var toAssign = math.min(app.coresLeft,还要分配的cores = 集群中可用Worker的可用cores总和(10), 当前未分配core(5)中找最小的
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { //以round robin方式在所有可用Worker里判断当前worker空闲cpu是否大于当前数组已经分配core值
            toAssign -= 1
            assigned(pos) += 1 //当前下标pos的Worker分配1个core +1
          pos = (pos + 1) % numUsable //round-robin轮询寻找有资源的Worker
        // Now that we‘ve decided how many cores to give on each node, let‘s actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) { //如果assigned数组中的值>0,将启动一个executor在,指定下标的机器上。
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //更新app里的Executor信息
            launchExecutor(usableWorkers(pos), exec)  //通知可用Worker去启动Executor
            app.state = ApplicationState.RUNNING

以上红色代码清晰的展示了在平均分配的场景下,每次会给worker分配1个core,所以说在spark-submit中如果设置了 --executor-cores属性未必起作用;


 private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft,

    /** Return whether the specified worker can launch an executor for this app. */
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We‘re adding cores to an existing executor, so no need
        // to check memory and executor limits
        keepScheduling && enoughCores

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application‘s limits
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          if (spreadOutApps) {
            keepScheduling = false
      freeWorkers = freeWorkers.filter(canLaunchExecutor)


当用户应用new SparkContext后,集群就会为在Worker上分配executor,那么这个过程是什么呢?本文以Standalone的Cluster为例,详细的阐述这个过程.序列图如下: 1. SparkContext创建TaskScheduler和DAG Scheduler SparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建它.如果你使用SparkShell,你不必自己显式去创建它,系统会自动创建一个名字为sc的SparkContext的实例.

spark-3.0 application 调度算法解析

spark 各个版本的application 调度算法还是有这明显的不同之处的.从spark1.3.0 到 spark 1.6.1.spark2.x 到 现在最新的spark 3.x ,调度算法有了一定的修改.下面大家一起学习一下,最新的spark 版本spark-3.0的Application 调度机制. private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO schedul

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.


转 spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand