Spark的任务调度

本文尝试从源码层面梳理Spark在任务调度与资源分配上的做法。

先从Executor和SchedulerBackend说起。Executor是真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend是spark提供的接口,定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数),进行一次makeOffer;executor更新状态,若任务完成的话,回收core,进行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展开。

makeOffer的目的是在有资源更新的情况下,通过调用scheduler的resourceOffers方法来触发它对现有的任务进行一次分配,最终launch新的tasks。这里的全局 scheduler就是TaskScheduler,实现是TaskSchedulerImpl,它可以对接各种SchedulerBackend的实现,包括standalone的,yarn的,mesos的。SchedulerBackend在做makeOffer的时候,会把现有的 executor资源以WorkerOfffer列表的方式传给scheduler,即以worker为单位,将worker信息及其内的资源交给 scheduler。scheduler拿到这一些集群的资源后,去遍历已提交的tasks并根据locality决定如何launch tasks。

TaskScheduler里,resourceOffers方法会将已经提交的tasks进行一次优先级排序,这个排序算法目前是两种:FIFO或FAIR。得到这一份待运行的tasks后,接下里就是要把schedulerBackend交过来的worker资源信息合理分配给这些tasks。分配前,为了避免每次都是前几个worker被分到tasks,所以先对WorkerOffer列表进行一次随机洗牌。接下来就是遍历tasks,看workers的资源“够不够”,“符不符合”task,ok 的话task就被正式launch起来。注意,这里资源"够不够"是很好判断的,在TaskScheduler里设置了每个task启动需要的cpu个数,默认是1,所以只需要做核数的大小判断和减1操作就可以遍历分配下去。而"符不符合"这件事情,取决于每个tasks的locality设置。

task 的locality有五种,按优先级高低排:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY。也就是最好在同个进程里,次好是同个node(即机器)上,再次是同机架,或任意都行。task有自己的locality,如果本次资源里没有想要的 locality资源,怎么办呢?spark有一个spark.locality.wait参数,默认是3000ms。对于 process,node,rack,默认都使用这个时间作为locality资源的等待时间。所以一旦task需要locality,就可能会触发delay scheduling。

到这里,对于任务的分配,资源的使用大致有个了解。实际上,TaskScheduler的resourceOffer里还触发了 TaskSetManager的resourceOffer方法,TaskSetManager的resourceOffer是会检查task的 locality并最终调用DAGScheduler去launch这个task。这些类的名字以及他们彼此的调用关系,看起来是比较乱的。我简单梳理下。

