Spark(五十二):Spark Scheduler模块之DAGScheduler流程

导入

从一个Job运行过程中来看DAGScheduler是运行在Driver端的,其工作流程如下图:

图中涉及到的词汇概念:

1. RDD——Resillient Distributed Dataset 弹性分布式数据集。

2. Operation——作用于RDD的各种操作分为transformation和action。

3. Job——作业,一个JOB包含多个RDD及作用于相应RDD上的各种operation。

4. Stage——一个作业分为多个阶段。

5. Partition——数据分区, 一个RDD中的数据可以分成多个不同的区。

6. DAG——Directed Acycle graph,有向无环图,反应RDD之间的依赖关系。

7. Narrow dependency——窄依赖,子RDD依赖于父RDD中固定的data partition。

8. Wide Dependency——宽依赖,子RDD对父RDD中的所有data partition都有依赖。

9. Caching Managenment——缓存管理,对RDD的中间计算结果进行缓存管理以加快整体的处理速度。

在Driver端,运行一个job时,涉及到DAGSheduler的流程如下:

1)调用applicaiton_jar.jar(应用程序入口函数),应用程序的运行过程依赖SparkContext,且需要初始化SparkContext sc,通过sc就可以创建RDD了(因为RDD是调用sc来创建的);

2)初始化SparkContext过程中会初始化DAGScheduler,并调用SparkContext.createTaskScheduler(this, master, deployMode)来初始化TaskScheduler、ShedulerBackend;

3)应用程序实际上是执行的RDD的transform或者action函数,当RDD#action函数触发时,实际上这样的action函数内部会调用sc.submitJob(...)方法,在SparkContext#submitJob(...)方法内部会根据action创建ResultStage,并找到其依赖的所有ShuffleMapStage。stage之间按照顺序执行,待前一个stage执行完成成功,才能执行下一个stage,所有stage执行成功后,该job才算执行完成。

4)stage实际上可以看作为TaskSet,它实际上代表的就是一个独立的Task集合,DAGScheduler将调用TaskScheduler来对TaskSet进行作业调度;

5)TaskScheduler调度过程是将task序列化通过RPC传递给Executor,Executor上会使用TaskRunner来运行task;

6)TaskScheduler上task如果运行失败,TaskScheduler会重试处理;同样在stage失败后,DAGScheduler也会触发stage重试处理。需要注意:这里如果stage失败,对当前stage重算,而不是从上一个stage开始,这样也是DAG划分stage的原因。

DAGScheduler源码分析

DAGScheduler功能:

  1)最高层的调度层,实现了stage-oriented(面向阶段)调度。DAGScheduler为每个作业计算出一个描述stages的DAG,跟踪哪些RDD和stage输出实现,并找到运行作业的最小计划。然后,它将stages封装为TaskSets提交给在集群上运行它们的底层TaskScheduler实现(TaskScheduler唯一实现类是TaskSchedulerImpl)。任务集包含完全独立的一组任务,这些任务可以根据群集中已经存在的数据(例如,前几个阶段的映射输出文件)立即运行,但如果此数据不可用,它可能会失败。

  2)Spark stages 是将RDD图在Shuffle边界处断开来创建的。具有“窄(narrow)”依赖关系的RDD操作(如map()和filter())在每个阶段中被流水线连接到一组任务中,但是具有shuffle依赖关系的操作需要多个阶段(一个阶段写入一组映射输出文件,另一个阶段在屏障后读取这些文件)。最后,每个阶段将只具有对其他阶段的shuffle依赖,并且可以在其中计算多个操作。这些操作的实际管道化发生在各种RDD的rdd.compute()函数中。

  3)除了划分stages的DAG之外,DAGScheduler还根据当前缓存状态确定运行每个task的首选位置,并将这些位置传递给底层TaskScheduler(任务调度器)。此外,它还处理由于shuffle输出文件丢失而导致的故障,在这种情况下,可能需要重新提交旧stages。在内部TaskScheduler会处理stage中不是由shuffle文件丢失引起的失败,它会在取消整个stage之前对每个任务重试几次。

  4)要从故障中恢复,同一阶段可能需要多次运行,这称为“attempts”。如果 TaskScheduler 报告某个任务由于前一阶段的映射输出文件丢失而失败,则DAGScheduler将重新提交该丢失的阶段。这是通过具有FetchFailed的CompletionEvent或ExecutorLost事件检测到的。DAGScheduler将等待一小段时间来查看其他节点或任务是否失败,然后为计算丢失任务的任何丢失阶段重新提交TaskSets(任务集)。作为这个过程的一部分,我们可能还必须为以前清理stage objects的旧(已完成)stage创建stage objects。由于stage的旧“attempts”中的任务可能仍在运行,因此必须小心映射在正确的stage对象中接收到的任何事件。

查看此代码时,有几个关键概念:

-Jobs:(由[ActiveJob]表示)是提交给调度程序的顶级工作项。例如,当用户调用诸如count()之类的操作时,作业将通过sc.submitJob()方法提交。每个作业可能需要执行多个阶段来构建中间数据。

-Stages:Stage是一组任务(TaskSet),用于计算作业中的中间结果,其中每个任务在同一个RDD的分区上计算相同的函数。

Stage在shuffle边界处分离,这会引入一个屏障(我们必须等待上一阶段完成获取输出)。

有两种类型的Stage(阶段):【ResultStage】(对于执行操作的最后阶段);【ShuffleMapStage】(为shuffle写入映射输出文件)。

如果这些作业重用同一个RDD,则Stage(阶段)通常在多个作业之间共享。

-Tasks:是单独的工作单元,每个工作单元发送到一台机器。

-Cache tracking: DagScheduler会找出缓存哪些RDD以避免重新计算它们,同样会记住哪些shuffle map stage已经生成了输出文件,以避免重复shuffle的映射端。

-Preferred locations:DAGScheduler还根据其底层RDD的首选位置,或缓存或无序处理数据的位置,计算在阶段中运行每个任务的位置。

-Cleanup:当依赖于它们的正在运行的作业完成时,所有数据结构都会被清除,以防止长时间运行的应用程序中发生内存泄漏。

DAGScheduler的生命周期

那么下边将会结合代码对DAGScheduler整个生命周期进行介绍,DAGScheduler的生命周期:

1)初始化DAGScheduler

2)根据RDD DAG划分Stages

3)对Stage进行调度、Stage容错

4)实例销毁

1)DAGScheduler之初始化

2)DAGScheduler之根据RDD DAG划分Stages

3)DAGScheduler之对Stage进行调度、容错

4)DAGScheduler之实例销毁

参考:

1)《Spark运行机制之DAG原理

2)《Spark Scheduler模块详解-DAGScheduler实现

3)《spark源码走读之DAGScheduler

原文地址:https://www.cnblogs.com/yy3b2007com/p/11094617.html

时间: 2024-08-27 13:27:07

Spark(五十二):Spark Scheduler模块之DAGScheduler流程的相关文章

五十二 常用第三方模块 图形界面

Python支持多种图形界面的第三方库,包括: Tk wxWidgets Qt GTK 等等. 但是Python自带的库是支持Tk的Tkinter,使用Tkinter,无需安装任何包,就可以直接使用.本章简单介绍如何使用Tkinter进行GUI编程. Tkinter 我们来梳理一下概念: 我们编写的Python代码会调用内置的Tkinter,Tkinter封装了访问Tk的接口: Tk是一个图形库,支持多个操作系统,使用Tcl语言开发: Tk会调用操作系统提供的本地GUI接口,完成最终的GUI.

QT开发(五十二)———QML语言

QT开发(五十二)---QML语言 QML是一种声明语言,用于描述程序界面.QML将用户界面分解成一块块小的元素,每一元素都由很多组件构成.QML定义了用户界面元素的外观和行为:更复杂的逻辑则可以结合JavaScript脚本实现. 一.QML基础语法 1.Import语句 QML代码中,import语句一般写在头几行,主要用途如下:     A.包含类型的全名空间     B.包含QML代码文件的目录     C.JavaScript代码文件 格式如下: import Namespace Ver

【黑金原创教程】【FPGA那些事儿-驱动篇I 】实验十二:串口模块① — 发送

