Spark 1.60的executor schedule

第一次看源码还是Spark 1.02。这次看新源码发现调度方式有了一些新的特征,在这里随便写一下。

不变的是,master还是接收Appclient和worker的消息,并且在接收RegisterApplication等消息后会执行一遍schedule()。schedule()依旧会先找到空闲的worker用以执行waitingDrivers。但是调度Executor的方式有了一点变化。

 1   private def startExecutorsOnWorkers(): Unit = {
 2     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
 3     // in the queue, then the second app, etc.
 4     for (app <- waitingApps if app.coresLeft > 0) {
 5       val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
 6       // Filter out workers that don‘t have enough resources to launch an executor
 7       val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
 8         .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
 9           worker.coresFree >= coresPerExecutor.getOrElse(1))
10         .sortBy(_.coresFree).reverse
11       val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
12
13       // Now that we‘ve decided how many cores to allocate on each worker, let‘s allocate them
14       for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
15         allocateWorkerResourceToExecutors(
16           app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
17       }
18     }
19   }

我们可以看到,在这里虽然依旧是那个简单的fifo调度,但是不再是对核心进行逐个调度,会适应executor对核心数的要求,在寻找usableWorkers时会找到memory和cores都满足条件的worker。这一点是为了改变之前的一个bug,即:

cluster has 4 workers with 16 cores each. User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is allocated at a time, 12 cores from each worker would be assigned to each executor. Since 12 < 16, no executors would launch [SPARK-8881]

其中scheduleExecutorsOnWorkers是找到每个可用的worker分配给当前app的核心数量。

简单而言这还是一个简单的fifo调度,比起Yarn默认的capacity来讲少了许多功能,而google的Borg论文《Large-scale cluster management at Google with Borg》中写的调度细节更是复杂,不过胜在简单易懂,看起代码来轻松许多。。。

时间: 2024-10-25 02:51:16

Spark 1.60的executor schedule的相关文章

Spark技术内幕:Executor分配详解

当用户应用new SparkContext后,集群就会为在Worker上分配executor,那么这个过程是什么呢?本文以Standalone的Cluster为例,详细的阐述这个过程.序列图如下: 1. SparkContext创建TaskScheduler和DAG Scheduler SparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建它.如果你使用SparkShell,你不必自己显式去创建它,系统会自动创建一个名字为sc的SparkContext的实例.

spark动态资源(executor)分配

spark动态资源调整其实也就是说的executor数目支持动态增减,动态增减是根据spark应用的实际负载情况来决定. 开启动态资源调整需要(on yarn情况下) 1.将spark.dynamicAllocation.enabled设置为true.意思就是启动动态资源功能 2.将spark.shuffle.service.enabled设置为true. 在每个nodeManager上设置外部shuffle服务 2.1 将spark-<version>-yarn-shuffle.jar拷贝到

Spark核心原理之Executor原理

Executor是Spark执行任务的进程,Spark启动Executor过程包括如下步骤: 1)使用Spark-submit提交到集群,Master收到RequesSubmitDriver请求. 2)Master调用scheduler把Driver程序发送到worker端执行. 3)Driver执行时初始化SparkContext,创建AppClient,向Master注册,其中Appclient中实现了内部类ClientEndPoint,和Master进行通信. 4)Master收到注册信息

Tuning Spark

https://spark.apache.org/docs/1.2.1/tuning.html Data Serialization 数据序列化,对于任意分布式系统都是性能的关键点 Spark默认使用Java serialization,这个比较低效 推荐使用,Kryo serialization,会比Java序列化,更快更小, Spark使用Twitter chill library(Kryo的scala扩展) conf.set("spark.serializer", "o

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.

【Spark Core】任务运行机制和Task源代码浅析1

引言 上一小节<TaskScheduler源代码与任务提交原理浅析2>介绍了Driver側将Stage进行划分.依据Executor闲置情况分发任务,终于通过DriverActor向executorActor发送任务消息. 我们要了解Executor的运行机制首先要了解Executor在Driver側的注冊过程.这篇文章先了解一下Application和Executor的注冊过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor运行的Task分为ShuffleMa

Spark调优

转载:http://www.oschina.net/translate/spark-tuning 因为大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU.网络带宽或者是内存都有可能成为Spark程序的瓶颈.通常情况下,如果数据完全加载到内存那么网络带宽就会成为瓶颈,但是你仍然需要对程序进行优化,例如采用序列化的方式保存RDD数据(Resilient Distributed Datasets),以便减少内存使用.该文章主要包含两个议题:数据序列化和内存优化,数据序列化不但能

【Spark Core】任务执行机制和Task源码浅析1

引言 上一小节<TaskScheduler源码与任务提交原理浅析2>介绍了Driver侧将Stage进行划分,根据Executor闲置情况分发任务,最终通过DriverActor向executorActor发送任务消息. 我们要了解Executor的执行机制首先要了解Executor在Driver侧的注册过程,这篇文章先了解一下Application和Executor的注册过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor执行的Task分为ShuffleMap

spark系统实现yarn资源的自动调度

参考: http://blog.csdn.net/dandykang/article/details/48160953 对于Spark应用来说,资源是影响Spark应用执行效率的一个重要因素.当一个长期运行 的服务(比如Thrift Server),若分配给它多个Executor,可是却没有任何任务分配给它,而此时有其他的应用却资源张,这就造成了很大的资源浪费和资源不合理的调度. 动态资源调度就是为了解决这种场景,根据当前应用任务的负载情况,实时的增减 Executor个数,从而实现动态分配资源