Spark 任务调度分析

1、  资源分配

通过SparkSubmit进行提交应用后,首先会创建Client将应用程序(字节码文件.class)包装成Driver,并将其注册到Master。Master收到Client的注册请求后将其加入待调度队列waitingDrivers,并等待分配执行资源。

1.1 Dirver调度(分配Driver执行容器,1个)

Master中调度程序执行时会为Driver分配一满足其执行要求的Worker, 并通知Worker启动将Driver。Worker接到执行Driver指令后创建DriverRunner执行Driver(应用程序mainClass,mainClass执行时其会创建Spark执行上下文环境:SparkContext。伴随SparkContext会创建DAGScheduler和TaskScheduler分别用于Stage调度和任务调度,并会触发RDD的Action算子提交job)。

1.2 APP调度(分配Executor, 多个)

若想Job运行就需要得到执行资源,Dirver成功执行后,会通过SparkDeployScheduler-Backend创建AppClient(包装App信息,包含可以创建CoarseGrainedExecutorBackend实例Command),用于向Master汇报资源需求。Master接到AppClient的汇报后,将其加入waittingApps队列,等待调度。

App调度时会为app分配满足条件的资源-----Worker(State是Alive,其上并没有该Application的executor,可用内存满足要求(spark.executor.memory指定,默认512), 核满足要求(spark.cores.max, 最大可用core数,若未指定,则为全部资源)),然后通知Woker启动Excutor. 及向AppClient发送ExecutorAdded消息。

进行调度时,调度程序会根据配制SpreadOutApps = spark.deploy.spreadOut情况决定资源分配方式,若

SpreadOutApps方式:将每个app分配到尽可能多的worker中执行。

1 从列表中取下一app,根据CPU情况找出合适的woker,按核从小到大排序

2 如果worker节点存在可以分配的core 则进行预分配处理(轮循一次分一个直至满足app需求),并在分配列表(assigned = Array[Int](numUsable))中记数。

3根据assinged列表中的预分配信息,进行分配Executor(真实分配)

4 启动Executor并设置app.state =  ApplicationState.RUNNING

非SpreadOutApps方式: 将每个app分配到尽可能少的worker中执行。

1 从可用的worker列表中取下一work. (worker <- workers if worker.coresFree > 0)

2 遍历waitingApps 找到满足app运行条件的app,进行分配

3启动Executor(launchExecutor(w,e))并设置app.state =  ApplicationState.RUNNING

其中:launchExcutor(worker, exec) 具体内容如下:

向executor分配给worker

通知worker启动executor

由分配过程可知, 分配的Excutor个数与CPU核心数有关。当指定完Worker节点后,会在Worker节点创建ExecutorRunner,并启动,执行App中的Command 去创建并启动CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend启动后,会首先通过传入的driverUrl这个参数向在CoarseGrainedSchedulerBackend::DriverActor(用于与Master通信,及调度任务)发送RegisterExecutor(executorId, hostPort, cores),DriverActor会创建executorData(executor信息)加入executorDataMap供后续task使用,并回复RegisteredExecutor,此时CoarseGrainedExecutorBackend会创建一个org.apache.spark.executor.Executor。至此,Executor创建完毕。Executor是直接用于task执行, 是集群中的直接劳动者。

至此,资源分配结束。当分配完资源后,就可以为依本地性为任务分配具体的执行资源。

2、Stage划分

当执行mainClass时,执行到RDD的action算子时,会触发执行作业(sc.runJob),最终通过调用DAGScheduler的runJob方法根据RDD信息及action算子要做的操作创建ResultStage(FinalStage)及ActiveJob。

若ResultStage创建成功的话,根据配制信息及RDD特征可分为本地执行,集群执行。

若“spark.localExecution.enable”指定允许本地运行(默认为:false,不允许),具RDD的action算了允许本地运行allowLocal=true,且RDD只有一个partition的话可以直接以本地线程执行job,无需划分stage。否则要将job分成多个Stage提交到集群去执行(通过提交ResultStage进行)。

因为ResultStage提交时,首先会去判断其是否存在缺失的ParentStage(也就是说是否存在未完成的父Stage)。若有,则其需要等待其父Stage执行完成,才能进行提交执行。
       判断是否存在Stage的标准是看是否存在ShuffeDependency(Stage的分界线)。提交ResultStage时会根据其finalRDD 的依赖递归的寻找其DAG图中是否存在ShuffeDependency, 若存在,则创建ShuffleMapStage做为finalStage的父Stage以此类似。但至此,只能说存在父Stage并不能说存在缺失的父Stage. 判断缺失的标准是看其结果成功的输出信息(status)个数与其处理的分区个数是否相同,如若相同,则说明父Stage已经执行完成, 不存在missing;否则,说明还未完成,存在missing.  因为将ShuffleMapStage划分成maptask时,每个Partition对应一个maptask, 每个task会得到一个status输出结果信息,并在执行结束时将输出结果上报mapOutputTracker,并更新shuffleStage状态(将status增加进行其outputLocs列表,并将numAvailableOutputs加1),若numAvailableOutputs 与 Stage所要处理的partitions一致,说明所有的task都已经执行完成,即Stage执行完成;否则,说明还有task未完成,即Stage未完成。
       由上述分析可知,存在依赖关系的两个Stage,如果父Stage未执行完成,子Stage不能提交,也就是不能转变为Taskset加入任务调度队列。因此其先后顺序是严格控制的。我们知道只有存在ShuffleDependency时,才会划分Stage,这也就是说两个Stage之间是要做Shuffle操作的。根据上述分析可知Shuffle时ShuffleWrite做不完,ShuffleRead不能进行.

3. Task调度

当Stage不存在缺失的ParentStage时,会将其转换为TaskSet并提交。转换时依Stage类型进行转换:将ResultStage转换成ResultTask,
ShuffleMapStage转换成ShuffleMapTask. Task个数由Stage中finalRDD 的分区数决定。

当转换成的TaskSet提交之后,将其通过taskScheduler包装成TaskSetManager并添加至调度队列中(Pool),等待调度。在包装成TaskSetManager时,根据task的preferredLocatitions将任务分类存放在pendingTasksForExecutor,
pendingTaskForHost, pendingTasksForRack,
pendingTaskWithNoPrefs及allPendingTasks中,
前三个列表是是包含关系(本地性越来越低),范围起来越大,例如:在pendingTasksForExecutor也在pendingTaskForHost,pendingTasksForRack中, 分类的目的是在调度时,依次由本地性高à低的查找task。

在进行Task调度时,首先根据调度策略将可调度所有taskset进行排序,然后对排好序的taskset待调度列表中的taskset,按序进行分配Executor。再分配Executor时,然后逐个为Executor列表中可用的Executor在此次选择的taskset中按本地性由高到低查找适配任务。此处任务调度为延迟调度,即若本次调度时间距上一任务结束时间小于当前本地性配制时间则等待,若过了配制时间,本地性要求逐渐降低,再去查找适配的task。当选定某一task后后将其加入runningtask列表,当其执行完成时会加入success列表,下次调度时就会过滤过存在这两个列表中的任务,避免重复调度。

当一个任务执行结束时,会将其从runningtask中移除,并加入success,并会适放其占用的执行资源,供后序task使用,
将判断其执行成功的task数与此taskset任务总数相等时,意为taskset中所有任务执行结束,也就是taskset结束。此时会将taskset移除出可调度队列。

重复上述过程直到taskset待调度列表为空。即所有作业(job)执行完成。

3.1 spark调度策略

上文任务调度时提到,在调度任务时,首先后依据调度策略对任务按优先级进行排序。下面就调度策略就行介绍。

Spark现有的调度策略有FIFO 及 Fair两种。采用何种调度策略由“spark.scheduler.mode”参数指定,默认为FIFO类型。

……………………

   …………………………

时间: 2024-08-02 15:13:24

Spark 任务调度分析的相关文章

Spark源代码分析之六:Task调度(二)

话说在<Spark源代码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这种方法针对接收到的ReviveOffers事件进行处理.代码例如以下: // Make fake resource offers on all executors     // 在全部的executors上提供假的资源(抽象的资源.也就是资源的对象信息,我是这么理解的)     private def makeOffers() {       /

Spork: Pig on Spark实现分析

介绍 Spork是Pig on Spark的highly experimental版本,依赖的版本也比较久,如之前文章里所说,目前我把Spork维护在自己的github上:flare-spork. 本文分析的是Spork的实现方式和具体内容. Spark Launcher 在hadoop executionengine包路径下,写了一个Spark启动器,同MapReduceLauncher类似,会在launchPig的时候,把传入的物理执行计划进行翻译. MR启动器翻译的是MR的操作,以及进一步

挨踢部落第一期:Spark离线分析维度

活动说明:挨踢部落是为核心开发者提供深度技术交流,解决开发需求,资源共享的服务社群.基于此社群,我们邀请了业界技术大咖对开发需求进行一对一突破,解除开发过程中的绊脚石.以最专业.最高效的答复为开发者解决开发难题. 话题关键词: 大数据  spark  数据分析  数据画像 部落阵容:徐韬,龙珠直播大数据主管:王劲,数果科技 联合创始人: 面向对象:初级开发工程师,数据分析师,运维工程师 参与方式:加入51CTO开发者QQ交流群 370892523 ,有任何技术问题,在群里提问,或发给群主小官.

Spark日志分析项目Demo(9)--常规性能调优

一 分配更多资源 分配更多资源:性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升,是显而易见的:基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置:在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限:那么才是考虑去做后面的这些性能调优的点. 问题: 1.分配哪些资源? 2.在哪里分配这些资源? 3.为什么

SylixOS任务调度分析

任务调度相关链表SylixOS将任务控制块加入到不同的任务调度链表进行管理,创建一个任务就会把新创建的任务加入到优先级就绪表,等待被调度执行.根据不同的任务阻塞原因会被加入到不同的阻塞表中.1.1 优先级就绪表图 1.1 任务优先级就绪表SylixOS系统启动的过程会初始化一个任务优先级就绪表,当创建新任务时,根据任务的优先级加入到对应的优先级就绪表中,如图 1.1所示.系统启动过程创建一个优先级最低的IDLE任务.1.2 任务控制块地址表图 1.2 任务TCB地址表创建新任务添加到就绪表的同时

Spark任务调度executors分配问题 in yarn

红色留着继续思考.  问题背景:  CCSWYB ,在云平台上模拟shell流程,在各个节点上分配fvcom计算任务. Spark程序流程: 从HDFS中读取tasklist.txt(每一行对应一个任务) 经过一些操作过后生成一个JavaPairRDD ,记作data,对data执行foreach操作,函数内执行shell脚本启动任务.可以正常执行任务.  集群: 四个i5机器,hadoop2.3.0 + spark 1.0.0 + jdk1.7.0_60 问题: 任务数目从2 - 20 左右,

Spark交互式分析平台Apache Zeppelin的安装

Zeppelin介绍 Apache Zeppelin提供了web版的类似ipython的notebook,用于做数据分析和可视化.背后可以接入不同的数据处理引擎,包括spark, hive, tajo等,原生支持scala, java, shell, markdown等.它的整体展现和使用形式和Databricks Cloud是一样的,就是来自于当时的demo. Zeppelin可实现你所需要的: - 数据采集 - 数据发现 - 数据分析 - 数据可视化和协作 支持多种语言,默认是scala(背

SPARK 任务调度源码总结

任务调度可以从一个Action类算子开始,因为Action类算子会触发一个job的执行 划分stage,以taskSet形式提交任务,DAGScheduler 类中getMessingParentStages()方法是切割job划分stage 原文地址:https://www.cnblogs.com/xiangyuguan/p/11241151.html

spark 算子分析

别的不说先上官网: action 这些算子中需要注意: 1.reduce 和 reduceByKey 虽说都有reduce,但是一个是action级别,一个是transformation级别,速度上会有很大的差异 2.groupBy的使用如下 groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组. val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 ==