Spark 3.0 Application Scheduling Algorithm Explained

Spark's application scheduling algorithm has changed noticeably across versions: from Spark 1.3.0 to 1.6.1, through Spark 2.x, and on to the current Spark 3.x line. This post walks through the Application scheduling mechanism in the latest release, Spark 3.0.
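Before reading the Master's source, it helps to keep in mind which user-facing settings feed this algorithm in standalone mode. The snippet below is not part of the scheduler; it is a minimal, hypothetical driver configuration (the master URL and sizes are made up) showing the knobs that end up as coresPerExecutor, memoryPerExecutorMB, and the application's core limit in the code that follows.

import org.apache.spark.sql.SparkSession

// Hypothetical standalone-mode settings; adjust the master URL and sizes for your cluster.
val spark = SparkSession.builder()
  .master("spark://master-host:7077")      // standalone Master
  .appName("scheduling-demo")
  .config("spark.executor.cores", "2")     // -> coresPerExecutor in the ApplicationDescription
  .config("spark.executor.memory", "2g")   // -> memoryPerExecutorMB
  .config("spark.cores.max", "8")          // caps the total cores the app may claim (coresLeft)
  .getOrCreate()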

Scheduling starts in Master.startExecutorsOnWorkers:

private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps) {
    // If spark-submit specified the number of CPU cores per executor, each executor gets
    // that many cores; otherwise each executor gets 1 core by default.
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than coresPerExecutor, the cores left will not be allocated:
    // the cores the app still needs must be at least the cores required for one executor.
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out workers that don't have enough resources to launch an executor:
      // keep workers that are ALIVE and can still launch an executor for this app,
      // sorted by free CPU cores in descending order.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canLaunchExecutor(_, app.desc))
        .sortBy(_.coresFree).reverse
      if (waitingApps.length == 1 && usableWorkers.isEmpty) {
        logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
      }
      // The spreadOutApps strategy (the default) spreads the executors an application needs
      // across as many workers as possible.
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}

Next, how the Master decides whether a worker can launch an executor:
private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
  canLaunch(
    worker,
    desc.memoryPerExecutorMB,
    desc.coresPerExecutor.getOrElse(1),
    desc.resourceReqsPerExecutor)
}

Let's look at the canLaunch method it delegates to:
private def canLaunch(
    worker: WorkerInfo,
    memoryReq: Int,
    coresReq: Int,
    resourceRequirements: Seq[ResourceRequirement])
  : Boolean = {
  // The worker's free memory must be at least the requested memory
  val enoughMem = worker.memoryFree >= memoryReq
  // The worker's free cores must be at least the requested core count
  val enoughCores = worker.coresFree >= coresReq
  // The worker must also satisfy the executor's custom resource requirements (e.g. GPUs)
  val enoughResources = ResourceUtils.resourcesMeetRequirements(
    worker.resourcesAmountFree, resourceRequirements)
  enoughMem && enoughCores && enoughResources
}
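Conceptually, ResourceUtils.resourcesMeetRequirements just checks, for every requested resource (e.g. gpu or fpga), that the worker's free amount covers the requested amount. Here is a self-contained sketch of that idea, using simplified stand-in types rather than the actual Spark classes:

// Simplified stand-ins for Spark's ResourceRequirement and a worker's free-resource map.
case class ResourceRequirement(resourceName: String, amount: Int)

def resourcesMeetRequirements(
    resourcesFree: Map[String, Int],
    requirements: Seq[ResourceRequirement]): Boolean =
  requirements.forall(r => resourcesFree.getOrElse(r.resourceName, 0) >= r.amount)

// Example: a worker with 2 free GPUs satisfies an executor that needs 1 GPU.
val ok = resourcesMeetRequirements(Map("gpu" -> 2), Seq(ResourceRequirement("gpu", 1)))  // true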

Back to scheduleExecutorsOnWorkers, which startExecutorsOnWorkers calls:
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  // By default the one-executor-per-worker mode is on, i.e. only one executor is started per
  // worker. If coresPerExecutor is set in spark-submit, multiple executors can be started on
  // a worker when it has enough resources.
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  // Whether the worker at position pos can launch (or grow) an executor for this app
  def canLaunchExecutorForApp(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    val assignedExecutorNum = assignedExecutors(pos)

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.

    // A new executor can be launched if coresPerExecutor was set in spark-submit (multiple
    // executors per worker allowed), or if this worker has not yet been assigned an executor
    // for this application.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutorNum * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val assignedResources = resourceReqsPerExecutor.map {
        req => req.resourceName -> req.amount * assignedExecutorNum
      }.toMap
      val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
        case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
      }
      val enoughResources = ResourceUtils.resourcesMeetRequirements(
        resourcesFree, resourceReqsPerExecutor)
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutorForApp(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor

        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          // One executor per worker: each iteration adds minCoresPerExecutor cores
          // (1 by default) to that single executor.
          assignedExecutors(pos) = 1
        } else {
          // Multiple executors per worker: each iteration schedules one more executor with
          // minCoresPerExecutor cores (the coresPerExecutor value from spark-submit).
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          // keepScheduling = false means only one round of cores is assigned on this worker
          // before moving on to the next one, until a full pass over the workers is done.
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
  }
  // Return the number of CPU cores assigned on each worker
  assignedCores
}
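To see what spreadOutApps actually changes, here is a self-contained simplification of the loop above. It only tracks CPU cores (no memory, custom resources, or executor limits) and uses made-up worker sizes, so it is a sketch rather than the real scheduler, but it reproduces the round-robin versus fill-one-worker-first behaviour.

// Simplified core-assignment simulation; ignores memory/resource checks and executor limits.
def assignCores(
    coresFreePerWorker: Array[Int],
    coresNeeded: Int,
    coresPerExecutor: Int,
    spreadOut: Boolean): Array[Int] = {
  val assigned = new Array[Int](coresFreePerWorker.length)
  var toAssign = math.min(coresNeeded, coresFreePerWorker.sum)

  def canUse(pos: Int): Boolean =
    toAssign >= coresPerExecutor && coresFreePerWorker(pos) - assigned(pos) >= coresPerExecutor

  var free = coresFreePerWorker.indices.filter(canUse)
  while (free.nonEmpty) {
    free.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canUse(pos)) {
        toAssign -= coresPerExecutor
        assigned(pos) += coresPerExecutor
        if (spreadOut) keepScheduling = false   // one round on this worker, then move on
      }
    }
    free = free.filter(canUse)
  }
  assigned
}

// Three workers with 8 free cores each, an app that wants 12 cores, 2 cores per executor:
assignCores(Array(8, 8, 8), 12, 2, spreadOut = true)   // Array(4, 4, 4)  -- spread across workers
assignCores(Array(8, 8, 8), 12, 2, spreadOut = false)  // Array(8, 4, 0)  -- fill workers one by one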

Finally, let's look at allocateWorkerResourceToExecutors:
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
    // Register one more executor for this application
    val exec = app.addExecutor(worker, coresToAssign, allocated)
    // Send a LaunchExecutor message to the worker
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}

That completes the walkthrough of the Application scheduling algorithm in the latest release, Spark 3.0.
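As a short postscript, the division in allocateWorkerResourceToExecutors is worth a quick check with made-up numbers. Since scheduleExecutorsOnWorkers always hands out cores in multiples of minCoresPerExecutor, the division below never leaves a remainder.

// Hypothetical values: 6 cores were assigned on this worker, 3 cores per executor requested.
val assignedCores = 6
val coresPerExecutor: Option[Int] = Some(3)

val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)  // 6 / 3 = 2 executors
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)             // 3 cores each

// With coresPerExecutor unset (one-executor-per-worker mode), one executor grabs all 6 cores:
val defaultPerExecutor: Option[Int] = None
val numExecutorsDefault  = defaultPerExecutor.map(assignedCores / _).getOrElse(1)  // 1 executor
val coresToAssignDefault = defaultPerExecutor.getOrElse(assignedCores)             // 6 cores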

Original article: https://www.cnblogs.com/guodong1789/p/11982170.html
