SPARK的MAster资源调度原理(源码)分析

SPARK的MAster资源分配算法(SPARK1.3)

master资调度通过源码中的 org.apache.spark.deploy.master包下的schedule()方法实现

步骤如下:

  1. 首先判断master是否是alive状态,如果不是alive则返回,也就是只有活动的master才会进行资源调度,standby master是不会进行资源调度的
  2. 把之前注册的worker中的alive状态的worker传入 Random.shuffer方法,该方法主要是把worker顺序打乱,返回一个数组
  3. 获取返回的worker的数量
  4. 用for循环进行driver调度,只有启用yarn-cluster模式提交application才会进行driver调度,因为yarn-client和 standalone模式都是在提交的客户端启动driver,不需要调度
  5. for循环遍历WaittingDrivers ArrayBuffer,里面用while循环判断如果有alive的worker没有遍历,并且driver为为启动状态,则继续遍历
  6. 如果这个worker的内存>=driver需要的内存并且CPU>=driver需要的CPU,则启动driver,将driver从WaittingDrivers队列中移除
  7. 启动driver的方法为launchDriver,将driver加入worker的内部缓存,将worker剩余的内存、CPU减去driver需要的内存、CPU,worker也被加入到driver缓存结构中,然后调用worker的actor方法,给worker发送LaunchDriver消息,让它把driver启动起来,然后将driver状态改为RUNNING
  8. driver启动后,进行application的调度,这里有两个算法,spreadOutApps和非spreadOutApps算法,这个在代码的SparkConf里可以设置, ("spark.deploy.spreadOut", true),默认是为true,启用spreadoutApps
  9. for遍历WaittingApps中的application,并且用if守卫过滤出还需要进行CPU分配的application,for循环里再次过滤状态为alive并且可以被application使用的worker,然后按照其剩余的CPU数量倒序排序(可以被application使用的worker必须是可用内存大于等于application最小executor需要的需要的内存,并且没有被application启用过)
  10. 把需要分配的application数量放入一个数组,然后获取最终需要分配的CPU数量=application需要分配的CPU和worker总CPU的最小值
  11. while遍历worker,如果worker还有可分配的CPU,将总的需要分配的CPU-1,给这个worker分配的CPU+1,指针移到下一个CPU。循环一直到CPU分配完,这种分配算法的结果是application的CPU尽可能的平均分配到了各个worker上,应用程序尽可能多的运行在所有的Node上
  12. 给worker分配完CPU后,遍历分配到CPU的worker,在每个application内部缓存结构中,添加executor,创建executorDSC对象,其中封装了给这个executor分配多少 CPU core,然后在worker上启动executor,将application状态改为RUNNING
  13. 如果是非spreadOutApps算法,刚好相反,先把每个worker的CPU全部分配完,在分配下一个worker的CPU,

    以下是核心源码:

private def schedule() {

/*

* 首先判断 master是否是alive状态

*/

if (state != RecoveryState.ALIVE) { return }

// First schedule drivers, they take strict precedence over applications

// Randomization helps balance drivers

//把alive状态的worker(之前注册的)传入Random.shuffle方法,把worker随机打乱

val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

//获取当前可用worker的数量(随机打乱后)

val numWorkersAlive = shuffledAliveWorkers.size

var curPos = 0

/*

* driver的调度机制,遍历waitingDrivers这个ArrayBuffer

* 只有用 yarn-cluster模式提交的时候,才会注册driver,并导致driver被调度,因为standalone和yarn-client模式

* 都会在本地启动driver,而不会注册,更不会调度

*/

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers

// We assign workers to each waiting driver in a round-robin fashion. For each driver, we

// start from the last worker that was assigned a driver, and continue onwards until we have

// explored all alive workers.

var launched = false

var numWorkersVisited = 0

//当还有alive的worker没有遍历,并且driver没有启动,则继续遍历worker

while (numWorkersVisited < numWorkersAlive && !launched) {

val worker = shuffledAliveWorkers(curPos)

numWorkersVisited += 1

//如果这个worker空闲内存>=driver需要的内存并且worker的空闲CPU>=driver需要的CPU

if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

//启动driver

launchDriver(worker, driver)

//并且经driver从waitingDrivers队列中移除

waitingDrivers -= driver

launched = true

}

//指针指向下一个worker

curPos = (curPos + 1) % numWorkersAlive

}

}

// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app

// in the queue, then the second app, etc.

/*

* application的调度机制

* 这里两种算法,可以在sparkconf设置,默认为true(spreadOutApps算法)

*/

if (spreadOutApps) {

// Try to spread out each app among all the nodes, until it has all its cores

//遍历waitingApps 中的application,并且用if守卫过滤出还需要进行CPU分配的application

for (app <- waitingApps if app.coresLeft > 0) {

//再次过滤状态为alive并且可以被application使用的worker,然后按照其剩余的CPU数量倒序排序

//可以被application使用的worker必须是可用内存大于application最小Executor需要的内存,并且没有被该application启用过

val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)

.filter(canUse(app, _)).sortBy(_.coresFree).reverse

//创建一个数组,存储需要分配的CPU数量

val numUsable = usableWorkers.length

val assigned = new Array[Int](numUsable) // Number of cores to give on each node

//获取到底需要分配多少CPU,取application需要分配的CPU和worker总共CPU数量的最小值

var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

var pos = 0

while (toAssign > 0) { }

// Now that we‘ve decided how many cores to give on each node, let‘s actually give them

//给worker分配完CPU后,遍历worker

for (pos <- 0 until numUsable) {

//只要worker分配到了CPU

if (assigned(pos) > 0) {

//首先在每个application内部缓存结构中,添加executor,

//创建executorDSC对象,其中封装了给这个executor分配多少 CPU core

val exec = app.addExecutor(usableWorkers(pos), assigned(pos))

//那么就在worker上启动Executor

launchExecutor(usableWorkers(pos), exec)

//将application状态设置为RUNNING

app.state = ApplicationState.RUNNING

}

}

}

} else {

// Pack each app into as few nodes as possible until we‘ve assigned all its cores

for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {

for (app <- waitingApps if app.coresLeft > 0) {

if (canUse(app, worker)) {

val coresToUse = math.min(worker.coresFree, app.coresLeft)

if (coresToUse > 0) {

val exec = app.addExecutor(worker, coresToUse)

launchExecutor(worker, exec)

app.state = ApplicationState.RUNNING

}

}

}

}

}

}

//executor的启动

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {

logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

//将executor加入worker内部缓存

worker.addExecutor(exec)

//向worker发送LaunchExecutor消息

worker.actor ! LaunchExecutor(masterUrl,

exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)

//向executor对应的application发送ExecutorAdded消息

exec.application.driver ! ExecutorAdded(

exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)

}

//Driver的启动

def launchDriver(worker: WorkerInfo, driver: DriverInfo) {

logInfo("Launching driver " + driver.id + " on worker " + worker.id)

//将driver加入到worker的内部缓存结构

//将worker剩余的内存、CPU减去driver使用的内存和CPU

worker.addDriver(driver)

//worker也被加入到driver内部缓存结构中

driver.worker = Some(worker)

//然后调用worker的actor方法,给worker发送LaunchDriver消息,让它把driver启动起来

worker.actor ! LaunchDriver(driver.id, driver.desc)

//将driver状态改为RUNNING

driver.state = DriverState.RUNNING

}

时间: 2024-08-06 03:43:56

SPARK的MAster资源调度原理(源码)分析的相关文章

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

小记--------spark资源调度机制源码分析-----Schedule

Master类位置所在:spark-core_2.11-2.1.0.jar的org.apache.spark.deploy.master下的Master类 /** * driver调度机制原理代码分析Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability change

通过 spark.files 传入spark任务依赖的文件源码分析

版本:spak2.3 相关源码:org.apache.spark.SparkContext 在创建spark任务时候,往往会指定一些依赖文件,通常我们可以在spark-submit脚本使用--files /path/to/file指定来实现. 但是公司产品的架构是通过livy来调spark任务,livy的实现其实是对spark-submit的一个包装,所以如何指定依赖文件归根到底还是在spark这边.既然不能通过命令行--files指定,那在编程中怎么指定?任务在各个节点上运行时又是如何获取到这

[五]类加载机制双亲委派机制 底层代码实现原理 源码分析 java类加载双亲委派机制是如何实现的

Launcher启动类 本文是双亲委派机制的源码分析部分,类加载机制中的双亲委派模型对于jvm的稳定运行是非常重要的 不过源码其实比较简单,接下来简单介绍一下 我们先从启动类说起 有一个Launcher类   sun.misc.Launcher; 仔细看下这简短的几行注释,可以得到有用的信息 ps:直接IDE里面查看反编译的,看不到注释的,可以下载openJDK查看源码,我的这个版本是openjdk-8-src-b132-03_mar_2014 sun.misc.Launcher这个类是系统用于

Mybatis Interceptor 拦截器原理 源码分析

Mybatis采用责任链模式,通过动态代理组织多个拦截器(插件),通过这些拦截器可以改变Mybatis的默认行为(诸如SQL重写之类的),由于插件会深入到Mybatis的核心,因此在编写自己的插件前最好了解下它的原理,以便写出安全高效的插件. 代理链的生成 Mybatis支持对Executor.StatementHandler.PameterHandler和ResultSetHandler进行拦截,也就是说会对这4种对象进行代理. 通过查看Configuration类的源代码我们可以看到,每次都

AtomicInteger原理&amp;源码分析

转自https://www.cnblogs.com/rever/p/8215743.html 深入解析Java AtomicInteger原子类型 在进行并发编程的时候我们需要确保程序在被多个线程并发访问时可以得到正确的结果,也就是实现线程安全.线程安全的定义如下: 当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的. 举个线程不安全的例子.假如我们想实现一个功能来统计

Tomcat7.0源码分析——请求原理分析(中)

前言 在<TOMCAT7.0源码分析--请求原理分析(上)>一文中已经介绍了关于Tomcat7.0处理请求前作的初始化和准备工作,请读者在阅读本文前确保掌握<TOMCAT7.0源码分析--请求原理分析(上)>一文中的相关知识以及HTTP协议和TCP协议的一些内容.本文重点讲解Tomcat7.0在准备好接受请求后,请求过程的原理分析. 请求处理架构 在正式开始之前,我们先来看看图1中的Tomcat请求处理架构. 图1 Tomcat请求处理架构 图1列出了Tomcat请求处理架构中的主

Tomcat7.0源码分析——请求原理分析(上)

前言 谈起Tomcat的诞生,最早可以追溯到1995年.近20年来,Tomcat始终是使用最广泛的Web服务器,由于其使用Java语言开发,所以广为Java程序员所熟悉.很多人早期的J2EE项目,由程序员自己实现Jsp页面或者Servlet接受请求,后来借助Struts1.Struts2.Spring等中间件后,实际也是利用Filter或者Servlet处理请求,大家肯定要问了,这些Servlet处理的请求来自哪里?Tomcat作为Web服务器是怎样将HTTP请求交给Servlet的呢? 本文就