组成部分
Shuffle阶段分为两部分:Map端和Reduce端。
Sort阶段就是对Map端输出的key进行排序。
第一部分:Map端Shuffle
对于输入文件,会进行分片,对于一个split,有一个map任务进行处理,每个Map在内存中都有一个缓存区,map的输出结果会先放到这个缓冲区中,在缓冲区中,会进行预排序(即sort和comibner),以提高效率。
缓冲区默认大小是100MB(可以通过io.sort.mb属性更改大小),当缓冲区中的数据达到特定的阈值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent默认是0.80)时,系统会启动一个后台线程把缓冲区的内容spill(溢写)到磁盘。溢出到磁盘的一个临时文件中,即80%的内容成为一个临时文件。当这80%的内容溢出时,map会继续向剩余的20%缓冲区中输出。
spill线程在把缓冲区中的数据写到磁盘前,会进行一个二次快速排序,首先根据数据所属的Partition排序,然后每个Partition中再按Key排序。输出包括一个索引文件和数据文件。如果设定了Combiner,将在排序输出的基础上进行。
Comibner就是一个Mini Reducer,在执行Map任务的节点本身运行,对Map的输出做一次简单Reduce,使得Map‘de输出更紧凑,更少的数据会被写入磁盘和传送到Reduce端。
一个Map任务会产生多个spill文件,在Map任务完成前,所有的spill文件将会归并排序为一个索引文件和数据文件。当spill文件归并完成后,Map将删除所有的临时文件,并告知TaskTracker任务已完成。
对写入到磁盘的数据可以选择采取压缩的方式,如果需要压缩,则需要设置mapred.compress.map.output为true。
还有一个Partition的概念,一个临时文件是进行了分区的,并且分区的数量由reduce的数量决定,不同的分区传给不同的reduce。
第二部分:Reduce端Shuffle
Reduce端通过HTTP获取Map端的数据,只要有一个map任务完成,Reduce任务就开始复制它的输出,这称为copy阶段。
JobTracker知道Map输出与TaskTracker的映射关系,Reduce端有一个线程间歇地向JobTracker询问Map输出的地址,直到把所有的数据都获取到。
如果map输出比较小,他们被复制到Reduce的内存中,如果缓冲区空间不足,会被复制到磁盘上。复制的数据放在磁盘上,后台线程会进行归并为更大的排序文件,对于压缩文件,系统会自动解压到内存方便归并。
当所有的Map输出被复制后,Reduce任务进入排序阶段(确切的说是归并阶段),这个过程会重复多次。Merge有三种形式:内存到内存,内存到磁盘,磁盘到磁盘。
内存到内存默认不启用;内存到磁盘的方式也会产生溢写,如果设置了Combiner,此时也会启用,在磁盘上生成多个溢写文件;磁盘到磁盘会生成一个最终的文件作为Reduce的输入。