Spark Executor Driver资源调度小结

一、引子

在Worker Actor中,每次LaunchExecutor会创建一个CoarseGrainedExecutorBackend进程,Executor和CoarseGrainedExecutorBackend是1对1的关系。也就是说集群里启动多少Executor实例就有多少CoarseGrainedExecutorBackend进程。

那么到底是如何分配Executor的呢?怎么控制调节Executor的个数呢?

二、Driver和Executor资源调度

下面主要介绍一下Spark Executor分配策略:

我们仅看,当Application提交注册到Master后,Master会返回RegisteredApplication,之后便会调用schedule()这个方法,来分配Driver的资源,和启动Executor的资源。

schedule()方法是来调度当前可用资源的调度方法,它管理还在排队等待的Apps资源的分配,这个方法是每次在集群资源发生变动的时候都会调用,根据当前集群最新的资源来进行Apps的资源分配。

Driver资源调度:

随机的将Driver分配到空闲的Worker上去,详细流程请看我写的注释 :)

    // First schedule drivers, they take strict precedence over applications
    val shuffledWorkers = Random.shuffle(workers) // 把当前workers这个HashSet的顺序随机打乱
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { //遍历活着的workers
      for (driver <- waitingDrivers) { //在等待队列中的Driver们会进行资源分配
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { //当前的worker内存和cpu均大于当前driver请求的mem和cpu,则启动
          launchDriver(worker, driver) //启动Driver 内部实现是发送启动Driver命令给指定Worker,Worker来启动Driver。
          waitingDrivers -= driver //把启动过的Driver从队列移除
        }
      }
    }

Executor资源调度:

Spark默认提供了一种在各个节点进行round-robin的调度,用户可以自己设置这个flag

val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

在介绍之前我们先介绍一个概念,

可用的Worker:什么是可用,可用就是资源空闲足够且满足一定的规则来启动当前App的Executor。

Spark定义了一个canUse方法:这个方法接受一个ApplicationInfo的描述信息和当前Worker的描述信息。

1、当前worker的空闲内存
该app在每个slave要占用的内存
 (executor.memory默认512M)大

2、当前app从未在此worker启动过App

总结: 从这点看出,要满足:该Worker的当前可用最小内存要比配置的executor内存大,并且对于同一个App只能在一个Worker里启动一个Exeutor,如果要启动第二个Executor,那么请到其它Worker里。这样的才算是对App可用的Worker。

  /**
   * Can an app use the given worker? True if the worker has enough memory and we haven't already
   * launched an executor for the app on it (right now the standalone backend doesn't like having
   * two executors on the same worker).
   */
  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
  }

SpreadOut分配策略:

SpreadOut分配策略是一种轮询集群各个Worker,为Executor比较平均的分配Worker资源,来启动创建Executor的策略,好处是负载均衡,坏处是会造成启动等待。

下面看看,默认的spreadOutApps模式启动App的过程:

1、等待分配资源的apps队列默认是FIFO的。

2、app.coresLeft表示的是该app还有cpu资源没申请到:  app.coresLeft  = 当前app申请的maxcpus - granted的cpus

3、遍历未分配完全的apps,继续给它们分配资源,

4、usableWorkers =  从当前ALIVE的Workers中过滤找出上文描述的可用Worker,然后根据cpus的资源空闲,从大到小给Workers排序。

5、当toAssign(即将要分配的的core数>0,就找到可以的Worker持续分配)

6、当可用Worker的free cores 大于 目前该Worker已经分配的core时,再给它分配1个core,这样分配是很平均的方法。

7、round-robin轮询可用的Worker循环

8、toAssign=0时结束循环,开始根据分配策略去真正的启动Executor。

举例: 1个APP申请了6个core, 现在有2个Worker可用。

那么: toAssign = 6,assigned = 2

那么就会在assigned(1)和assigned(0)中轮询平均分配cores,以+1 core的方式,最终每个Worker分到3个core,即每个Worker的启动一个Executor,每个Executor获得3个cores。

// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) { //对还未被完全分配资源的apps处理
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse //根据core Free对可用Worker进行降序排序。
        val numUsable = usableWorkers.length //可用worker的个数 eg:可用5个worker
        val assigned = new Array[Int](numUsable) //候选Worker,每个Worker一个下标,是一个数组,初始化默认都是0
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)//还要分配的cores = 集群中可用Worker的可用cores总和(10), 当前未分配core(5)中找最小的
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { //以round robin方式在所有可用Worker里判断当前worker空闲cpu是否大于当前数组已经分配core值
            toAssign -= 1
            assigned(pos) += 1 //当前下标pos的Worker分配1个core +1
          }
          pos = (pos + 1) % numUsable //round-robin轮询寻找有资源的Worker
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) { //如果assigned数组中的值>0,将启动一个executor在,指定下标的机器上。
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //更新app里的Executor信息
            launchExecutor(usableWorkers(pos), exec)  //通知可用Worker去启动Executor
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {

非SpreadOut分配策略:

非SpreadOut策略,该策略:会尽可能的根据每个Worker的剩余资源来启动Executor,这样启动的Executor的core是不均匀的。好处是加快了App的Executor启动,坏处,每个Executor的并行度和负载均衡就不能够保证了。

当用户设定了参数spark.deploy.spreadOutfalse时,触发此游戏分支,跑个题,有些困了。。

1、遍历可用Workers

2、且遍历Apps

3、比较当前Worker的可用core和app还需要分配的core,取最小值当做还需要分配的core

4、如果coreToUse大于0,则直接拿可用的core来启动Executor。。奉献当前Worker全部资源。(Ps:挨个榨干每个Worker的剩余资源。。。。)

举例: App申请12个core,3个Worker,Worker1剩余1个core, Worke2r剩7个core, Worker3剩余4个core.

这样会启动3个Executor,Executor1 占用1个core, Executor2占用7个core, Executor3占用4个core.

总结:这样是尽可能的满足App,让其尽快执行,而忽略了其并行效率和负载均衡。

 } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) { //直接问当前worker是有空闲的core
            val coresToUse = math.min(worker.coresFree, app.coresLeft) //有则取,不管多少
            if (coresToUse > 0) { //有
              val exec = app.addExecutor(worker, coresToUse) //直接启动
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

三、总结:

1、 在Worker Actor中,每次LaunchExecutor会创建一个CoarseGrainedExecutorBackend进程,一个Executor对应一个CoarseGrainedExecutorBackend

2、针对同一个App,每个Worker里只能有一个针对该App的Executor存在,切记。如果想让整个App的Executor变多,设置SPARK_WORKER_INSTANCES,让Worker变多。

3、Executor的资源分配有2种策略:

3.1、SpreadOut :一种轮询集群各个Worker,为Executor比较平均的分配Worker资源,来启动创建Executor的策略,好处是负载均衡,坏处是会造成启动等待。

3.2、非SpreadOut:会尽可能的根据每个Worker的剩余资源来启动Executor,这样启动的Executor的core是不均匀的。好处是加快了App的Executor启动,坏处,每个Executor的并行度和负载均衡就不能够保证了。

行文仓促,如有不正之处,请指出,欢迎讨论 :)

——EOF——

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/38763985

时间: 2024-10-07 08:45:29

Spark Executor Driver资源调度小结的相关文章

Spark Executor Driver资源调度汇总

一.简介 于Worker Actor于,每次LaunchExecutor这将创建一个CoarseGrainedExecutorBackend流程.Executor和CoarseGrainedExecutorBackend是1对1的关系.也就是说集群里启动多少Executor实例就有多少CoarseGrainedExecutorBackend进程. 那么究竟是怎样分配Executor的呢?怎么控制调节Executor的个数呢? 二.Driver和Executor资源调度 以下主要介绍一下Spark

Spark的Driver节点和Executor节点

转载自:http://blog.sina.com.cn/s/blog_15fc03d810102wto0.html 1.驱动器节点(Driver) Spark的驱动器是执行开发程序中的 main方法的进程.它负责开发人员编写的用来创建SparkContext.创建 RDD,以及进行 RDD 的转化操作和行动操作代码的执行.如果你是用spark shell,那么当你启动 Spark shell的时候,系统后台自启了一个 Spark 驱动器程序,就是在Spark shell 中预加载的一个叫作 sc

Spark Executor内幕彻底解密(DT大数据梦工厂)

内容: 1.Spark Executor工作原理图: 2.ExecutorBackend注册源码解密: 3.Executor实例化内幕: 4.Executor具体是如何工作的? 1.Master发指令给Worker启动Executor: 2.Worker接受到Master发送来的指令,通过ExecutorRunner启动另外一个进程来运行Executor: 3.此时会启动粗粒度的ExecutorBackend(CoarseGrainedExecutorBackend): 4.CoarseGrai

Spark Executor 概述

Spark Executor 工作原理: 1. 在CoarseGrainedExecutorBackend启动时向Driver注册Executor,其实质是注册ExecutorBackend实例,和Executor实例之间没有直接关系 2. CoarseGrainedExecutorBackend 是 Executor 运行所在的进程名称,Executor才是真正处理Task的对象.Executor内部是通过线程池的方式来完成Task的计算的 3. CoarseGrainedExecutorBa

Spark Executor内幕彻底解密:Executor工作原理图、ExecutorBackend注册源码解密、Executor实例化内幕、Executor具体工作内幕

本课主题 Spark Executor 工作原理图 ExecutorBackend 注册源码鉴赏和 Executor 实例化内幕 Executor 具体是如何工作的 Spark Executor 工作原理图 第一步:Master 发指令给 Worker 启动 Executor: 第二步:Worker 接收到 Master 发送过来的指令通过 ExecutorRunner 远程启动另外一个线程来运行 Executor: 第三步:通过发送 RegisterExecutor 向 Driver 注册 E

Spark(五十):使用JvisualVM监控Spark Executor JVM

引导 Windows环境下JvisulaVM一般存在于安装了JDK的目录${JAVA_HOME}/bin/JvisualVM.exe,它支持(本地和远程)jstatd和JMX两种方式连接远程JVM. jstatd (Java Virtual Machine jstat Daemon)——监听远程服务器的CPU,内存,线程等信息 JMX(Java Management Extensions,即Java管理扩展)是一个为应用程序.设备.系统等植入管理功能的框架.JMX可以跨越一系列异构操作系统平台.

SPARK的MAster资源调度原理(源码)分析

SPARK的MAster资源分配算法(SPARK1.3) master资调度通过源码中的 org.apache.spark.deploy.master包下的schedule()方法实现 步骤如下: 首先判断master是否是alive状态,如果不是alive则返回,也就是只有活动的master才会进行资源调度,standby master是不会进行资源调度的 把之前注册的worker中的alive状态的worker传入 Random.shuffer方法,该方法主要是把worker顺序打乱,返回一

Spark技术内幕:Executor分配详解

当用户应用new SparkContext后,集群就会为在Worker上分配executor,那么这个过程是什么呢?本文以Standalone的Cluster为例,详细的阐述这个过程.序列图如下: 1. SparkContext创建TaskScheduler和DAG Scheduler SparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建它.如果你使用SparkShell,你不必自己显式去创建它,系统会自动创建一个名字为sc的SparkContext的实例.

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar