【Spark 深入学习 04】再说Spark底层运行机制

本节内容

· spark底层执行机制

· 细说RDD构建过程

· Job Stage的划分算法

· Task最佳计算位置算法

一、spark底层执行机制

对于Spark底层的运行原理,找到了一副很好的图,先贴上

客户端提交应用后,spark是如何执行的要有一个整体的概念,做到心中有数,先整体把握,才能更好的分模块开垦细节,废话不多说,先来看该图如何更好的理解。

1)提交前的联系

Worker向Master或则ResourceManager汇报自己有哪些资源(内存、CPU、磁盘空间、网络等),Master或则ResourceManager与Worker一直保持心跳

2)应用提交后

Spark通过RDD对分布式的数据进行管理,RDD记录了转换成“spark格式”后的数据分区(记录数据的存储位置)和数据分区对应的操作

· 应用提交后,形成RDD Graph,并且在后台创建DAG对象(spark不仅仅用DAG建模,而且还会执行它,并且里面不是用对象表示,而是用RDD对象之间的关系)

· DAG Scheduler 优先使用pipeline方法,把RDD的transformation压缩,当碰到wide transformation 时,narrow无法和wide pipeline,那DAG scheduler会把前面的transformation定义成一个stage,DAG Scheduler的工作结果就是将RDD产生一组stages

· 将DAG Scheduler产生的stages传送给task scheduler,task scheduler使用集群管理器依次执行task,task被分配到各个work下执行,当所有的task执行完毕,一个stage标记完成,再运行下一个stage,直到整个spark job完成。

简单理解, Spark 把要处理的数据,处理中间结果,和输出结果都定义成 RDD. 这样一个常见的 Spark job 就类似于:

• 从数据源读取数据,把输入生成一个 RDD;

• 通过运算把输入 RDD 转换成另一个RDD;

• 再通过运算把生成的 RDD 转换成另一个RDD,重复需要进行的 RDD 转换操作 (此处省略一千遍);

• 最后运算成结果 RDD,处理结果;

Spark的运行流程: Client提交应用,master找到一个worker启动driver[也可以其他],driver向master请求资源,之后将应用转化为RDD Graph,再由DAGScheduler将RDD Graph转换为stage的DAG提交给TaskScheduler,由TaskScheduler提交任务给executor。

从调度来看,经历了如下调度:application调度 -> Job调度 -> Stage调度 -> Task调度

二、细说RDD构建过程

从前面的学习我们发现 RDD 其实就是数据集,是一组数据被处理到一个阶段的状态。

每一个 Spark Job 就是定义了由输入 RDD,如何把它转化成下一个状态,再下一个状态 …… 直到转化成我们的输出。这些转化就是对 RDD 里每一个 data record 的操作。用个高大上点的语言,一个 Spark job 就是一系列的 RDD 以及他们之间的转换关系。那么用户如何才能定义 RDD 和转换关系呢?换句话说,用户如何使用 Spark 呢?

用户需要定义一个包含主函数的 Java (main) 类。在这个 main 函数中,无论业务逻辑多么复杂,无论你需要使用多少 Java 类,如果从 Spark 的角度简化你的程序,那么其实就是:

• 首先生成 JavaSparkContext 类的对象.

• 从 JavaSparkContext 类的对象里产生第一个输入RDD. 以读取 HDFS 作为数据源为例,调用 JavaSparkContext.textFile() 就生成第一个 RDD.

• 每个 RDD 都定义了一些标准的常用的变化,比如我们上面提到的 map, filter, reduceByKey …… 这些变化在 Spark 里叫做 transformation.

• 之后可以按照业务逻辑,调用这些函数。这些函数返回的也是 RDD, 然后继续调用,产生新的RDD …… 循环往复,构建你的 RDD 关系图。

• 注意 RDD 还定义了其他一些函数,比如 collect, count, saveAsTextFile 等等,他们的返回值不是 RDD. 这些函数在 Spark 里叫做 actions, 他们通常作为 job 的结尾处理。

• 用户调用 actions 产生输出结果,Job 结束。

Action 都是类似于 “数数这个 RDD 里有几个 data record”, 或者 ”把这个 RDD 存入一个文件” 等等。想想他们作为结尾其实非常合理:我们使用 Spark 总是来实现业务逻辑的吧?处理得出的结果自然需要写入文件,或者存入数据库,或者数数有多少元素,或者其他一些统计什么的。所以 Spark 要求只有用户使用了一个 action,一个 job 才算结束。当然,一个 job 可以有多个 action,比如我们的数据既要存入文件,我们又期望知道有多少个元素。

