Spark系列(七)Master中的资源调度

资源调度

说明:

Application的调度算法有两种,分别为spreadOutApps和非spreadOutApps

spreadOutApps

  • 在spark-submit脚本中,可以指定要多少个executor,executor需要多少个cpu及多少内存,基于该机制,最后executor的实际数量,以及每个executor的cpu可能与配置是不一样的。
  • 因为spreadOutApps调度算法的总是基于总CPU总和来分配,比如要求3个executor每个要3个CPU,如果有9个worker每个有1个CPU,因为总共要分配9个core,所以每个worker分配一个core然后每个worker启动一个executor
  • 最后启动9个executor每个executor1个cput core

非spreadOutApps

  • 每个application都尽可能分配到尽量少的worker上,比如总共有10个worker,每个有10个core app总共要分配20个core,那么其实只会分配到两个worker上,每个worker都占满10个core.

 

Schdule方法源码分析

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule() {
    // 判断master状态,不为ALIVE时直接返回
    if (state != RecoveryState.ALIVE) { return }
 
    // First schedule drivers, they take strict precedence over applications
10      // Randomization helps balance drivers
11      // 获取状态为ALIVE的worker,并且随机打乱
12      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
13      // 可用worker数量
14      val numWorkersAlive = shuffledAliveWorkers.size
15      var curPos = 0
16   
17      // diriver调度过程(yarn-client模式下)
18      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
19        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
20        // start from the last worker that was assigned a driver, and continue onwards until we have
21        // explored all alive workers.
22        var launched = false
23        var numWorkersVisited = 0
24        // 判读还有可用的worker且Driver还未启动
25        while (numWorkersVisited < numWorkersAlive && !launched) {
26          val worker = shuffledAliveWorkers(curPos)
27          numWorkersVisited += 1
28          // 判断当前worker空闲内存是否大于等于driver需要的内存,且Worker空闲的core数量大于等于dirver需要的core的数量
29          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
30            // 启动driver
31            launchDriver(worker, driver)
32            waitingDrivers -= driver
33            launched = true
34          }
35          curPos = (curPos + 1) % numWorkersAlive
36        }
37      }
38   
39      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
40      // in the queue, then the second app, etc.
41      // spreadOutApps调度方式
42      if (spreadOutApps) {
43        // Try to spread out each app among all the nodes, until it has all its cores
44        // 遍历需要调度的app(Application),且该app中的core还需要调度
45        for (app <- waitingApps if app.coresLeft > 0) {
46          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
47            .filter(canUse(app, _)).sortBy(_.coresFree).reverse
48          // 可用worker的数量
49          val numUsable = usableWorkers.length
50          // 存放app 需要分配core的结果
51          val assigned = new Array[Int](numUsable) // Number of cores to give on each node
52          // 获取Application剩余需要分配的cpu数量与worker总共可用cpu数量中的最小值
53          var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
54          var pos = 0
55          while (toAssign > 0) {
56            // 如果worker空闲的cpu数量大于已经分配出去的cpu数量,那么woker还可继续分配cpu
57            if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
58              // 还需分配core的总数量减1
59              toAssign -= 1
60              // 在已分配app core结果集中加1
61              assigned(pos) += 1
62            }
63            pos = (pos + 1) % numUsable
64          }
65          // Now that we‘ve decided how many cores to give on each node, let‘s actually give them
66          for (pos <- 0 until numUsable) {
67            if (assigned(pos) > 0) {
68              // 根据WorkerInfo和所需的core构建ExecutorDesc
69              val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
70              // 启动Executor
71              launchExecutor(usableWorkers(pos), exec)
72              app.state = ApplicationState.RUNNING
73            }
74          }
75        }
76      } 
77      // 非spreadOutApps调度方式
78      else {
79        // Pack each app into as few nodes as possible until we‘ve assigned all its cores
80        // 过滤出可用的worker
81        for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
82          // 获取需要分配core的app
83          for (app <- waitingApps if app.coresLeft > 0) {
84            // 判读app是否可以使用该worker
85            if (canUse(app, worker)) {
86              // 取worker空闲core与app需分配core中的最小值
87              val coresToUse = math.min(worker.coresFree, app.coresLeft)
88              if (coresToUse > 0) {
89                // 根据WorkerInfo和所需的core构建ExecutorDesc
90                val exec = app.addExecutor(worker, coresToUse)
91                // 启动Executor
92                launchExecutor(worker, exec)
93                app.state = ApplicationState.RUNNING
94              }
95            }
96          }
97        }
98      }
99    }
时间: 2024-11-05 14:16:07

Spark系列(七)Master中的资源调度的相关文章

iOS流布局UICollectionView系列七——三维中的球型布局

摘要: 类似标签云的球状布局,也类似与魔方的3D布局 iOS流布局UICollectionView系列七--三维中的球型布局 一.引言 通过6篇的博客,从平面上最简单的规则摆放的布局,到不规则的瀑布流布局,再到平面中的圆环布局,我们突破了线性布局的局限,在后面,我们将布局扩展到了空间,在Z轴上进行了平移,我们实现了一个类似UIPickerView的布局模型,其实我们还可以再进一步,类比于平面布局,picKerView只是线性排列布局在空间上的旋转与平移,这次,我们更加充分了利用一下空间的尺寸,来

Spark系列(五)Master主备切换机制

Spark Master主备切换主要有两种机制,之中是基于文件系统,一种是基于Zookeeper.基于文件系统的主备切换机制需要在Active Master挂掉后手动切换到Standby Master上,而基于Zookeeper的主备切换机制可以实现自动切换Master. 切换流程图 流程说明: Standby Master模式 1. 使用持久化引擎读取持久化的storeApps.storeDrivers.storeWorkers,持久化引擎有FileSystemPersistenceEngin

Spark 系列(七)—— 基于 ZooKeeper 搭建 Spark 高可用集群

一.集群规划 这里搭建一个 3 节点的 Spark 集群,其中三台主机上均部署 Worker 服务.同时为了保证高可用,除了在 hadoop001 上部署主 Master 服务外,还在 hadoop002 和 hadoop003 上分别部署备用的 Master 服务,Master 服务由 Zookeeper 集群进行协调管理,如果主 Master 不可用,则备用 Master 会成为新的主 Master. 二.前置条件 搭建 Spark 集群前,需要保证 JDK 环境.Zookeeper 集群和

【转载】Spark系列之运行原理和架构

参考 http://www.cnblogs.com/shishanyuan/p/4721326.html 1. Spark运行架构 1.1 术语定义 lApplication:Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码: lDriver:Spark中的Driver即运行上述Application的main()函数并且创建SparkCon

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

【微信分享】王团结:如何用Hadoop/Spark构建七牛数据平台

摘要:7月30日,七牛数据平台工程师王团结就七牛内部使用的数据平台,深入分享了该团队在Flume.Kafka.Spark以及Streaming上的实践经验,并讲解了各个工具使用的注意点. 继" YARN or Mesos?Spark痛点探讨"." Mesos资源调度与管理的深入分享与交流".及" 主流SQL on Hadoop框架选择"之后,CSDN Spark微信用户群邀请了王团结为大家分享Hadoop/Spark在七牛数据平台的实战. 王团结

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Apache Spark 2.2.0 中文文档 - Spark RDD(Resilient Distributed Datasets)

Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD 抽象 2.2 Spark 编程接口 2.2.1 例子 – 监控日志数据挖掘 2.3 RDD 模型的优势 2.4 不适合用 RDDs 的应用 3 Spark 编程接口 3.1 Spark 中 RDD 的操作 3.2 举例应用 3.2.1 线性回归 3.2.2 PageRank 4 表达 RDDs 5

struts2官方 中文教程 系列七:消息资源文件

介绍 在本教程中,我们将探索使用Struts 2消息资源功能(也称为 resource bundles 资源绑定).消息资源提供了一种简单的方法,可以将文本放在一个视图页面中,通过应用程序,创建表单字段标签,并根据用户的语言环境将文本更改为特定的语言. 贴个本帖的地址,以免被爬:struts2官方 中文教程 系列七:消息资源文件  即 http://www.cnblogs.com/linghaoxinpian/p/6906720.html 下载本章节代码 信息资源属性文件 在Struts2 we