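The first time I read the Spark source, it was still version 1.0.2. Reading the newer source this time, I noticed that scheduling has picked up a few new traits, so here are some casual notes.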
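What has not changed: the Master still receives messages from AppClient and from the workers, and after receiving a message such as RegisterApplication it runs schedule() once. schedule() still begins by finding free workers on which to run the waitingDrivers. The way executors are scheduled, however, has changed a little.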
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
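As we can see, this is still the same simple FIFO scheduling, but cores are no longer handed out one at a time. The scheduler now respects the number of cores each executor requires: when building usableWorkers it keeps only the ALIVE workers whose free memory and free cores are both enough to launch one executor, sorted by free cores in descending order. This change fixes an earlier bug, namely: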
cluster has 4 workers with 16 cores each. User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is allocated at a time, 12 cores from each worker would be assigned to each executor. Since 12 < 16, no executors would launch [SPARK-8881]
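The arithmetic in that bug report can be reproduced with a small model. The following is only a sketch of one-core-at-a-time round-robin assignment, not the old Spark code; the object name OneCoreAtATimeDemo and both helper functions are made up for illustration, and memory is ignored.

```scala
// Simplified model of the behaviour described in SPARK-8881.
// All names here are hypothetical and not taken from the Spark source.
object OneCoreAtATimeDemo {

  /** Spread `coresToAssign` over the workers one core at a time, round-robin. */
  def assignOneCoreAtATime(coresFree: Array[Int], coresToAssign: Int): Array[Int] = {
    val assigned = Array.fill(coresFree.length)(0)
    // Never try to hand out more cores than the cluster actually has free.
    var remaining = math.min(coresToAssign, coresFree.sum)
    var pos = 0
    while (remaining > 0) {
      if (coresFree(pos) - assigned(pos) > 0) {
        assigned(pos) += 1
        remaining -= 1
      }
      pos = (pos + 1) % coresFree.length
    }
    assigned
  }

  /** An executor fits on a worker only if that worker was assigned a full executor's worth of cores. */
  def launchableExecutors(assigned: Array[Int], coresPerExecutor: Int): Int =
    assigned.map(_ / coresPerExecutor).sum

  def main(args: Array[String]): Unit = {
    // 4 workers with 16 cores each, spark.cores.max = 48, spark.executor.cores = 16
    val assigned = assignOneCoreAtATime(Array(16, 16, 16, 16), 48)
    println(assigned.mkString(", "))            // 12, 12, 12, 12
    println(launchableExecutors(assigned, 16))  // 0
  }
}
```

Every worker ends up with 12 assigned cores, which is less than the 16 an executor needs, so nothing can launch even though the cluster has enough free cores for three executors.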
Within startExecutorsOnWorkers, the call to scheduleExecutorsOnWorkers works out how many cores each usable worker should contribute to the current app.
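To make the idea concrete, here is a rough sketch of assigning cores in executor-sized chunks. It is not the body of Spark's scheduleExecutorsOnWorkers: the object ChunkedAssignmentSketch, the function assignInExecutorChunks, and its parameters are assumptions made for illustration, memory checks are left out, and coresPerExecutor is treated as always set.

```scala
// Illustrative sketch only; hypothetical names, not Spark's implementation.
object ChunkedAssignmentSketch {

  /**
   * Decide how many cores each worker gives to one app, always in multiples
   * of `coresPerExecutor`, so that every assigned chunk can host an executor.
   */
  def assignInExecutorChunks(
      coresFree: Array[Int],
      coresToAssign: Int,
      coresPerExecutor: Int,
      spreadOut: Boolean): Array[Int] = {
    require(coresPerExecutor > 0, "coresPerExecutor must be positive")
    val assigned = Array.fill(coresFree.length)(0)
    var remaining = math.min(coresToAssign, coresFree.sum)

    // A worker can take another executor only if both the app and the
    // worker still have a full chunk of cores left.
    def canLaunch(pos: Int): Boolean =
      remaining >= coresPerExecutor &&
        coresFree(pos) - assigned(pos) >= coresPerExecutor

    var candidates = coresFree.indices.filter(canLaunch)
    while (candidates.nonEmpty) {
      for (pos <- candidates) {
        var keepGoing = true
        while (keepGoing && canLaunch(pos)) {
          assigned(pos) += coresPerExecutor
          remaining -= coresPerExecutor
          // When spreading out, take one chunk per pass and move to the next worker;
          // otherwise keep filling the current worker first.
          if (spreadOut) keepGoing = false
        }
      }
      candidates = candidates.filter(canLaunch)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // The SPARK-8881 scenario: three executors of 16 cores now fit.
    println(assignInExecutorChunks(Array(16, 16, 16, 16), 48, 16, spreadOut = true).mkString(", "))
  }
}
```

With the cluster from the bug report this yields 16, 16, 16, 0, that is, three workers each hosting one full executor. The spreadOut flag in the sketch plays the role that spreadOutApps plays in the call above: true spreads chunks across workers, false packs them onto as few workers as possible.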
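In short, this is still a simple FIFO scheduler. It offers far fewer features than YARN's default Capacity Scheduler, and the scheduling details described in Google's Borg paper, "Large-scale cluster management at Google with Borg", are more complex still. Its strength is that it is simple and easy to understand, which makes the code much more pleasant to read.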