1、什么是Spark的Shuffle
图1
Spark有很多算子,比如:groupByKey、join等等都会产生shuffle。
产生shuffle的时候,首先会产生Stage划分。
上一个Stage会把
计算结果放在LocalSystemFile中,并汇报给Driver;
下一个Stage的运行由Driver触发,Executor向Driver请求,把上一个Stage的计算结果抓取过来。
2、Hadoop的Shuffle过程
图2
该图表达了Hadoop的map和reduce两个阶段,通过Shuffle怎样把map task的输出结果有效地传送到reduce端,描述着数据从map task输出到reduce task输入的这段过程。
map的计算为reduce产生不同的文件,在Hadoop集群环境中,大部分map task与reduce task的执行是在不同的节点上,reduce执行时需要跨节点去拉取其它节点上的map task结果,那么对集群内部的网络资源消耗会很严重。我们希望最大化地减少不必要的消耗, 于是对Shuffle过程的期望有:
- 完整地从map task端拉取数据到reduce 端。
- 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
- 减少磁盘IO对task执行的影响。
可优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。
map端的Shuffle细节:
整个map流程,简单些可以这样说:
1)input, 根据split输入数据,运行map任务;
2)patition, 每个map task都有一个内存缓冲区,存储着map的输出结果;
3)spill, 当缓冲区快满的时候需要将缓冲区的数据以临时文件的方式存放到磁盘;
4)merge, 当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
下面对map流程的细节进行说明:
1)输入数据:在Map Reduce中,map task只读取split,Split与block的对应关系可能是多对一,默认是一对一;
2)mapper运行后,通过Partitioner接口,根据key或value及reduce的数量来决定当前map的输出数据最终应该交由哪个reduce task处理。然后将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组;
3)内存缓冲区有大小限制,默认是100MB。需要在一定条件下将缓冲区中的数据临时写入磁盘,从内存往磁盘写数据的过程被称为Spill(溢写);
splill是由单独线程来完成,不影响往缓冲区写map结果的线程,splill的过程会涉及到Sort和Combiner,当splill线程启动后,需要对锁定内存块空间内的key做排序,是对序列化的字节做排序。 如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录,非正式地合并数据叫做combine了, Combiner会优化MapReduce的中间结果。
4)每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果很大,就会有多个溢写文件存在。当map task完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。
Merge是怎样的?比如WordCount示例中,某个单词“aaa”从某个map task读取过来时值是5,从另外一个map task 读取时值是8,因为它们有相同的key,所以就是像这样:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。
因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。
至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果获知TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。
下面讲解reduce 端的Shuffle细节:
reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。
1) Copy过程,简单地拉取数据。
Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
2)Merge阶段。
这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达阈值,就启动内存到磁盘的merge。
与map 端类似,这也是溢写的过程,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。
3)Reducer的输入文件。
不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,hadoop是把这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。
3、Hadoop的MapReduce Shuffle数据流动过程
图3
这张图非常有意思,形象地描述了整个数据流动的过程。
图上map阶段,有4个map;Reduce端,有3个reduce。
4个map 也就是4个JVM,每个JVM处理一个数据分片(split1~split4),每个map产生一个map输出文件,但是每个map都为后面的reduce产生了3部分数据(分别用红1、绿2、蓝3标识),也就是说每个输出的map文件都包含了3部分数据。正如前面第二节所述:
mapper运行后,通过Partitioner接口,根据key或value及reduce的数量来决定当前map的输出数据最终应该交由哪个reduce task处理
Reduce端一共有3个reduce,去前面的4个map的输出结果中抓取属于自己的数据。
在构建算法时,Shuffle是最重要的思考点。
4、Spark Shuffle
图4
该图描述了最简单的Spark 0.X版本的Spark Shuffle过程。
与Hadoop Map Reduce的区别在于输出文件个数的变化。
每个ShuffleMapTask产生与Ruducer个数相同的Shuffle blockFile文件,图中有3个reducer,那么每个ShuffleMapTask就产生3个Shuffle blockFile文件,4个ShuffleMapTask,那么一共产生12个Shuffle blockFile文件。
在内存中每个Shuffle blockFile文件都会存在一个句柄从而消耗一定内存,又因为物理内存的限制,就不能有很多并发,这样就限制了Spark集群的规模。
该图描绘的只是Spark 0.X版本而已,让人误以为Spark不支持大规模的集群计算,当时这只是Hash Based Shuffle。Spark后来做了改进,引入了Sort Based Shuffle之后,就再也没有人说Spark只支持小规模的集群运算了。
4.1 Hash based shuffle
Hash based shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M*R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。
Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销,于是Spark的Hash based shuffle设计的目标之一就是避免不需要的排序,
但是它在处理超大规模数据集的时候,产生了大量的磁盘IO和内存的消耗,很影响性能。
Hash based shuffle不断优化,Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。
4.2 Sort based shuffle
为了解决hash based shuffle性能差的问题,Spark 1.1 引入了Sort based shuffle,完全借鉴mapreduce实现,每个map产生一个文件,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;
相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。
避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。
而减少文件的数量可以避免同时写多个文件对系统带来的压力。
Sort based shuffle在速度和内存使用方面优于Hash based shuffle。
以上逻辑可以使用下图来描述: