Spark DAGSheduler生成Stage过程分析实验

Spark Action会触发SparkContext类的runJob,而runJob会继续调用DAGSchduler类的runJob

DAGSchduler类的runJob方法调用submitJob方法,并根据返回的completionFulture的value判断Job是否完成。

onReceive用于DAGScheduler不断循环的处理事件,其中submitJob()会产生JobSubmitted事件,进而触发handleJobSubmitted方法。

正常情况下会根据finalStage创建一个ActiveJob。而finalStage就是由spark action对应的finalRDD生成的,而该stage要确认所有依赖的stage都执行完,才可以执行。也就是通过getMessingParentStages方法判断的。

这个方法用一个栈来实现递归的切分stage,然后返回一个宽依赖的HashSet,如果是宽依赖类型就会调用

之后提交stage,根据missingStage执行各个stage。划分DAG结束

submitStage会依次执行这个DAG中的stage,如果有父stage就先执行父stage,否则就提交这个stage,加入watingstages中。

示例:

scala> sc.makeRDD(Seq(1,2,3)).count

16/10/28 17:54:59 [INFO] [org.apache.spark.SparkContext:59] - Starting job: count at <console>:13

16/10/28 17:54:59 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Got job 0 (count at <console>:13) with 22 output partitions (allowLocal=false)

16/10/28 17:54:59 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Final stage: Stage 0(count at <console>:13)

16/10/28 17:54:59 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Parents of final stage: List()

16/10/28 17:54:59 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Missing parents: List()

16/10/28 17:54:59 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at <console>:13), which has no missing parents

collect依赖于reduceByKey,reduceByKey依赖于map,故会先提交map (Stage 1 (MappedRDD[2] at map at <console>:13))

scala> sc.makeRDD(Seq(1,2,3)).map(l =>(l,1)).reduceByKey((v1,v2) => v1+v2).collect
16/10/28 18:00:07 [INFO] [org.apache.spark.SparkContext:59] - Starting job: collect at <console>:13
16/10/28 18:00:07 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Registering RDD 2 (map at <console>:13)
16/10/28 18:00:07 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Got job 1 (collect at <console>:13) with 22 output partitions (allowLocal=false)
16/10/28 18:00:07 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Final stage: Stage 2(collect at <console>:13)
16/10/28 18:00:07 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Parents of final stage: List(Stage 1)
16/10/28 18:00:07 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Missing parents: List(Stage 1)
16/10/28 18:00:07 [INFO] [org.apache.spark.scheduler.DAGScheduler:59] - Submitting Stage 1 (MappedRDD[2] at map at <console>:13), which has no missing parents

时间: 2024-10-06 10:57:36

Spark DAGSheduler生成Stage过程分析实验的相关文章

Spark下生成测试数据,并在Spark环境下使用BulkProcessor将测试数据入库到ES6.4.2

Spark下生成2000w测试数据(每条记录150列) 使用spark生成大量数据过程中遇到问题,如果sc.parallelize(fukeData, 64);的记录数特别大比如500w,1000w时,会特别慢,而且会抛出内存溢出over head错误.解决方案,一次生成的数据量不高于100w,多次调用,这样下来一共生成2000w耗时十几分钟. 如果环境允许你可以在本地生成测试数据,然后上传到hdfs供spark测试. import java.io.BufferedWriter; import

Spark 资源调度包 stage 类解析

spark 资源调度包 Stage(阶段) 类解析 类注释: /** * A stage is a set of parallel tasks all computing the same function that need to run as part * of a Spark job, where all the tasks have the same shuffle dependencies. * 一个阶段是所有计算相同功能的并行任务集合, 作为spark作业的一部分, 这些任务都有相同

Spark Streaming应用启动过程分析

本文为SparkStreaming源码剖析的第三篇,主要分析SparkStreaming启动过程. 在调用StreamingContext.start方法后,进入JobScheduler.start方法中,各子元素start方法的调用顺序如下: private var eventLoop : EventLoop[JobSchedulerEvent] = null val listenerBus = new StreamingListenerBus() private val jobGenerat

spark 中划分stage的思路

窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为 一个父RDD的分区对应于一个子RDD的分区 两个父RDD的分区对应于一个子RDD 的分区. 宽依赖指子RDD的每个分区都要依赖于父RDD的所有分区,这是shuffle类操作 Stage: 一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage.Stage的划分,简单的说是以shuffle和result这两种类型来划分.在Spark中有两类task,一类是shuffleMap

spark 笔记 8: Stage

Stage 是一组独立的任务,他们在一个job中执行相同的功能(function),功能的划分是以shuffle为边界的.DAG调度器以拓扑顺序执行同一个Stage中的task. /** * A stage is a set of independent tasks all computing the same function that need to run as part * of a Spark job, where all the tasks have the same shuffle

Spark Streaming和Flume-NG对接实验(好文转发)

转发自玖疯的博客 http://www.cnblogs.com/lxf20061900/p/3866252.html Spark Streaming是一个新的实时计算的利器,而且还在快速的发展.它将输入流切分成一个个的DStream转换为RDD,从而可以使用Spark来处理.它直接支持多种数据源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函数:map, reduce, join, window等. 本文将Spark Streamin

Spark 学习总结

摘要: 1.spark_core 2.spark_sql 3.spark_ml 内容: 1.spark_core 原理篇: Spark RDD 核心总结 RangePartitioner 实现简记 spark 核心作业调度和任务调度 Spark DAGSheduler生成Stage过程分析实验 实战篇: Spark算子选择策略 Spark的持久化简记 Spark读取HBase [转]Spark常见问题汇总 调优篇: Spark算子选择策略 spark参数调优 Spark Shuffle原理.Sh

Spark 运行架构核心总结

摘要: 1.基本术语 2.运行架构 2.1基本架构 2.2运行流程  2.3相关的UML类图  2.4调度模块: 2.4.1作业调度简介 2.4.2任务调度简介 3.运行模式 3.1 standalone模式 4.RDD实战 总结: 基本术语: Application:在Spark 上建立的用户程序,一个程序由一个驱动程序(Driver Program)和集群中的执行进程(Executer)构成. Driver Program:运行应用程序(Application)的main函数和创建Spark

Spark核心作业调度和任务调度之DAGScheduler源码

前言:本文是我学习Spark 源码与内部原理用,同时也希望能给新手一些帮助,入道不深,如有遗漏或错误的,请在原文评论或者发送至我的邮箱 [email protected] 摘要: 1.作业调度核心--DAGScheduler 2.DAGScheduler类说明 2.1DAGScheduler 2.2ActiveJob 2.3Stage 2.4Task 3.工作流程 3.1划分Stage 3.2生成Job,提交Stage 3.3任务集的提交 3.4任务作业完成状态的监控 3.5任务结果的获取 内容