Hadoop中的Shuffle 与 Spark中的Shuffle得区别与联系

MapReduce过程、Spark和Hadoop以Shuffle为中心的对比分析

mapreduce与Spark的map-Shuffle-reduce过程

mapreduce过程解析(mapreduce采用的是sort-based shuffle)

将获取到的数据分片partition进行解析,获得k/v对,之后交由map()进行处理.

map函数处理完成之后,进入collect阶段,对处理后的k/v对进行收集,存储在内存的环形缓冲区中。

当环形缓冲区中的数据达到阀值之后(也可能一直没有达到阀值,也一样要将内存中的数据写入磁盘),将内存缓冲区中的数据通过SpillThread线程转移到磁盘上。需要注意的是,转移之前,首先利用快排对记录数据进行排序(原则是先按照分区编号,再按照key进行排序,注意,排序是在写入磁盘之前的)。之后按照partition编号,获取上述排序之后的数据并将其写入Spill.out文件中(一个Spill.out文件中可能会有多个分区的数据--因为一次map操作会有多次的spill的过程),需要注意的是,如果人为设置了combiner,在写入文件之前,需要对每个分区中的数据进行聚集操作。该文件同时又对应SpillRecord结构(Spill.out文件索引)。

map的最后一个阶段是merge:该过程会将每一个Spill.out文件合并成为一个大文件(该文件也有对应的索引文件),合并的过程很简单,就是将多个Spill.out文件的在同一个partition的数据进行合并。(第一次聚合)

shuffle阶段。首先要说明的是shuffle阶段有两种阀值设置。第一,获取来自map的结果数据的时候,根据数据大小(file.out的大小)自然划分到内存或者是磁盘(这种阀值的设置跟map阶段完全不同);第二,内存和磁盘能够保存的文件数目有阀值,超出阀值,会对文件进行merge操作,即小文件合并成为大文件。Shuffle过程:

1)获取完成的Map Task列表。

2)进行数据的远程拷贝(http get的方法),根据数据文件的大小自然划分到内存或者是磁盘。

3)当内存或者磁盘的文件较多时,进行文件合并。(第二次聚合)

reduce之前需要进行Sort操作,但是两个阶段是并行化的,Sort在内存或者磁盘中建立小顶堆,并保存了指向该小顶堆根节点的迭代器,同时Reduce Task通过迭代器将key相同的数据顺次讲给reduce()函数进行处理。

Spark Shuffle过程解析(采用hash-based shuffle)

RDD是Spark与Hadoop之间最明显的差别(数据结构),Spark中的RDD具有很多特性,在这里就不再赘述。

Spark与Hadoop之间的Shuffle过程大致类似,Spark的Shuffle的前后也各有一次聚合操作。但是也有很明显的差别:Hadoop的shuffle过程是明显的几个阶段:map(),spill,merge,shuffle,sort,reduce()等,是按照流程顺次执行的,属于push类型;但是,Spark不一样,因为Spark的Shuffle过程是算子驱动的,具有懒执行的特点,属于pull类型。

Spark与Hadoop的Shuffle之间第二个明显的差别是,Spark的Shuffle是hash-based类型的,而Hadoop的Shuffle是sort-based类型的。下面简介一下Spark的Shuffle:

1.正因为是算子驱动的,Spark的Shuffle主要是两个阶段:Shuffle Write和Shuffle Read。

2.ShuffleMapTask的整个的执行过程就是Shuffle Write阶段

3.Sprk的Shuffle过程刚开始的操作就是将map的结果文件中的数据记录送到对应的bucket里面(缓冲区),分到哪一个bucket根据key来决定(该过程是hash的过程,每一个bucket都对应最终的reducer,也就是说在hash-based下,数据会自动划分到对应reducer的bucket里面)。之后,每个bucket里面的数据会不断被写到本地磁盘上,形成一个ShuffleBlockFile,或者简称FileSegment。上述就是整个ShuffleMapTask过程。之后,reducer会去fetch属于自己的FileSegment,进入shuffle
read阶段。

4.需要注意的是reducer进行数据的fetch操作是等到所有的ShuffleMapTask执行完才开始进行的,因为所有的ShuffleMapTask可能不在同一个stage里面,而stage执行后提交是要在父stage执行提交之后才能进行的,所以fetch操作并不是FileSegment产生就执行的。

5.需要注意的是,刚fetch来的FileSegment存放在softBuffer缓冲区,Spark规定这个缓冲界限不能超过spark.reducer.maxMbInFlight,这里用softBuffer表示,默认大小48MB。

6.经过reduce处理后的数据放在内存+磁盘上(采用相关策略进行spill)。

7.fetch一旦开始,就会边fetch边处理(reduce)。MapReduce shuffle阶段就是边fetch边使用combine()进行处理,但是combine()处理的是部分数据。MapReduce不能做到边fetch边reduce处理,因为MapReduce为了让进入reduce()的records有序,必须等到全部数据都shuffle-sort后再开始reduce()。然而,Spark不要求shuffle后的数据全局有序,因此没必要等到全部数据shuffle完成后再处理。为了实现边shuffle边处理,而且流入的records是无序的可以用aggregate的数据结构,比如HashMap。

hash-based 和 sort-based的对比

hash-based故名思义也就是在Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中。这样带来的问题就是如果Reduce分区的数量比较大的话,将会产生大量的磁盘文件(Map*Reduce)。如果文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量众多,序列化,以及压缩等操作需要分配的临时内存空间也可能会迅速膨胀到无法接受的地步,对内存的使用和GC带来很大的压力,在Executor内存比较小的情况下尤为突出,例如Spark
on Yarn模式。但是这种方式也是有改善的方法的:

