DT大数据梦工厂第三十五课 Spark系统运行循环流程

本节课内容:

1.     TaskScheduler工作原理

2.     TaskScheduler源码

一、TaskScheduler工作原理

总体调度图:

通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理。

回顾:

DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程;运行时从前往后运行的。每个Stage中有很多任务Task,Task是可以并行执行的。它们的执行逻辑完全相同的,只不过是处理的数据不同而已,DAGScheduler通过TaskSet的方式,把其构造的所有Task提交给底层调度器TaskScheduler。

&  TaskScheduler是一个trait,与具体的资源调度解耦合,这符合面向对象中依赖抽象不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行的众多的资源调度模式上,例如:StandAlone、Yarn、Mesos、Local、EC2或者其他自定义的资源调度器。

在StandAlone模式下,我们来看看TaskScheduler的一个实现TaskSchedulerImpl。

1.TaskScheduler的核心任务

TaskScheduler的核心任务是提交TaskSet到集群并汇报结果,主要负责Application的不同Job之间的调度。

具体来讲有以下几点:

(1)    为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息;

(2)    Task执行失败时启动重试机制,以及遇到Straggle任务会在其他节点上启动备份任务;

(3)    向DAGScheduler汇报执行情况,包括在shuffle输出丢失的时候报告fetch failed错误等信息。

2.TaskScheduler的核心功能

(1)注册当前程序。

&  TaskScheduler内部会握有SchedulerBackend引用,SchedulerBackend是一个trait,它主要负责管理Executor资源,从StandAlone模式来讲,具体实现是SparkDeploySchedulerBackend。SparkDeploySchedulerBackend在启动时会构造AppClient实例并在该实例start的时候启动ClientEndpoint(消息循环体),ClientEndpoint在启动时会向Master注册当前程序。

(1)   注册Executor信息。

&  SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend会在Start的时候实例化一个类型DriverEndpoint的消息循环体。DriverEndpoint就是我们程序运行时的Driver对象。SparkDeploySchedulerBackend是专门给来负责收集Worker上的资源信息,当ExecutorBackend启动的时候会发送RegisteredExecutor信息向Driver中的DriverBackend进行注册。(可以参考前几讲的Master注册部分。)此时SparkDeploySchedulerBackend就掌握了当前应用应用程序所拥有的计算资源,TaskScheduler就是通过SparkDeploySchedulerBackend拥有的计算资源来具体运行Task。

&  补充:SparkContext、DAGScheduler、TaskSchedulerImpl、SparkDeploySchedulerBackend在应用程序启动的时候值实例化一次,应用程序存在期间始终存在这些对象。SparkDeploySchedulerBackend是一个辅助类,主要是帮助TaskSchedulerImpl中的Task获取计算资源和发送Task到集群中

3.TaskScheduler的实例化时机

TaskScheduler是在SparkContext实例化时进行实例化的,如TaskSchedulerImpl的实例化。(Spark1.6.0  SparkContext.scala  #521-#526)

// Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

在SparkContext#createTaskScheduler方法中会创建TaskScheduler和SparkDeploySchedulerBackend:

private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._
    //省略部分代码
	case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
		//利用SparkDeploySchedulerBackend来初始化TaskScheduler
        scheduler.initialize(backend)  //1
        (backend, scheduler)
}

4.TaskScheduler初始化