这些 RDD 组成的关系在 Spark 里叫做 DAG,就是有向无循环图,图论里的一个概念,大家有兴趣可以专门翻翻这个概念。可以发现,实践中绝大部分业务逻辑都可以用 DAG 表示,所以 spark 把 job 定义成 DAG 也就不足为奇了。

RDD 的两种变化

我们上面刚刚介绍了 transformation 的概念。在 Spark 眼中,transformation 被分成 narrow transformation 和 wide transformation. 这是什么东西呢?

上文提到过 RDD 被分成几个分区,分散在多台机器上。当我们把一个 RDD A 转化成下一个 RDD B 时,这里有两种情况:

· 有时候只需要一个 A 里面的一个分区,就可以产生 B 里的一个分区了,比如 map 的例子:A 和 B 之间每个分区是一一对应的关系,这就是 narrow transofmration.

· 还有一类 transformation,可能需要 A 里面所有的分区,才能产生 B 里的一个分区,比如 reduceByKey的例子,这就是 wide transformation.

Narrow 或者 Wide 有什么关系吗?

一个 Spark job 中可能需要连续地调用 transformation, 比如先 map,后 filter,然后再 map …… 那这些 RDD 的变化用图表示就是:

我们可以大胆设想一下,如果每个分区里的数据就待在那台机器的内存里,我们逐一的调用 map, filter, map 函数到这些分区里,Job 就很好的完成。

更重要的是,由于数据没有转移到别的机器,我们避免了 Network IO 或者 Disk IO. 唯一的任务就是把 map / filter 的运行环境搬到这些机器上运行,这对现代计算机来说,overhead 几乎可以忽略不计。

这种把多个操作合并到一起,在数据上一口气运行的方法在 Spark 里叫 pipeline (其实 pipeline 被广泛应用的很多领域,比如 CPU)。这时候不同就出现了:只有 narrow transformation 才可以进行 pipleline 操作。对于 wide transformation, RDD 转换需要很多分区运算,包括数据在机器间搬动,所以失去了 pipeline 的前提。

RDD 的执行

当用户调用 actions 函数时,Spark 会在后台创建出一个 DAG. 就是说 Spark 不仅用 DAG 建模,而且真正地创建出一个 DAG, 然后执行它(顺便说一句 DAG 在 Spark 里不是用一个对象表示的,而是用 RDD 对象之间的关系)。

Spark 会把这个 DAG 交给一个叫 DAG scheduler 的模块,DAG scheduler 会优先使用 pipeline 方法,把 RDD 的 transformation 压缩;当我们遇到 wide transformation 时,由于之前的 narrow transformation 无法和 wide transformation pipeline, 那 DAG scheduler 会把前面的 transformation 定义成一个 stage.

重要的事情说三遍:DAG scheduler 会分析 Spark Job 所有的 transformation, 用 wide transformation 作为边界,把所有 transformation 分成若干个stages. 一个 stage 里的一个分区就被 Spark 叫做一个task. 所以一个 task 是一个分区的数据和数据上面的操作,这些操作可能包括一个 transformation,也可能是多个,但一定是 narrow transformation.

DAG scheduler 工作的结果就是产生一组 stages. 这组 stages 被传到 Spark 的另一个组件 task scheduler, task scheduler 会使用集群管理器依次执行 task, 当所有的 task 执行完毕,一个 stage 标记完成;再运行下一个 stage …… 直到整个 Spark job 完成。

三、Job Stage的划分算法

从前文了解到的处理流程,RDD Graph->DAG Scheduler->Task Scheduler,DAG Scheduler将RDD转换为Job Stage。

由于Spark的算子构建一般都是链式的,这就涉及了要如何进行这些链式计算,Spark的策略是对这些算子,先划分Stage,然后在进行计算。

由于数据是分布式的存储在各个节点上的,所以为了减少网络传输的开销,就必须最大化的追求数据本地性,所谓的数据本地性是指,在计算时,数据本身已经在内存中或者利用已有缓存无需计算的方式获取数据。

1.  Stage划分算法思想

(1)一个Job由多个Stage构成

一个Job可以有一个或者多个Stage,Stage划分的依据就是宽依赖,产生宽依赖的算子:reduceByKey、groupByKey等等

(2)根据依赖关系,从前往后依次执行多个Stage

SparkApplication 中可以因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是有一个或者多个Stage构成,后面的Stage依赖前面的Stage,也就是说只有前面的Stage计算完后,后面的Stage才会运行。