在一个core上连续执行的ShuffleMapTasks可以共用一个输出文件ShuffleFile。先执行完的ShuffleMapTask形成ShuffleBlock i,后执行的ShuffleMapTask可以将输出数据直接追加到ShuffleBlock i后面,形成ShuffleBlock i’,每个ShuffleBlock被称为FileSegment。下一个stage的reducer只需要fetch整个ShuffleFile就行了。这样的话,整个shuffle文件的数目就变为C*R了。

sort-based是Spark1.1版本之后实现的一个试验性(也就是一些功能和接口还在开发演变中)的ShuffleManager,它在写入分区数据的时候,首先会根据实际情况对数据采用不同的方式进行排序操作,底线是至少按照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都可以写入到同一个外部磁盘文件中去,用简单的Offset标志不同Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只需要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题。上述过程与mapreduce的过程类似。

两者的性能比较,取决于内存,排序,文件操作等因素的综合影响。

对于不需要进行排序的Shuffle操作来说,如repartition等,如果文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager需要额外的根据Partition进行排序,显然HashShuffleManager的效率会更高。

而对于本来就需要在Map端进行排序的Shuffle操作来说,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其它的步骤中仍然需要排序,而SortShuffleManager则可以将写数据和排序两个工作合并在一起执行,因此即使不考虑HashShuffleManager的内存使用问题,SortShuffleManager依旧可能更快。

时间: 2024-10-16 02:44:48

Hadoop中的Shuffle 与 Spark中的Shuffle得区别与联系的相关文章

Spark中的Shuffle机制

Spark中的shuffle是在干嘛? Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD.也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分区. 但这只是shuffle的过程,却不是shuffle的原因.为何需要shuffle呢? Shuffle和Stage 在分布式计算框架中,比如map-reduce,数据本地化是一个很重要的考虑,即计算需要被分发到数据所在的位置,从而减少数据的移动,提高运行效率. Map-Reduce的输入数

Spark中的Shuffle过程

一.Spark中的Shuffle过程 Shuffle分为两种:Shuffle write.Shuffle read Spark中Shuffle分为两种:HahShuffle.SortShuffle: 1.HashShuffle 磁盘小文件的个数为:M*R = 4*3 =12个 每一个buffer的大小为32k,由于产生的磁盘小文件过多,会产生一系列的问题 如:因为在写文件的时候会产生大量的写句柄,导致产生大量的临时对象,产生OM问题 在Reduce端读取小文件的时候,又会产生的大量的读句柄,浪费

Spark中的矩阵乘法分析

前言: 矩阵乘法在数据挖掘/机器学习中是常用的计算步骤,并且在大数据计算中,shuffle过程是不可避免的,矩阵乘法的不同计算方式shuffle的数据量都不相同.通过对矩阵乘法不同计算方式的深入学习,希望能够对大数据算法实现的shuffle过程优化有所启发.网上有很多分布式矩阵乘法相关的文章和论文,但是鲜有对Spark中分布式矩阵乘法的分析.本文针对Spark中分布式矩阵乘法的实现进行必要的说明讨论. 分布式矩阵乘法原理: 矩阵乘法计算可以分为内积法和外积法.根据实现颗粒度的不同,也可以分为普通

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR

全面解析Apache Spark中的决策树

Apache Spark中的决策树 决策树是在顺序决策问题进行分类,预测和促进决策的有效方法.决策树由两部分组成: 决策(Desion) 结果(Outcome) 决策树包含三种类型的节点: 根节点(Root node):包含所有数据的树的顶层节点. 分割节点(Splitting node):将数据分配给子组(subgroup)的节点. 终端节点(Terminal node):最终决定(即结果). 分割节点(Splitting node),仅就离散数学中的树的概念而言,就是指分支节点. 为了抵达终

Hadoop与 Spark中的Shuffle之区别与联系

Hadoop与 Spark中的Shuffle之区别与联系 2018年08月22日 20:24:46 小爷欣欣 阅读数:175 转自:http://mini.eastday.com/mobile/180114141035935.html mapreduce过程解析(mapreduce采用的是sort-based shuffle),将获取到的数据分片partition进行解析,获得k/v对,之后交由map()进行处理.map函数处理完成之后,进入collect阶段,对处理后的k/v对进行收集,存储在

第37课:Spark中Shuffle详解及作业

1.什么是Spark的Shuffle 图1 Spark有很多算子,比如:groupByKey.join等等都会产生shuffle. 产生shuffle的时候,首先会产生Stage划分. 上一个Stage会把 计算结果放在LocalSystemFile中,并汇报给Driver: 下一个Stage的运行由Driver触发,Executor向Driver请求,把上一个Stage的计算结果抓取过来. 2.Hadoop的Shuffle过程 图2 该图表达了Hadoop的map和reduce两个阶段,通过S

Hadoop学习笔记—10.Reduce阶段中的Shuffle过程

一.回顾Reduce阶段三大步凑 在第四篇博文<初识MapReduce>中,我们认识了MapReduce的八大步凑,其中在Reduce阶段总共三个步凑,如下图所示: 其中,Step2.1就是一个Shuffle操作,它针对多个map任务的输出按照不同的分区(Partition)通过网络复制到不同的reduce任务节点上,这个过程就称作为Shuffle. PS:Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程,这一段应该是Hadoop中最核心的部分,因为涉及到Had

spark中的广播变量broadcast

Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPartitions(iter => { broadcastValues.getValue.foreach(println) }) 在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进