<pre name="code" class="plain">//1被处调用
def initialize(backend: SchedulerBackend) {
   	 this.backend = backend
   	 // temporarily set rootPool name to empty
  	 rootPool = new Pool("", schedulingMode, 0, 0) //2
	//根据rootPool中的算法创建可调度对象
   	 schedulableBuilder = {
     	 schedulingMode match {
        case SchedulingMode.FIFO =>
		//FIFO模式
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
		//Fair模式
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    //创建调度池
    schedulableBuilder.buildPools()
  }

创建调度池

(1)创建rootPool(实现调度算法)

//2处被调用
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable
  with Logging {

  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
  var weight = initWeight
  var minShare = initMinShare
  var runningTasks = 0
  var priority = 0

  // A pool's stage id is used to break the tie in scheduling.
  var stageId = -1
  var name = poolName
  var parent: Pool = null
	//根据不同的调度算法创建调度算法的实例
  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

(2)创建可调度对象

Org.apache.spark.scheduler.Pool包含了一组可以调度的实体。对于FIFO来说,rootPool包含了一组TaskSetManager;而对于FAIR来说,rootPool包含了一组Pool,这些Pool构成了一颗调度树,其中这棵树的叶子节点就是TaskSetManager。

(3)创建调度池

schedulableBuilder.buildPools()因调度方式而异,如果是FIFO,它的实现是空的如下:

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {
  override def buildPools() {
    // nothing
  }
	//定义了如何将TaskSetManager加入到调度池中
  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager) //3
  }
}

因为rootPool并没有包含Pool,而是直接包含TaskSetManager:submitTasks直接将TaskSetManager添加到rootPool(调度队列,队列默认是先入先出)即可。

 //将可调度对象加入到调度队列  3处被调用
 override def addSchedulable(schedulable: Schedulable) {
    require(schedulable != null)
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    schedulable.parent = this
  }

而FAIR模式则需要在运行前先进行一定的配置。它需要在rootPool的基础上根据这个配置文件来构建这颗调度树。

具体实现见如下代码:

override def buildPools() {
    var is: Option[InputStream] = None
    try {
      is = Option {
		//以spark.scheduler.allocation.file设置的文件名字来创建FileInputStream
        schedulerAllocFile.map { f =>
          new FileInputStream(f)
        }.getOrElse {
		//若spark.Scheduler.allocation.file没有设置,则直接以fairscheduler.xml创建
		//FileInputStream
          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
        }
      }
	//以is对应的内容创建Pool
      is.foreach { i => buildFairSchedulerPool(i) }
    } finally {
      is.foreach(_.close())
    }
	创建名为“default”的Pool
    buildDefaultPool()
  }

(4)调度算法

private[spark] traitSchedulingAlgorithm {
  <span style="white-space:pre">	</span>def comparator(s1: Schedulable, s2:Schedulable): Boolean
    }

从代码来看调度算法是一个trait,需要子类实现。其实质就是封装了一个比较函数。子类只需实现这个比较函数即可。

(a)FIFO

采用FIFO任务调度的顺序:

首先要保证JobID较小的先被调度,如果是同一个Job,那么StageID小的先被调度(同一个Job,可能多个Stage可以并行执行,比如Stage划分过程中Stage0和Stage1)。

调度算法:

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
	//比较可调度对象s1与s2,这里s1与s2其实就是TaskSetManager。
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority     //这个priority实际上就是Job ID
    val priority2 = s2.priority     //同上
    var res = math.signum(priority1 - priority2)  //首先比较Job ID
    if (res == 0) { //如果Job ID相同,那么比较Stage ID
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

(2)FAIR

对于FAIR,首先是挂在rootPool下面的pool先确定调度顺序,然后在每个pool内部使用相同的算法来确定TaskSetManager的调度顺序。

算法实现:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
	//最小共享,可以理解为执行需要的最小资源即CPU核数,其他相同时,所需最小核数小的优先
//调度
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
//运行的任务的数量
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
	//查看是否有调度队列处于饥饿状态,看可分配的核数是否少于任务数,如果资源不够用,那么
//处于挨饿状态
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
	//计算ShareRatio, 最小资源占用比例,这里可以理解为偏向任务较轻的
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
	//计算Task的Weight比重即权重,任务数相同,权重高的优先
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
	//首先处于饥饿优先
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      //都处于挨饿状态则,需要资源占用比小的优先
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
	//都不挨饿,则比较权重比,比例低的优先
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
	//如果都一样,那么比较名字,按照字母顺序比较,所以名字比较重要
      s1.name < s2.name
    }
  }
}

注:

&  公平原则本着的原则就是谁最需要就给谁,所以挨饿者优先;

&  资源占用比这块有点费解,如果把他理解成一个贪心问题就容易理解了。对于都是出于挨饿状态的任务可以这样理解,负载大的即时给你资源你也不一定能有效缓解,莫不如给负载小的,让其快速使用,完成后可以释放更多的资源,这是一种贪心策略。如JobA和JobB的Task数量相同都是10,A的minShare是2,B的是5,那占用比为5和2,显然B的占用比更小,贪心的策略应该给B先调度处理;

&  对于都处于满足状态的,当然谁的权重有着更好的决定性,权重比低得优先(偏向权利大的);

&  如果所有上述的比较都相同,那么名字字典排序靠前的优先(哈哈,名字很重要哦);名字aaa要比abc优先,所以这里在给Pool或者TaskSetManager起名字的时候要考虑这一点

(备注来源:https://yq.aliyun.com/articles/6041

补充:这两种调度的排序算法针对的可比较对象都是Schedule的具体对象,这里我们对这个对象Schedulable做简单的解释。

前面讲到,Schedulable可调度对象在Spark有两种形式:Pool和TaskSetManager。Pool是一个调度池,Pool里面还可以有子Pool,Spark中的rootPool即根节点默认是一个无名(default)的Pool。对于FIFO和FAIR有不同的层次。

对于FIFO模式的调度,rootPool管理的直接就是TaskSetManager,没有子Pool这个概念,就只有两层,rootPool和叶子节点TaskSetManager,实现如下所示。

但对于FAIR这种模式来说,是三层的,根节点是rootPool,为无名Pool,下一层为用户定义的Pool(不指定名称默认名称为default),再下一层才是TaskSetManager,即根调度池管理一组调度池,每个调度池管理自己的TaskSetManager,其实现如下所示。

这里的调度顺序是指在一个SparkContext之内的调度,一般情况下我们自行使用是不太会需要Pool这个概念的,因为不存在Pool之间的竞争,但如果我们提供一个Spark应用,大家都可以提交任务,服务端有一个常驻的任务,对应一个SparkContext,每个用户提交的任务都由其代理执行,那么针对每个用户提交的任务可以按照用户等级和任务优先级设置一个Pool,这样不同的用户的Pool之间就存在竞争关系了,可以用Pool的优先级来区分任务和用户的优先级了,**但要再强调一点名字很重要,因为FAIR机制中,如果其他比较无法判断,那么会按照名字来进行字典排序的**。(补充来源:https://yq.aliyun.com/articles/6041)

5.创建DAGScheduler,调用TaskScheduler#start方法(SparkContext初始化过程中)

//SparkContext.scala
525)     _dagScheduler = new DAGScheduler(this)
526)     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
527)
528)    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
529)    // constructor
530)    _taskScheduler.start()
//TaskSchedulerImpl.scala
override def start() {
   //启动SparkDeploySchedulerBackend
	 backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

6.启动Executor

override def start() {
    super.start()
    launcherBackend.connect()

    // The endpoint for executors to talk to us
    val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
      RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
// Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
	//定义分配资源的进程名称
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
	//省略部分代码,详细内容参见前几节,Executor注册过程。
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
	//注册应用程序
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

在client#start方法中最终会注册应用程序。

二、总结

在SparkContext实例化的时候调用createTaskScheduler来创建TaskSchedulerImpl和SparkDeploySchedulerBackend,同时在SparkContext实例化的时候会调用TaskSchedulerImpl#start方法,在该方法中会调用SparkDeploySchedulerBackend#start;在这个start方法中会创建AppClient对象并调用AppClient#start方法,这时会创建ClientEndpoint,在创建ClientEndpoint时会传入来指定具体为当前应用程序启动的Executor进程的入口类的名称为CoarseGrainedExecutorBackend,然后ClientEndpoint启动并通过tryRegisterMaster来注册当前的应用程序到Master中,Master接收到注册信息后,如果可以运行程序,则会为该程序生成JobID并通过Schedule来分配计算资源,具体计算资源分配是通过应用程序运行方式、Memory、core等配置信息来决定的,最后Master会发送指令给Worker;Worker中为当前应用程序分配计算资源时,首先分配ExecutorRunner,ExecutorRunner内部会通过Thread的方式构建ProcessBuilder来启动另一个JVM进程,这个JVM进程启动时候会加载的main方法所在的类的名称就是创建ClientEndpoint传入的Command指定的入口类CoarseGrainedExecutorBackend,此时JVM在通过ProcessBuilder启动的时候获得了CoarseGrainedExecutorBackend后,加载并调用其中的main方法。在main方法中会实例化CoarseGrainedExecutorBackend本身这个消息循环体,而其实例化时会通过回调onStart向DriverEndpoint发送RegisterExecutor来注册当前的CoarseGrainedExecutorBackend,此时DriverEndpoint收到该注册信息并保存在了SparkDeployScheduler实例的内存数据结构中,这样Driver就获得了计算资源!

Task运行各个阶段交互过程

图35-6 资源分配过程

备注:有关FIFO、FAIR调度算法解析部分参考 张安站 --Spark技术内幕一书

说明:

本文是由DT大数据梦工厂的IFM课程第35课为基础所做的笔记

时间: 2024-10-10 20:52:12

DT大数据梦工厂第三十五课 Spark系统运行循环流程的相关文章

DT大数据梦工厂-Scala学习笔记(1):Scala开发环境搭建和HelloWorld解析

一.scala是函数式编程和面向对象编程结合的语言,这两种编程的特点分别是什么? 答:函数式编程擅长数值的计算:面向对象编程特别适合于大型工程或项目的组织,以及团队的分工协作. 二.Scala的特点 Scala结构优雅.计算精致.富有表现力 三.scala的安装需要什么资源? Java,推荐安装Java8(Java7也可以) 支持scala 2.10.*以上(可以安装2.10.4,http://scala-lang.org/download) 四.设置环境变量(系统变量): (1)JAVA_HO

底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1.一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交:2.Scala学习角度讲,比Java难.找Scala的高手比Java难,项目的维护和二次开发比较困难:3.很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combat

DT大数据 梦工厂57讲

今日[DT大数据梦工厂视频]<第57讲:Scala中Dependency Injection实战详解> 土豆:http://www.tudou.com/programs/view/5LnLNDBKvi8/ 百度网盘:http://pan.baidu.com/s/1c0no8yk (DT大数据梦工厂scala的所有视频.PPT和代码在百度云盘的链接地址:http://pan.baidu.com/share/home?uk=4013289088#category/type=0&qq-pf-

DT大数据梦工厂免费实战大数据视频全集 分享

接触大数据有几年,以前一直都是对hadoop的使用.相比于日新月异的前端技术,我还是比较喜欢大数据--这个已经被热炒多年的课题,也相信从事大数据方面的技术研究是IT从业者的一条光明坦途. 2010年hadoop开始进入我的视野, 不断自学,使用,感谢hadoop国内还是有很多书籍.去年开始各大社区开始讨论spark,也开始学习scala语言,断断续续没有坚持,没多久也会忘掉.今年初看到了王老师的<Spark亚太研究院Spark公益大讲堂>视频,一些实时性需求在工作中出现让我不得不对比hadoo

DT大数据梦工厂 第72,73讲

王家林亲授<DT大数据梦工厂>大数据实战视频“Scala深入浅出实战经典”视频.音频和PPT下载!第72讲:Scala界面事件处理编程实战详解百度云:http://pan.baidu.com/s/1qWsq6Jy腾讯微云:http://url.cn/g6En3l360云盘:http://yunpan.cn/cchw9EZAp2rKd 访问密码 05d1本节王老师讲了scala界面的事件处理就是当用户操作一个行为,GUI将做出一个反应.比如点击一个按钮事件.def top=new MainFra

DT大数据梦工厂 第63,64,65讲

王家林亲授<DT大数据梦工厂>大数据实战视频“Scala深入浅出实战经典”视频.音频和PPT下载!第63讲:Scala中隐式类代码实战详解百度云:http://pan.baidu.com/s/1o6wxJdS腾讯微云:http://url.cn/TfOJqr360云盘:http://yunpan.cn/cckajtapNGT9z 访问密码 369d本节王老师讲了隐式类.其作用就是把转换后的类放在一个作用域中,消除了之前的隐式转换方法.object Context{ implicit class

DT大数据 梦工厂53讲

王家林亲授<DT大数据梦工厂>大数据实战视频"Scala深入浅出实战经典"视频.音频和PPT下载!第53讲:Scala中结构类型实战详解 百度云盘:http://pan.baidu.com/s/1ntEGt4X 腾讯微云:http://url.cn/frfFQl 360云盘:http://yunpan.cn/cc3SKdVt8dFwa  访问密码 6ae4 第53讲 王老师讲了结构类型,结构类型表明了一个方法中的参数被一个结构所限定,只要符合一种结构,就可以作为参数传入这个

DT大数据 梦工厂52讲

王家林亲授<DT大数据梦工厂>大数据实战视频"Scala深入浅出实战经典"视频.音频和PPT下载!第52讲:Scala中路径依赖代码实战详解 百度云:http://pan.baidu.com/s/1gdES4hX 360云盘:http://yunpan.cn/ccHXX2Wkrrrt4  访问密码 c489 腾讯微云:http://url.cn/VV5kx5 本节王老师讲了内部类同java的区别.scala中的内部类是用外部类的实例创建的,只和外部类的实例相关,不同的实例创

DT大数据梦工厂,插上大数据的翅膀

2015.7.9 DT大数据梦工厂scala 深入浅出实战经典 再也没有这样好的视频了,只要每天看一点,你就会有一点收获,讲的并不只是代码,而且还有一些看待事物的哲理,通过真实场景来思考代码,这是这个视频的精华所在,要想学好大数据,并没有什么可犹豫的,直接看就可以了,你的成功不在于你努力了多少,而是你做了成功的事.shihttp://pan.baidu.com/s/1kTotMQz 今天先讲一下File这个传输的过程吧,自己敲了一下代码试试,这是我通过看视频,散发出的一些正能量,绝对刚出炉,还热