深入理解spark-两种调度模式FIFO,FAIR模式

前面我们应知道了一个任务提交会由DAG拆分为job,stage,task,最后提交给TaskScheduler,在提交taskscheduler中会根据master初始化taskscheduler和schedulerbackend两个类,并且初始化一个调度池;

1.调度池比较

根据mode初始化调度池pool

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty 这里可以看到调度池初始化最小设置为0
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }

FIFO模式

这个会根据spark.scheduler.mode 来设置FIFO or FAIR,默认的是FIFO模式;

FIFO模式什么都不做,实现默认的schedulerableBUilder方法,建立的调度池也为空,addTasksetmaneger也是调用默认的;

可以简单的理解为,默认模式FIFO什么也不做。。

FAIR模式

fair模式则重写了buildpools的方法,读取默认路径 $SPARK_HOME/conf/fairscheduler.xml文件,也可以通过参数spark.scheduler.allocation.file设置用户自定义配置文件。

文件中配置的是

poolname 线程池名

schedulermode 调度模式(FIFO,FAIR仅有两种)

minshare 初始大小的线程核数

wight 调度池的权重

override def buildPools() {
    var is: Option[InputStream] = None
    try {
      is = Option {
        schedulerAllocFile.map { f =>
          new FileInputStream(f)
        }.getOrElse {
          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
        }
      }

      is.foreach { i => buildFairSchedulerPool(i) }
    } finally {
      is.foreach(_.close())
    }

    // finally create "default" pool
    buildDefaultPool()
  }

同时也重写了addtaskmanager方法

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    var poolName = DEFAULT_POOL_NAME
    var parentPool = rootPool.getSchedulableByName(poolName)
    if (properties != null) {
      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      parentPool = rootPool.getSchedulableByName(poolName)
      if (parentPool == null) {
        // we will create a new pool that user has configured in app
        // instead of being defined in xml file
        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
        rootPool.addSchedulable(parentPool)
        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
      }
    }
    parentPool.addSchedulable(manager)
    logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
  }

这一段逻辑中是把配置文件中的pool,或者default pool放入rootPool中,然后把TaskSetManager存入rootPool对应的子pool;

2.调度算法比较

除了初始化的调度池不一致外,其实现的调度算法也不一致

实现的调度池Pool,在内部实现方法中也会根据mode不一致来实现调度的不同

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

FIFO模式

FIFO模式的调度方式很容易理解,比较stageID,谁小谁先执行;

这也很好理解,stageID小的任务一般来说是递归的最底层,是最先提交给调度池的;

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

FAIR模式

fair模式来说的话,稍微复杂一点;

但是还是比较容易看懂,

1.先比较两个stage的 runningtask使用的核数,其实也可以理解为task的数量,谁小谁的优先级高;

2.比较两个stage的 runningtask 权重,谁的权重大谁先执行;

3.如果前面都一直,则比较名字了(字符串比较),谁大谁先执行;

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }

总结:虽然了解一下spark的调度模式,以前在执行中基本都没啥用到,没想到spark还有这样的隐藏功能。。。

原文地址:https://www.cnblogs.com/yankang/p/9786251.html

时间: 2024-10-11 00:18:10

深入理解spark-两种调度模式FIFO,FAIR模式的相关文章

[设计模式]两种方法实现简单工厂模式