这件事情要从Spark的DAG切割说起。Spark RDD通过其transaction和action操作,串起来形成了一个DAG。action的调用,触发了DAG的提交和整个job的执行。触发之后,由DAGScheduler这个全局唯一的面向stage的DAG调度器来切分DAG,根据是否 shuffle来切成多个小DAG,即stage。凡是RDD之间是窄依赖的,都归到一个stage里,这里面的每个操作都对应成MapTask,并行度就是各自RDD的partition数目。凡是遇到宽依赖的操作,那么就把这一次操作切为一个stage,这里面的操作对应成ResultTask,结果 RDD的partition数就是并行度。MapTask和ResultTask分别可以简单理解为传统MR的Map和Reduce,切分他们的依据本质上就是shuffle。所以shuffle之前,大量的map是可以同partition内操作的。每个stage对应的是多个MapTask或多个 ResultTask,这一个stage内的task集合成一个TaskSet类,由TaskSetManager来管理这些task的运行状态,locality处理(比如需要delay scheduling)。这个TaskSetManager是Spark层面上的,如何管理自己的tasks,即任务线程,这一层与底下资源管理是剥离的。我们上面提到的TaskSetManager的resourceOffer方法,是task与底下资源的交互,这个资源交互的协调人是 TaskScheduler,也是全局的,TaskScheduler对接的是不同的SchedulerBackend的实现(比如 mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是worker上起几个进程,每个进程分配多少资源。所以这两层很清楚,spark本身计算框架内管理线程级别的task,每个stage都有一个TaskSet,本身是个小DAG,可以丢到全局可用的资源池里跑;spark下半身的双层资源管理部分掌控的是进程级别的executor,不关心task怎么摆放,也不关心task运行状态,这是TaskSetManager管理的事情,两者的协调者就是TaskScheduler及其内的SchedulerBackend实现。

SchedulerBackend 的实现,除去local模式的不说,分为细粒度和粗粒度两种。细粒度只有Mesos(mesos有粗细两种粒度的使用方式)实现了,粗粒度的实现者有 yarn,mesos,standalone。拿standalone模式来说粗粒度,每台物理机器是一个worker,worker一共可以使用多少 cpu和内存,启动时候可以指定每个worker起几个executor,即进程,每个executor的cpu和内存是多少。在我看来,粗粒度与细粒度的主要区别,就是粗粒度是进程long-running的,计算线程可以调到executor上跑,但executor的cpu和内存更容易浪费。细粒度的话,可以存在复用,可以实现抢占等等更加苛刻但促进资源利用率的事情。这俩概念还是AMPLab论文里最先提出来并在Mesos里实现的。AMPLab 在资源使用粒度甚至任务分配最优的这块领域有不少论文,包括Mesos的DRF算法、Sparrow调度器等。所以standalone模式下,根据 RDD的partition数,以及每个task需要的cpu数,可以很容易计算每台物理机器的负载量、资源的消耗情况、甚至知道TaskSet要分几批才能跑完一个stage。

来自:http://blog.csdn.net/pelick/article/details/41866845

时间: 2024-08-08 14:00:30

Spark的任务调度的相关文章

spark内核揭秘-04-spark任务调度系统个人理解

spark的任务调度系统如下所示: 从上图中科院看出来由RDDObject产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每组的tasks都是一个state,每当遇到shuffle就会产生新的state,可以看出上图一共有三个state:DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时需勋勋task的最优化调度,例如数据本地性等:DAGScheduler还要监

spark中资源调度任务调度

在spark的资源调度中 1.集群启动worker向master汇报资源情况 2.Client向集群提交app,向master注册一个driver(需要多少core.memery),启动一个driver 3.Driver将当前app注册给master,(当前app需要多少资源),并请求启动对应的Executor 4.driver分发任务给Executor的Thread Pool. 根据Spark源码可以知道: 1.一个worker默认为一个Application启动一个Executor 2.启动

Spark(1.0) 内核解析

Spark的内核部分主要从以下几个方面介绍: 任务调度系统.I/0模块.通信控制模块.容错模块.shuffle模块 一.任务调度系统 1.作业执行流程 接下来注意几个概念: Application:用户自定义的Spark程序,用户提交后,Spark为App分配资源,将程序转换并执行. Driver Program:运行Application的main()函数并创建SparkContext RDD Graph:RDD是Spark的核心结构,可以通过一系列算子进行操作(主要有Transformati

spark总结——转载

转载自:http://smallx.me/2016/06/07/spark%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93/ 第一个Spark程序 /** * 功能:用spark实现的单词计数程序 * 环境:spark 1.6.1, scala 2.10.4 */ // 导入相关类库import org.apache.spark._ object WordCount { def main(args: Array[String]) { // 建立spark运行上下文 val

Spark Streaming实践和优化

发表于:<程序员>杂志2016年2月刊.链接:http://geek.csdn.net/news/detail/54500 作者:徐鑫,董西成 在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎.其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎.如图1所示,Spark Streaming支持的数据源有很多,如Kafka.Flume.TCP等.Spark Streaming的内部数据表示形式为DStrea

Spark学习笔记(一)

概念: Spark是加州大学伯克利分校AMP实验室,开发的通用内存并行计算框架. 支持用scala.java和Python等语言编写应用程序.相较于Hdoop,往往有更好的运行效率. Spark包括了Spark Core, Spark SQL, SparkStreaming,MLlib和Graphx等组件. Spark Core:内存计算框架 Spark SQL:及时查询 SparkStreaming:实时应用的处理 MLlib:机器学习 Graphx:图形处理 Spark运行模式: Local

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St

王家林每日大数据语录Spark篇

王家林每日大数据语录Spark篇0043(2015.12.15于上海):Worker在退出的时候会通过ExecutorRunner杀死Executor并且会将运行在当前Worker下的Driver Client删除掉,最终AppClient端的SparkDeploySchedulerBackend会收到Master发过来的StatusUpdate信息来处理Executor丢失的信息,Task会被重新分配. 王家林每日大数据语录Spark篇0042(2015.12.15于上海):生产环境下Spar

【Spark】Spark的Standalone模式安装部署

Spark执行模式 Spark 有非常多种模式,最简单就是单机本地模式,还有单机伪分布式模式,复杂的则执行在集群中,眼下能非常好的执行在 Yarn和 Mesos 中.当然 Spark 还有自带的 Standalone 模式,对于大多数情况 Standalone 模式就足够了,假设企业已经有 Yarn 或者 Mesos 环境.也是非常方便部署的. local(本地模式):经常使用于本地开发測试,本地还分为local单线程和local-cluster多线程; standalone(集群模式):典型的