(3)Stage的执行时Lazy级别的

所有的Stage会形成一个DAG(有向无环图),由于RDD的Lazy特性,导致Stage也是Lazy级别的,只有遇到了Action才会真正发生作业的执行,在Action之前,Spark框架只是将要进行的计算记录下来,并没有真的执行。Action导致作业执行的代码如下:触发作业,发送消息。消息的接收和处理:

(1)DAGScheduler启动一个线程EventLoop(消息循环器),不断地从消息队列中取消息。消息是通过EventLoop的put方法放入消息队列,当EventLoop拿到消息后会回调DAGScheduler的OnReceive,进而调用doOnReceive方法进行处理。

为什么要开辟线程来执行消息的读、取?这样可以提交更多的Job,异步处理多Job,处理的业务逻辑一致(调用自己方法也是发送消息),解耦合,扩展性好。

(2)在doOnReceive中通过模式匹配的方式把JobSubmitted封装的内容路由到handleJobSubmitted。

(3)在handleJobSubmitted中首先创建finalStage。

(4)通过递归的方式创建DAG。

四、Task最佳计算位置算法

1.Task任务本算法运用场景

在上一节,我们介绍了Job Stage划分算法,并最终得到了DAG图中的Result Stage(final Stage)。接下来我们通过查看Task任务本地性(为了保证Data Locality)的运用场景----Task的运行调度处理,来引入Task任务本地性算法。

在得到逻辑上Result Stage,Spark为了进行计算就必须先报任务以一定的集群可识别形式提交给集群进行计算。Spark的任务提交过程如下:

(1)生成ActiveJob,为提交finalStage做准备。

(2)提交finalStage

提交Stage,如果有未提交的ParentStage,则会递归提交这些ParentStage,只有所有ParentStage都计算完了,才能提交当前Stag

(3)提交MissingTask

missingTask会最先会再到需要计算的分片,然后对Stage的运行环境进行设定,然后取得Task计算的本地性级别,最后会根据这些信息建立Tasks来处理每个分片,在提交给底层TaskScheduler之前,Spark还会将Tasks封装成TaskSet。最后提交TaskSet给TaskScheduler,等待TaskScheduler最终向集群提交这些Task,并且DAGScheduler会监听这些Task的状态。

2.数据本地性

(1)这里我们来着重讲解获取数据本地性部分的代码:

这里会将要计算的分片(Partition)转换为(id, getPreferredLocs(stage.rdd, id)) 类型的truple,进而由truple转换未一个Map映射,在Task构造时需要一个locs参数,便可以利用这个映射由id得到相应Partition的本地性级别。

在每个分片(Partition)内部则是通过getPreferredLocs方法得到的

在具体算法实现的时候,首先查询DAGScheduler的内存数据结构中是否存在当前partition的数据本地性信息,若有的话就直接放回该信息;若没有首先会调用rdd.getPreferredLocations来得到数据的本地性。

例如想让Spark运行在Hbase上或者是一种现在Spark还没有直接支持的数据库上,此时开发者需要自定义RDD,为了保证Task计算的数据本地性,最为关键的方式就是必须实现RDD的getPreferredLocations方法,来支持各种来源的数据。

DAGScheduler计算数据本地性时,巧妙的借助了RDD自身的getPreferredLocations中的数据,最大化的优化效率,因为getPreferredLocations中表明了每个Partition的数据本地性。虽然当然Partition可能被persist或checkpoint,但是persist或checkpoint默认情况下肯定和getPreferredLocations中的partition的数据本地性是一致的。所以,这中算法就极大的简化了Task数据本地性算法的实现,并且优化了效率

五、参考资料

1.http://mp.weixin.qq.com/s/nDRt1VQTYmsYcW98q4cvEQ-五分钟深入 Spark 运行机制

2.http://blog.csdn.net/sinat_25306771/article/details/51429984

时间: 2024-08-06 11:57:54

【Spark 深入学习 04】再说Spark底层运行机制的相关文章

【Spark深入学习-11】Spark基本概念和运行模式

----本节内容------- 1.大数据基础 1.1大数据平台基本框架 1.2学习大数据的基础 1.3学习Spark的Hadoop基础 2.Hadoop生态基本介绍 2.1Hadoop生态组件介绍 2.2Hadoop计算框架介绍 3.Spark概述 3.1 Spark出现的技术背景 3.2 Spark核心概念介绍 4.Spark运行模式 4.1.Spark程序组成 4.2.Spark运行模式 5.参考资料 --------------------- 1.大数据基础 1.1 大数据平台基本框架