<!--完整代码下载链接:点击下载完整代码示例--> 1.描述 在项目开发中经常会遇到根据不同的条件创建不同的对象,然后对该对象进行操作,一般都包括许多的switch -case分支如下: CBase* pBase(NULL); switch (type_variable) { case obj1: pBase = new CBaseDerivate1();break; case obj2: pBase = new CBaseDerivate2();break; ... case objN:

Spark两种提交方式Yarn-client and Yarn-cluster

Spark支持三种集群部署方式(Standalone,Mesos,Yarn),其中Master服务(Spark Standalone,Mesos Master,Yarn ResourceManager)决定哪些应用可以运行,在那个节点上运行,以及什么时候运行.Slave服务(Yarn NodeManager)运行在每个节点上,节点控制着Executor进程,同时监控作业的运行状态以及资源的消耗.Spark运行在Yarn上,有两种模式,Yarn-Client和Yarn-Cluster.通常情况下,

qml-main.cpp中的两种启动Qt Quick App模式

 现有两种启动Qt Quick App 模式: QQmlApplicationEngine搭配Window. QQuickView搭配Item.  qt默认使用第一种方法. QQmlApplicationEngine搭配Window: 1 #include <QGuiApplication> 2 #include <QQmlApplicationEngine> 3 4 int main(int argc, char *argv[]) 5 { 6 #if defined(Q_OS_W

两种用于派生的Singleton模式(以TextureMgr为例)

Singleton,顾名思义,从字面上来理解就是单例模式,这是C++程序中 常用到的一种设计模式,特别是像文件管理器,纹理管理器这种整个软件 中只需要唯一的实例来管理所有资源时,这种模式的价值便得以体现. 下面来介绍两种用于派生管理的Singleton模式: 其中,第一种是Gof版本的Singleton, 其代码如下: //[Singleton_Gof.h] #pragma once template<typename T> class Singleton_Gof { protected: s

一道题采用两种设计模式:对比策略模式和模版方法

摘要 <C++ Primer>习题14.38和14.39分别采用策略模式.模版方法解决问题. 问题 <C++ Primer 5th>习题 14.38 : 编写一个类令其检查某个给定的 string 对象的长度是否与一个阀值相等.使用该对象编写程序,统计并报告输入的文件中长度为 1 的单词有多少个.长度为 2 的单词有多少个........长度为 10 的单词有多少个. <C++ Primer 5th>习题 14.39 : 修改上一题的程序令其报告长度在 1 至 9 之间

理解json两种结构:数组和对象

Json是一种异常简单易懂的数据格式,关于json的规定,仅仅如下而已: 1) 并列的数据之间用逗号(", ")分隔. 2) 映射用冒号(": ")表示. 3) 并列数据的集合(数组)用方括号("[]")表示. 4) 映射的集合(对象)用大括号("{}")表示. 1. $arr = array(111,'aaa','bbb'); $arr1 = array( 'a' => 'aaa', 'b' => 222 );

FTP具有两种模式

FTP具有两种模式,分别是port模式(也叫主动模式)和pasv模式(也叫被动模式),怎么来理解这两种模式呢?我来打个比喻吧,在主动模式下:客户端给服务器端的21端口发命令说,我要下载什么什么,并且还会说我已经打开了自己的某个端口,你就从这里把东西给我吧,服务器知道后就会通过另外一个数据端口把东西传给客户端,这就是主动模式,可以理解为服务端主动给客户端传输文件;在被动模式下:客户端给服务器端的21端口发命令说,我要下载什么什么,服务器端知道后,就打开一个端口,然后告诉客户端,我已经打开了某某端口

安卓手机应用开发培训讲义笔记和心得(Java和Mono两种模式)

培训内容    —————————————————————————————————————————————————————————————————— 昨天夏主要讲了两个方面的安卓手机开发 一:Java语言开发手机安卓 ① 准备发软件工具  环境   (可以直接本地搭建环境) 开发IDE:Eclipse(仅次于VS的强大IDE) 其它一大堆的东西:Android SDK  模拟器   SDK JDK DAT ②  window下搭建Eclipse+andro开发环境 安装步骤:一般首先安装ava运

0548-apache两种工作模式介绍及配置优化

apache常用工作模式有两种,区别?worker模式:1.线程模式 2.占用资源少 3.稳定性略差 4.并发大prefork模式:1.进程模式 2.占用资源多 3.稳定 4.并发一般apache默认是prefork,编译时候一般选择worker模式.如果编译时候不指定worker模式,那么就是默认的prefork模式 已经确定了worker模式,如何调优呢?[[email protected] blog]# cd /application/apache/conf/[[email protect