spark的task调度器(FAIR公平调度算法)

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    默认为0,除非通过fair的配置文件进行了配置指定

    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    如果是TaskSetManager时,就是taskSet中运行的task的个数,
    如果是Pool实例是表示是所有使用这个poolName的所有的TaskSetManager正在运行的task的个数.

    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    只有在minShare在fair的配置文件中显示配置,同时大于正在运行的task的个数时,才会为true

    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    运行的task的个数针对于minShare的比重

    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    得到正在运行的task个数针对于pool的weight的比重
    var compare: Int = 0

    这里首先根据正在运行的task的个数是否已经达到调度队列中最小的分片的个数来进行排序,
    如果s1中运行运行的个数小于s1的pool的配置的minShare,返回true,表示s1排序在前面.
    如果s2中运行的task的个数小于s2的pool中配置的minShare(最小分片数)的值,表示s1小于s2,这时s2排序应该靠前.

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
    这种情况表示s1与s2两个队列中,正在运行的task的个数都已经大于(不小于)了两个子调度器中配置的minShare的个数时,根据两个子调度器队列中正在运行的task的个数对应此调度器中最小分片的值所占的比重最小的一个排序更靠前
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
    这种情况表示s1与s2两个子调度器的队列中,正在运行的task的个数都还没有达到配置的最小分片的个数的情况,比较两个队列中正在运行的task的个数对应调度器队列的weigth的占比,最小的一个排序更靠前
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
    如果两个根据上面的计算,排序值都相同,就看看这两个调度器的名称,按名称的字节序来排序了.
      s1.name < s2.name
    }
  }
}

  

原文地址:https://www.cnblogs.com/hanhaotian/p/11791966.html

时间: 2024-10-05 23:25:44

spark的task调度器(FAIR公平调度算法)的相关文章

Linux IO 调度器

Linux IO Scheduler(Linux IO 调度器) 每个块设备或者块设备的分区,都对应有自身的请求队列(request_queue),而每个请求队列都可以选择一个I/O调度器来协调所递交的request.I/O调度器的基本目的是将请求按照它们对应在块设备上的扇区号进行排列,以减少磁头的移动,提高效率.每个设备的请求队列里的请求将按顺序被响应.实际上,除了这个队列,每个调度器自身都维护有不同数量的队列,用来对递交上来的request进行处理,而排在队列最前面的request将适时被移

Hadoop的调度器总结

Hadoop的调度器总结 随着MapReduce的流行,其开源实现Hadoop也变得越来越受推崇.在Hadoop系统中,有一个组件非常重要,那就是调度器,它的作用是将系统中空闲的资源按一定策略分配给作业.在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器.Hadoop中常见的调度器有三种,分别为: (1)默认的调度器FIFO Hadoop中默认的调度器,它先按照作业的优先级高低,再按照到达时间的先后选择被执行的作业. (2) 计算能力调度器Capacity Sc

Linux IO Scheduler(Linux IO 调度器)

每个块设备或者块设备的分区,都对应有自身的请求队列(request_queue),而每个请求队列都可以选择一个I/O调度器来协调所递交的request.I/O调度器的基本目的是将请求按照它们对应在块设备上的扇区号进行排列,以减少磁头的移动,提高效率.每个设备的请求队列里的请求将按顺序被响应.实际上,除了这个队列,每个调度器自身都维护有不同数量的队列,用来对递交上来的request进行处理,而排在队列最前面的request将适时被移动到请求队列中等待响应. IO调度器在内核栈中所处位置如下: 内核

CFS调度器

一.前言 随着内核版本的演进,其源代码的膨胀速度也在递增,这让Linux的学习曲线变得越来越陡峭了.这对初识内核的同学而言当然不是什么好事情,满腔热情很容易被当头浇灭.我有一个循序渐进的方法,那就是先不要看最新的内核,首先找到一个古老版本的内核(一般都会比较简单),将其吃透,然后一点点的迭代,理解每个版本变更背后的缘由和目的,最终推进到最新内核版本. 本文就是从2.4时代的任务调度器开始,详细描述其实现并慢慢向前递进.当然,为了更好的理解Linux调度器设计和实现,我们在第二章给出了一些通用的概

hadoop调度器的原理和应用场景解析

前置篇: 为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 计算框架完全重构,发生了根本的变化.新的 Hadoop MapReduce 框架命名为 Yarn,重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是计算资源管理和任务调度 / 监控.ResourceManager全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协

k8s中的默认调度器

默认调度器中的调度算法分为预选和优选 预选策略1.基础的检查项(GeneralPredicates)PodFitsResources node剩余可分配资源(Allocatable-sum(request))是否足够pod调度PodFitsHost node的名称是否跟pod的spec.nodeName字段一致PodFitsHostPorts pod指定的spec.nodePort端口在待考察node上是否已被占用PodMatchNodeSelector pod的nodeSelector或者no

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

Spark源代码分析之六:Task调度(二)

话说在<Spark源代码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这种方法针对接收到的ReviveOffers事件进行处理.代码例如以下: // Make fake resource offers on all executors     // 在全部的executors上提供假的资源(抽象的资源.也就是资源的对象信息,我是这么理解的)     private def makeOffers() {       /

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.