<转>ASP.NET学习笔记之理解MVC底层运行机制

ASP.NET MVC架构与实战系列之一:理解MVC底层运行机制 今天,我将开启一个崭新的话题:ASP.NET MVC框架的探讨.首先,我们回顾一下ASP.NET Web Form技术与ASP.NET MVC的异同点,并展示各自在Web领域的优劣点.在讨论之前,我对这两种技术都非常热衷,我个人觉得在实际的项目开发中,两者都能让我们受益匪浅,因此是目前Web领域两大平行和流行的技术.我们都知道,在传统的ASP.NET Web Form应用程序中,Microsoft已为我们设计了较为完整.简洁的开发

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.

【Spark深入学习 -13】Spark计算引擎剖析

----本节内容------- 1.遗留问题解答 2.Spark核心概念 2.1 RDD及RDD操作 2.2 Transformation和Action 2.3 Spark程序架构 2.4 Spark on Yarn运行流程 2.5 WordCount执行原理 3.Spark计算引擎原理 3.1 Spark内部原理 3.2 生成逻辑执行图 3.3 生成物理执行图 4.Spark Shuffle解析 4.1 Shuffle 简史 4.2  Spark Shuffle ·Shuffle Write

【Spark 深入学习 01】 Spark是什么鬼?

经过一段时间的学习和测试,是时候给spark的学习经历做一个总结了,对于spark的了解相对晚了写.春节期间(预计是无大事),本博准备推出20篇左右spark系列原创文章(先把牛吹出去再说) ,尽量将枯燥无味的技术讲的通俗易懂- r.kelly 2013年的时候第一次听说spark这么个神器,那时候它还幼小,没什么人鸟它,但是它强大基因注定了它不是个凡夫俗子, 故事就是从那一小撮人群中开始的. 一.Spark何许人也 姓名:Spark 性别:未知 出生地:加州大学伯克利分校AMP实验室 出生年月

【Spark 深入学习 -09】Spark生态组件及Master节点HA

----本节内容------- 1.Spark背景介绍 2.Spark是什么 3.Spark有什么 4.Spark部署 4.1.Spark部署的2方面 4.2.Spark编译 4.3.Spark Standalone部署 4.4.Standalone HA配置 4.5.伪分布式部署 5.Spark任务提交 5.1.Spark-shell 5.2.Spark-submit 6.参考资料 --------------------- 1.Spark背景介绍 Spark是AMLab实验室贡献出的代码,是

【Spark深入学习 -12】Spark程序设计与企业级应用案例02

----本节内容------- 1.遗留问题答疑 1.1 典型问题解答 1.2 知识点回顾 2.Spark编程基础 2.1 Spark开发四部曲 2.2 RDD典型实例 2.3 非RDD典型实例 3.问题解答 4.参考资料 --------------------- 每一次答疑阶段,我都会站在老师的角度去思考一下,如果是我,我应该怎么回答,每每如此,不禁吓出一身冷汗.有些问题看答案确实挺容易的,但当自己作为一个答疑者去思考,可能不一样,因为快速确认一个答案的同时,你得否认很多的东西,脑海里闪过很

【spark 深入学习 03】Spark RDD的蛮荒世界

RDD真的是一个很晦涩的词汇,他就是伯克利大学的博士们在论文中提出的一个概念,很抽象,很难懂:但是这是spark的核心概念,因此有必要spark rdd的知识点,用最简单.浅显易懂的词汇描述.不想用学术话的语言来阐述RDD是什么,用简单.容易理解的方式来描述. 一.什么是RDD,RDD出现的背景 Mapreduce计算模型的出现解决了分布式计算的诸多难题,但是由于MR对数据共享的解决方案比较低效,导致MR编程模型效率不高,将数据写到一个稳定的外部存储系统,如HDFS,这个会引起数据复写.磁盘IO

Laravel5学习四:Facade的运行机制

什么是Facades 官方文档如是说: Facades 提供一个 静态接口 给在应用程序的 服务容器 中可以取用的类.Laravel 附带许多 facades,甚至你可能已经在不知情的状况下使用过它们!Laravel 的「facades」作为在 IoC 容器里面的基础类的静态代理,提供的语法有简洁.易表达的优点,同时维持比传统的静态方法更高的可测试性和弹性. 说实话,这段话读起来真不像人类的语言,我准备来拆解一下. 首先, Facades 是一个类,是一个什么类呢?它是基础类的一个静态代理. 其