How Spark 1.3.x and Spark 2.x differ in CPU core allocation when launching executors

***The discussion below uses the spreadOut strategy for allocating executors across workers as its example***

The key code in version 1.3:

    for (app <- waitingApps if app.coresLeft > 0) { // iterate over apps that still have unassigned cores
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse // sort usable workers by free cores, descending
      val numUsable = usableWorkers.length // number of usable workers, e.g. 5
      val assigned = new Array[Int](numUsable) // one counter per candidate worker, initialized to 0
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // cores still to assign = min(the app's unassigned cores (e.g. 5), total free cores across usable workers (e.g. 10))
      var pos = 0
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // round-robin over usable workers: does this worker still have a spare core beyond what we have tentatively assigned?
          toAssign -= 1
          assigned(pos) += 1 // give the worker at index pos one more core
        }
        pos = (pos + 1) % numUsable // round-robin to the next worker with free resources
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      for (pos <- 0 until numUsable) {
        if (assigned(pos) > 0) { // if assigned(pos) > 0, launch an executor on the worker at that index
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) // record the executor in the app's bookkeeping
          launchExecutor(usableWorkers(pos), exec) // tell the worker to launch the executor
          app.state = ApplicationState.RUNNING
        }
      }
    }

The code above clearly shows that, under spread-out allocation, each worker is handed exactly one core per round; this is why setting --executor-cores in spark-submit does not necessarily take effect in 1.3.x.
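As a quick sanity check, here is a minimal, self-contained sketch (not Spark source; the three workers with 4, 3, and 2 free cores and the 5-core request are made-up illustration values) that replays the 1.3.x loop:

    // Sketch of the 1.3.x spread-out loop with assumed inputs.
    object RoundRobin13x {
      def main(args: Array[String]): Unit = {
        val coresFree = Array(4, 3, 2)              // free cores per usable worker, sorted descending
        val assigned  = new Array[Int](coresFree.length)
        var toAssign  = math.min(5, coresFree.sum)  // app.coresLeft = 5 (assumption)
        var pos = 0
        while (toAssign > 0) {
          if (coresFree(pos) - assigned(pos) > 0) { // worker still has a spare core
            toAssign -= 1
            assigned(pos) += 1                      // always exactly one core per step
          }
          pos = (pos + 1) % coresFree.length        // round-robin
        }
        println(assigned.toList)                    // List(2, 2, 1)
      }
    }

The 5 cores end up spread 2/2/1 across the three workers; no --executor-cores value could change that, because the loop never hands out more than one core at a time.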

Spark 2.x, however, corrects this: it actually reads the --executor-cores value (app.desc.coresPerExecutor), and only when that value is unset does it fall back to the 1.3.x-style one-core-at-a-time behavior. The code:

    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      val coresPerExecutor = app.desc.coresPerExecutor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
      val memoryPerExecutor = app.desc.memoryPerExecutorMB
      val numUsable = usableWorkers.length
      val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
      val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      /** Return whether the specified worker can launch an executor for this app. */
      def canLaunchExecutor(pos: Int): Boolean = {
        val keepScheduling = coresToAssign >= minCoresPerExecutor
        val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

        // If we allow multiple executors per worker, then we can always launch new executors.
        // Otherwise, if there is already an executor on this worker, just give it more cores.
        val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
        if (launchingNewExecutor) {
          val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
          val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
          val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
          keepScheduling && enoughCores && enoughMemory && underLimit
        } else {
          // We're adding cores to an existing executor, so no need
          // to check memory and executor limits
          keepScheduling && enoughCores
        }
      }

      // Keep launching executors until no more workers can accommodate any
      // more executors, or if we have reached this application's limits
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
      while (freeWorkers.nonEmpty) {
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutor(pos)) {
            coresToAssign -= minCoresPerExecutor
            assignedCores(pos) += minCoresPerExecutor

            // If we are launching one executor per worker, then every iteration assigns 1 core
            // to the executor. Otherwise, every iteration assigns cores to a new executor.
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }

            // Spreading out an application means spreading out its executors across as
            // many workers as possible. If we are not spreading out, then we should keep
            // scheduling executors on this worker until we use all of its resources.
            // Otherwise, just move on to the next worker.
            if (spreadOutApps) {
              keepScheduling = false
            }
          }
        }
        freeWorkers = freeWorkers.filter(canLaunchExecutor)
      }
      assignedCores
    }
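To make the difference concrete, here is a minimal sketch (not Spark source; the three 8-core workers and the 10-core request are assumed illustration values, and the memory and executor-limit checks from canLaunchExecutor are deliberately dropped) that replays the spread-out branch with --executor-cores both set and unset:

    // Sketch of the 2.x spread-out branch; memory/limit checks omitted by assumption.
    object Schedule2x {
      def schedule(coresLeft: Int, coresFree: Array[Int], coresPerExecutor: Option[Int]): List[Int] = {
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        val assignedCores = new Array[Int](coresFree.length)
        var coresToAssign = math.min(coresLeft, coresFree.sum)

        def canLaunch(pos: Int): Boolean =
          coresToAssign >= minCoresPerExecutor &&                      // keepScheduling
            coresFree(pos) - assignedCores(pos) >= minCoresPerExecutor // enoughCores

        var freeWorkers = coresFree.indices.filter(canLaunch)
        while (freeWorkers.nonEmpty) {
          freeWorkers.foreach { pos =>
            if (canLaunch(pos)) {             // spreadOut: one allocation step, then move on
              coresToAssign -= minCoresPerExecutor
              assignedCores(pos) += minCoresPerExecutor
            }
          }
          freeWorkers = freeWorkers.filter(canLaunch)
        }
        assignedCores.toList
      }

      def main(args: Array[String]): Unit = {
        val coresFree = Array(8, 8, 8)
        println(schedule(10, coresFree, Some(4))) // List(4, 4, 0)
        println(schedule(10, coresFree, None))    // List(4, 3, 3)
      }
    }

With --executor-cores 4 the cores move in whole-executor chunks: two 4-core executors are placed and the remaining 2 requested cores stay unassigned, since they cannot form a full executor. With the option unset, the fallback hands out one core per step, landing on 4/3/3 exactly as the 1.3.x loop would.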

Original post: https://www.cnblogs.com/zzq-include/p/9276915.html