实验十二:串口模块① — 发送 串口固然是典型的实验,想必许多同学已经作烂,不过笔者还要循例介绍一下.我们知道串口有发送与接收之分,实验十二的实验目的就是实现串口发送,然而不同的是 ... 笔者会用另一种思路去实现串口发送. 图12.1 PS/2发送时序与串口发送时序. 如图12.1所示,串口发送时序相较PS/2发送时序,串口发送时序就像断了翅膀的小鸟般,没有时钟信号控制整个传输协议.除此之外,串口发送时序与PS/2发送时序近似的地方也非常惊人 ... 默认下,一帧PS/2数据有11位,对此一帧

第五十二个知识点:选择一个先进的应用概念,如电子投票,拍卖或多方计算。这样一个系统的大致安全需求是什么

第五十二个知识点:选择一个先进的应用概念,如电子投票,拍卖或多方计算.这样一个系统的大致安全需求是什么 这是我们认为每个密码学博士一年级都应该知道的52件事中的最后一件.你可能已经收集了过去的52个博客,我们希望学生知道从理论到实践的各个方面.但关键是你需要在密码学中考虑的不仅是对遵守规则的玩家的安全,还有对不遵守规则的玩家的安全.让我们从投票.拍卖和多方计算的角度来研究这个问题. 让我们先讨论一下三个应用程序的含义. 在投票中,我们根绝投票者进行一些投票方案(得票最多者当选.多选.赞成投票或其

Spark(十二) -- Spark On Yarn & Spark as a Service & Spark On Tachyon

Spark On Yarn: 从0.6.0版本其,就可以在在Yarn上运行Spark 通过Yarn进行统一的资源管理和调度 进而可以实现不止Spark,多种处理框架并存工作的场景 部署Spark On Yarn的方式其实和Standalone是差不多的,区别就是需要在spark-env.sh中添加一些yarn的环境配置,在提交作业的时候会根据这些配置加载yarn的信息,然后将作业提交到yarn上进行管理 首先请确保已经部署了Yarn,相关操作请参考: hadoop2.2.0集群安装和配置 部署完

Spark源码分析之-scheduler模块

RDD的依赖关系和Stage的分类 在Spark中,每一个RDD是对于数据集在某一状态下的表现形式,而这个状态有可能是从前一状态转换而来的,因此换句话说这一个RDD有可能与之前的RDD(s)有依赖关系.根据依赖关系的不同,可以将RDD分成两种不同的类型:Narrow Dependency和Wide Dependency. Narrow Dependency指的是 child RDD只依赖于parent RDD(s)固定数量的partition. Wide Dependency指的是child R

五十二、django 中间件,csrf跨站请求伪造,auth模块表

django 中间件 django中间件事类似django的保安,请求的时候需要先经过中间件才能到达django后端(urls,views,templates,models), 响应走的时候也需要经过中间件才能到达web服务网关接口 django中间件中有五个用户可以自定义的方法 django中间件可以用来做什么? 1.网站全局的身份校验,访问频率限制,权限校验..只要涉及到全局的校验都可以在中间件中完成 2.django的中间件是所有web框架中,做的最好 需要掌握的方法有: 1.proces

Python3 学习第十二弹: 模块学习五之pickle与json

对于python来说,这两个模块是十分实用的两个模块,以一种简单的方法用于储存数据实例. pickle模块 提供用来储存Python各种数据序列化存储 # 原来的cPickle已经在python3中与pickle合并 dumps(obj) 返回对象信息存储成的二进制字符串 loads(str) 返回二进制字符串的对象信息 dump(obj, file) 将对象信息写入到二进制文件中,包括基本数据结构,函数实例,类实例 load(file) 从二进制文件中读入一个储存的对象,顺序与写入的顺序相同

Spark(十) -- Spark Streaming API编程

本文测试的Spark版本是1.3.1 Spark Streaming编程模型: 第一步: 需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数: 1.SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息 2.Seconds对象:该对象设置了StreamingContext多久读取一次数据流 第二步: 构建好入口对象之后,直接调用该入口的方法读取各