Spark schedule: Resource Scheduling and Allocation in Detail

I. The Difference Between Task Scheduling and Resource Scheduling

1. Task scheduling: job scheduling carried out by DAGScheduler, TaskScheduler, SchedulerBackend, and related components;

2. Resource scheduling: how an application obtains resources;

3. Task scheduling is built on top of resource scheduling; without resource scheduling, task scheduling is water without a source, a tree without roots.

II. Inside the Resource Scheduling Mechanism

1. Because the Master is responsible for resource management and scheduling, the resource scheduling method schedule lives in Master.scala. schedule is invoked whenever an application registers or the available resources change; for example, application registration looks like this:

case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
}

2. When schedule is called: every time a new application is submitted or the cluster's resource availability changes (including Executors being added or removed, Workers being added or removed, and so on);

/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */

3. The current Master must be in the ALIVE state to perform resource scheduling; if it is not ALIVE the method returns immediately, which means a standby Master never schedules resources for applications;

if (state != RecoveryState.ALIVE) { return }

4. Random.shuffle is used to randomly reorder the information the Master holds about all Workers in the cluster. Why shuffle? For load balancing.

// Drivers take strict precedence over executors
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers

5. Random.shuffle source walkthrough:

The Workers are copied into an ArrayBuffer, which is assigned to buf.

The swap function exchanges the Workers at two given index positions.

The for loop runs n from the last position of buf down to 2; nextInt(n) picks a random index k in [0, n-1], and swap(n - 1, k) exchanges those two elements. Once the loop finishes, the order of the Workers in buf has been completely shuffled.

The workers field itself is declared as:

val workers = new HashSet[WorkerInfo]

Internally, then, the algorithm repeatedly swaps Workers at random positions within the Master's cached data structure; a simplified sketch follows.
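
The following is a simplified, self-contained sketch of that Fisher-Yates-style shuffle, not the exact scala.util.Random.shuffle source (the real method is generic over collection types); simpleShuffle and the example worker names are made up for illustration.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Walk from the end of the buffer, swapping each position n - 1 with a
// randomly chosen position k in [0, n - 1]; afterwards the order is random.
def simpleShuffle[T](xs: Seq[T]): Seq[T] = {
  val buf = new ArrayBuffer[T] ++= xs // copy the input into a mutable buffer
  def swap(i: Int, j: Int): Unit = {
    val tmp = buf(i); buf(i) = buf(j); buf(j) = tmp
  }
  for (n <- buf.length to 2 by -1) {
    val k = Random.nextInt(n) // random index in [0, n - 1]
    swap(n - 1, k)
  }
  buf.toSeq
}

// e.g. simpleShuffle(Seq("worker-1", "worker-2", "worker-3", "worker-4"))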

6. Next, the Workers are checked for the ALIVE state; only ALIVE Workers can take part in resource allocation;

for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {

When spark-submit specifies that the Driver should run in cluster mode, the Driver is added to the waitingDrivers list; each DriverInfo carries a DriverDescription that records, among other things, the memory and cores the Driver requires from a Worker.

Supervise: in cluster mode, spark-submit can set supervise so that the Driver is automatically restarted when it crashes; the precondition is that the Driver runs inside the cluster.

private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toString: String = s"DriverDescription (${command.mainClass})"
}

7. launchDriver source: the Driver is launched onto a Worker, and that Worker comes from the shuffled list produced earlier; this is how the Driver ends up on a Worker.

// Check whether the Worker's free memory and free cores satisfy the Driver's requirements
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
  launchDriver(worker, driver)

If the resource requirements are met, one of the randomly shuffled Workers is used to launch the Driver.

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  // log the launch
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // worker is a WorkerInfo, the Master-side description of the Worker;
  // record on it that this Driver has been placed there (WorkerInfo is held by the Master)
  worker.addDriver(driver)
  // the Driver likewise records which Worker it runs on in its own DriverInfo
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}

The Master sends a command to the Worker, instructing the remote Worker to launch the Driver.

worker.endpoint.send(LaunchDriver(driver.id, driver.desc))

Once the Driver is launched, its state becomes RUNNING. The Driver has to be started first; only then does all the subsequent resource scheduling happen.

driver.state = DriverState.RUNNING

8. startExecutorsOnWorkers(): launches Executors on Workers for the application.

By default Spark launches Executors for applications in FIFO order: all submitted applications sit in a scheduling queue, first in first out, and the next application is served only after the resource requirements of the applications ahead of it have been satisfied;

The Master calls startExecutorsOnWorkers, but at this point it has not yet been decided which Workers the Executors will be placed on.

private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.

Before actually allocating Executors to an application, the scheduler checks whether the application still needs cores; if it does not, no Executors are allocated to it;

// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {

coresLeft:

private[master] def coresLeft: Int = requestedCores - coresGranted
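
To make coresLeft concrete, a tiny hypothetical computation (the standalone vals and the numbers are invented, not taken from the source):

// An application that asked for 10 cores in total and has already been granted 4
// still has 6 cores left, so it stays in the waitingApps loop.
val requestedCores = 10                       // invented request
val coresGranted = 4                          // invented number of cores already granted
val coresLeft = requestedCores - coresGranted // 6 > 0, so scheduling continues for this app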

coresPerExecutor is the number of cores the application requests per Executor; if it is not specified, the scheduler falls back to a minimum of one core per Executor.

val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor

Before Executors are actually allocated, a Worker must be in the ALIVE state and must satisfy the application's per-Executor memory and core requirements; on that basis the qualifying Workers are sorted by free cores in descending order, producing the usableWorkers data structure:

// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
  .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    worker.coresFree >= coresPerExecutor.getOrElse(1)) // at least one core
  // sort the usable, qualifying Workers so that those with the most free cores come first
  .sortBy(_.coresFree).reverse // _ => worker
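
As a toy illustration of that descending sort, using invented (name, freeCores) pairs instead of real WorkerInfo objects:

// Made-up worker names and free-core counts, just to show the ordering
val toyWorkers = Seq(("worker-a", 2), ("worker-b", 8), ("worker-c", 4))
val ordered = toyWorkers.sortBy(_._2).reverse
// ordered == Seq(("worker-b", 8), ("worker-c", 4), ("worker-a", 2))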

scheduleExecutorsOnWorkers returns an array with the number of cores allocated on each Worker.

The return value is then assigned to assignedCores:

val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

In the FIFO case the default is spreadOutApps, which lets the application run on as many nodes as possible;

// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

There are two ways to allocate Executors to an application.

The first spreads the application's Executors across as many Workers in the cluster as possible, which increases parallelism and tends to give better data locality; locality is therefore already taken into account, as far as possible, at resource-allocation time. The second does the opposite, packing the Executors onto as few Workers as possible (see the source comment below).

/**
 * Schedule executors to be launched on the workers.
 * Returns an array containing number of cores assigned to each worker.
 *
 * There are two modes of launching executors. The first attempts to spread out an application's
 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
 * on as few workers as possible). The former is usually better for data locality purposes and is
 * the default.
 */
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  // by default at least one core is allocated per executor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  // memory required by each Executor
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  // cores assigned so far on each Worker
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

9. When cores are actually allocated across the cluster, the scheduler satisfies the application's request as far as it can:

Why take the minimum: the application may need 1000 cores while the cluster only has 100 free, so only 100 can be allocated for now and the rest must be granted in later rounds as resources free up.

// take the minimum
var coresToAssign = math.min(app.coresLeft, // cores the application needs
  // total free cores across the usable Workers
  usableWorkers.map(_.coresFree).sum)

10. Determine whether a Worker can launch an Executor for this application.

/** Return whether the specified worker can launch an executor for this app. */
// the filtering predicate
def canLaunchExecutor(pos: Int): Boolean = {
  // must be >=: if even the minimum number of cores cannot be covered, nothing can be launched
  val keepScheduling = coresToAssign >= minCoresPerExecutor
  val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

  // If we allow multiple executors per worker, then we can always launch new executors.
  // Otherwise, if there is already an executor on this worker, just give it more cores.
  val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
  if (launchingNewExecutor) {
    // memory already assigned to this app's executors on this worker
    val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
    val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
    keepScheduling && enoughCores && enoughMemory && underLimit
  } else {
    // an executor already exists on this Worker, so cores can be added to it directly
    // We're adding cores to an existing executor, so no need
    // to check memory and executor limits
    keepScheduling && enoughCores
  }
}
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
// canLaunchExecutor filters the positions down to Workers that can still launch an Executor
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
// keep looping as long as there are Workers with room left
while (freeWorkers.nonEmpty) {
  freeWorkers.foreach { pos =>
    var keepScheduling = true
    while (keepScheduling && canLaunchExecutor(pos)) {
      coresToAssign -= minCoresPerExecutor
      assignedCores(pos) += minCoresPerExecutor

If each Worker may run only one Executor for the current application (coresPerExecutor was not set), then every iteration simply adds one more core to that single Executor.

With spreadOutApps (the system default), the scheduler tries to use as many Workers in the cluster as possible, handing each Worker's Executor minCoresPerExecutor cores (one by default) per round before moving on to the next Worker.

Without spreadOutApps, the inner loop keeps adding cores on the current Worker for as long as it has free cores, so the Executors there take as many of that machine's cores as possible before the next Worker is considered.

// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
  assignedExecutors(pos) = 1
} else {
  assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
// without spreadOutApps, the current machine satisfies as much of the application's core demand as possible, i.e. its executor takes as many cores as it can
if (spreadOutApps) {
  keepScheduling = false
}

At this point the scheduler has decided how many Executors to place on each Worker and how many cores each Executor receives; a toy walkthrough of the two modes is sketched below.
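
The following is a minimal, self-contained sketch (not Spark's actual code) that reproduces the core-assignment loop above with invented numbers: three Workers with 8 free cores each and an application asking for 12 cores with 2 cores per Executor. SpreadOutDemo, assign, and every value in it are made up for illustration, and it models only cores, not memory or the executor limit.

object SpreadOutDemo {
  // freeCores(i): free cores on worker i; returns the cores assigned on each worker
  def assign(freeCores: Array[Int], coresLeft: Int,
             minCoresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
    val assignedCores = new Array[Int](freeCores.length)
    var coresToAssign = math.min(coresLeft, freeCores.sum)

    def canLaunch(pos: Int): Boolean =
      coresToAssign >= minCoresPerExecutor &&
        freeCores(pos) - assignedCores(pos) >= minCoresPerExecutor

    var freeWorkers = freeCores.indices.filter(canLaunch)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor
          if (spreadOut) keepScheduling = false // move on to the next worker
        }
      }
      freeWorkers = freeWorkers.filter(canLaunch)
    }
    assignedCores
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(8, 8, 8)
    println(assign(workers, 12, 2, spreadOut = true).mkString(","))  // round-robin: 4,4,4
    println(assign(workers, 12, 2, spreadOut = false).mkString(",")) // fill the first worker: 8,4,0
  }
}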

11. Now the actual allocation is carried out.

// Now that we've decided how many cores to allocate on each worker, let's allocate them
// the previous step decided how many cores each Worker gets; this loop performs the allocation
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
  allocateWorkerResourceToExecutors(
    app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}

12. Let's look at how allocateWorkerResourceToExecutors is actually implemented.
/**
 * Allocate a worker's resources to one or more executors.
 * @param app the info of the application which the executors belong to
 * @param assignedCores number of cores on this worker for this application
 * @param coresPerExecutor number of cores per executor
 * @param worker the worker info
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // record the metadata of the Executor being allocated in the application (ApplicationInfo)
    val exec = app.addExecutor(worker, coresToAssign)
    // launchExecutor actually launches the Executor on the Worker
    launchExecutor(worker, exec)
    // once Executors are being allocated, the application state becomes RUNNING
    app.state = ApplicationState.RUNNING
  }
}
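
To make the arithmetic in allocateWorkerResourceToExecutors concrete, a hypothetical example (the numbers and the val names are invented): suppose this Worker was assigned 6 cores for the application.

// Invented numbers for illustration only
val assignedCores = 6

// Case 1: the app asked for 2 cores per executor => 3 executors of 2 cores each
val withSetting: Option[Int] = Some(2)
val numExecutors = withSetting.map(assignedCores / _).getOrElse(1) // 3
val coresForEach = withSetting.getOrElse(assignedCores)            // 2

// Case 2: coresPerExecutor not set => a single executor takes all 6 assigned cores
val withoutSetting: Option[Int] = None
val numExecutorsDefault = withoutSetting.map(assignedCores / _).getOrElse(1) // 1
val coresDefault = withoutSetting.getOrElse(assignedCores)                   // 6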

13. addExecutor: returns the Executor's description (an ExecutorDesc) and records it on the application.
private[master] def addExecutor(
    worker: WorkerInfo,
    cores: Int,
    useID: Option[Int] = None): ExecutorDesc = {
  val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
  executors(exec.id) = exec
  // the number of cores for this executor was decided before the allocation itself
  coresGranted += cores // track the total cores granted to this application
  exec
}

14. After preparing the Executor information for the current application, the Master sends a command over its remote endpoint to the Worker, which then starts the ExecutorBackend process;
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // the Worker receives the LaunchExecutor command
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

15. Right after that, an ExecutorAdded message is sent to the application's Driver;
exec.application.driver.send(
  ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))