Mapreduce为了确保每个reducer的输入都按键排序。系统执行排序的过程-----将map的输出作为输入传给reducer 称为shuffle。学习shuffle是如何工作的有助于我们理解mapreduce工作机制。shuffle属于hadoop不断被优化和改进的代码库的一部分。从许多方面看,shuffle是mapreduce的“心脏”,是奇迹出现的地方。
下面这张图介绍了mapreduce里shuffle的工作原理:
从图可以看出shuffle发生在map端和reduce端之间,将map端的输出与reduce端的输入对应。
map 端
map函数开始产生输出时,并不是简单地将它输出到磁盘。这个过程更复杂,利用缓冲的方式写到内存,并出于效率的考虑进行预排序。shuffle原理图就看出来。
每个map任务都有一个环形内存缓冲区,用于存储任务的输出。默认情况是100MB,可以通过io.sort.mb属性调整。一旦缓冲内容达到阀值(io.sort.spill.percent,默认0.80,或者80%),一个后台线程开始把内容写到磁盘中。在写磁盘过程中,map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,map会阻塞直到写磁盘过程完成。在写磁盘之前,线程首先根据数据最终要传送到reducer把数据划分成相应的分区,在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。
reducer通过HTTP方式得到输出文件的分区。用于文件分区的工作线程的数量由任务的tracker.http.threads属性控制,此设置针对每个tasktracker,而不是针对每个map任务槽。默认值是40,在运行大型作业的大型集群上,此值可以根据需要调整。
reducer端
map端输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker需要为分区文件运行reduce任务。更进一步,reduce任务需要集群上若干个map任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。reduce任务有少量复制线程,所以能并行取得map输出。默认值是5个线程,可以通过设置mapred.reduce.parallel.copies属性改变。
在这个过程中我们由于要提到一个问题,reducer如何知道要从那个tasktracker取得map输出呢?
map任务成功完成之后,它们通知其父tasktracker状态已更新,然后tasktracker通知jobtracker。这些通知都是通过心跳机制传输的。因此,对于指定作业,jobtracker知道map输出和tasktracker之间的映射关系。reduce中的一个线程定期询问jobtracker以便获得map输出的位置,直到它获得所有输出位置。
由于reducer可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们。相反,tasktracker会等待,直到jobtracker告知它可以删除map输出,这是作业完成后执行的。
如果map输出相当小,则会被复制到reduce tasktracker的内存(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制),否则,map输出被复制到磁盘。一旦内存缓冲区达到阀值大小(由mapred.job.shuffle.merge.percent决定)或达到map输出阀值(mapred.inmem.merge.threshold控制),则合并后溢出写到磁盘中。
随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出都必须在内存中被解压缩。
复制完所有map输出被复制期间,reduce任务进入排序阶段(sort phase 更恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子是10 (10默认值设置,由io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。
在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将5个文件合并成一个已排序的文件作为最后一趟。最后的合并既可来自内存和磁盘片段。
在reduce阶段,对已排序输出中的每个键都要调用reduce函数。此阶段的输出直接写到输出文件系统中。