Spark作业执行

Spark中一个action触发一个job的执行,在job提交过程中主要涉及Driver和Executor两个节点。

Driver主要解决

1. RDD 依赖性分析,生成DAG。

2. 根据RDD DAG将job分割为多个Stage。

3. Stage一经确认,即生成相应的Task,将生成的Task分发到Executor执行。

Executor节点在接收到执行任务的指令后,启动新的线程运行任务,并将结果返回。

划分Stage

当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。

以GroupByKey操作为例,该操作返回的结果实际上是一个ShuffleRDD,当DAGScheduler遍历到这个ShuffleRDD的时候,因为其Dependency是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及shuffleDependency等对象就被用来构建一个新的Stage,这个Stage的输出结果的分区方式,则由ShuffleDependency中的Partitioner对象来决定。

可以看到,尽管划分和构建Stage的依据是ShuffleDependency,对应的RDD也就是这里的ShuffleRDD,但是这个Stage所处理的数据是从这个shuffleRDD的父RDD开始计算的,只是最终的输出结果的位置信息参考了ShuffleRDD返回的ShuffleDependency里所包含的内容。而shuffleRDD本身的运算操作(其实就是一个获取shuffle结果的过程),是在下一个Stage里进行的。

提交Stage

上一个步骤得到一个或多个有依赖关系的Stage,其中直接触发Job的RDD所关联的Stage作为FinalStage生成一个Job实例,这两者的关系进一步存储在resultStageToJob映射表中,用于在该Stage全部完成时做一些后续处理,如报告状态,清理Job相关数据等。

具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage。 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交

什么时候waitingStages中的Stage会被重新提交呢,当一个属于中间过程Stage的任务(这种类型的任务所对应的类为ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,如果是都完成了,则DAGScheduler将重新扫描一次waitingStages中的所有Stage,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以提交该Stage。

此外每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫描并提交就绪Stage的调用过程

TaskSet的提交

每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet,这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的Task到对应的Executor节点上进行运算

时间: 2024-10-06 01:38:31

Spark作业执行的相关文章

【转】Spark架构与作业执行流程简介

原文链接 http://www.cnblogs.com/shenh062326/p/3658543.html Spark架构与作业执行流程简介 Local模式 运行Spark最简单的方法是通过Local模式(即伪分布式模式). 运行命令为:./bin/run-example org.apache.spark.examples.SparkPi local 基于standalone的Spark架构与作业执行流程 Standalone模式下,集群启动时包括Master与Worker,其中Master负

数据倾斜是多么痛?spark作业调优秘籍

目录视图 摘要视图 订阅 [观点]物联网与大数据将助推工业应用的崛起,你认同么?      CSDN日报20170703--<从高考到程序员--我一直在寻找答案>      [直播]探究Linux的总线.设备.驱动模型! 数据倾斜是多么痛?spark作业调优秘籍 2017-06-27 13:28 39人阅读 评论(0) 收藏 举报  分类: Spark(124)  原文:https://mp.weixin.qq.com/s?__biz=MzI5OTAwMTM1MQ==&mid=2456

spark作业调优(一)-------合理分配资源

原理: spark作业sparkcontext.DAGDAGSchedler.taskSchulder.会将编写算子.切割成大量的task,会提交到application的excutor上执行如何资源设置cup core和excutor较少,就会造成能并行处理的task数量较少,从而就会增加task的运行时间,所以根据资源配置分配资源是很有必要的会很大程度的提高task的运行时间. 例子: 设置参数: 原文地址:https://www.cnblogs.com/SupperMan/p/903544

LK按作业执行时长优化

新到一家公司,需要折腾点认可出来.然后开始苦逼的优化工作.暂时不吐槽权限问题!!!优化效果优化前,作业历史记录(前30)P图优化前,CPU使用情况有几个作业平均时长2.5~3.5小时,还有很多时长在半小时以上的作业,基本要到11-12点才能完成作业统计.CPU每天7:00-11:00一直维持在比较高的数值.优化后,作业历史记录(前30)P图最近三天的CPU情况最近一天的CPU情况优化后,作业执行时长明显降低,从最近3天执行情况看,9:00前统计作业能够执行完成.服务器CPU高峰时段由之前的7:0

sqlserver 监控自动化作业执行情况

ALTER procedure [dbo].[monitorJob] @name varchar(100) as begin declare @bd varchar(100) ; if exists( select * from  msdb.dbo.sysjobhistory where job_id in (select job_id from msdb.dbo.sysjobs where [name][email protected] ) and run_date=convert(varch

SQLServer 2012异常问题(一)--故障转移群集+镜像环境导致作业执行失败

原文:SQLServer 2012异常问题(一)--故障转移群集+镜像环境导致作业执行失败 先感谢一下我的同事们最先发现此问题,鸣谢:向飞.志刚.海云 最近在生产环境发现一个诡异的问题: 环境:WINDOWS 2012+SQLSERVER 2012 SP1,双节点的故障转移群集+单节点的SQLSERVER 2012 SP1实例(镜像) 生产数据库是从SQLSERVER 2008R2迁移到2012的,迁移过程很顺利,按照一般经验,可能导致数据库所有者丢失,因此在迁移后手动修改数据库所有者为sa,与

自己编写的spark代码执行流程

我们自己编写了spark代码后;放到集群中一执行,就会出现问题,没有序列化.指定的配置文件不存在.classnotfound等等.这其实很多时候就是因为我们对自己编写的spark代码执行流程的不熟悉导致的,源码阅读可以解决,但源码不是每个人都能看懂或能看进去的,下面我们就来讲一下,我们自己写的spark代码究竟是这么执行的.从执行的过程可分为三个部分来分析main方法,RDD处理方法,DStream处理方法,从执行的JVM虚拟机可以分为两个部分driver端,worker端 一.main方法 m

Spark内部执行机制

Spark内部执行机制 1.1 内部执行流程 如下图1为分布式集群上spark应用程序的一般执行框架.主要由sparkcontext(spark上下文).cluster manager(资源管理器)和?executor(单个节点的执行进程).其中cluster manager负责整个集群的统一资源管理.executor是应用执行的主要进程,内部含有多个task线程以及内存空间. 图1 spark分布式部署图 详细流程图如下图2: 图2 详细流程图 (1) 应用程序在使用spark-submit提

Spark架构与作业执行流程简介(scala版)

在讲spark之前,不得不详细介绍一下RDD(Resilient Distributed Dataset),打开RDD的源码,一开始的介绍如此: 字面意思就是弹性分布式数据集,是spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合. Resilient:弹性的,它表示的是数据可以保存在磁盘,也可以保存在内存中 Distributed:它的数据分布式存储,并且可以做分布式的计算 Dataset:一个数据集,简单的理解为集合,用于存放数据的 事实上,关于RDD有5个特性