一、先理解MapReduce作业组成
一个完整的MapReduce作业称作job,它包括三部分:
-
- 输入数据
- MapReduce程序
- 配置信息
Hadoop工作时会将job分成若干个task:map任务和reduce任务
有两类节点控制作业执行的过程:JobTracker和TaskTracker
-
- JobTracker:记录作业整体进度,对TaskTracker进行调度
- TaskTracker:执行task任务并向JobTracker汇报
二、大块数据先流入map
Hadoop会将输入数据划分成等长的数据块,成为数据分片。Hadoop会为每个分片构建一个map任务。并行的处理分片时间肯定会少于处理整个大数据块的时间,但由于各个节点性能及作业运行情况的不同,每个分片的处理时间可能不一样,因此,把数据分片切分的更细可以得到更好的负载均衡。
但另一方面,分片太小的话,管理分片和构建map任务的时间将会增多。因此,需要在hadoop分片大小和处理分片时间之间做一个权衡。对大多数作业来说,一个分片大小为64MB比较合适,其实,Hadoop的默认块大小也是64MB。
我们上面看到了hadoop的数据块大小与最佳分片大小相同,这样的话,数据分片就不容易跨数据块存储,因此,一个map任务的输入分片便可以直接读取本地数据块,这就避免了再从其它节点读取分片数据,从而节省了网络开销。
map的任务输出是写入到本地磁盘而非HDFS的。那么为什么呢?因为map任务输出的是中间结果,一旦map任务完成即会被删除,如果把它存入HDFS中并实现备份容错,未免有点大题小做。如果一个map任务失败,hadoop会再另一个节点重启map一个map任务。
三、数据从map流入reduce
而reduce任务并不具备数据本地化优势——单个reduce任务的输入通常来自所有mapper输出。一般排序过的map输出需要通过网络传输发送到运行reduce任务的节点,并在reduce端进行合并。reduce的输出通常需要存储到HDFS中以实现可靠存储。每个reduce输出HDFS块第一个复本会存储在本地节点,而其它复本则存储到其它节点,因此reduce输出也需要占用网络带宽。
如下图:一个reduce任务的MapReduce任务数据流
reduce任务的数量并非由输入数据大小决定,而是特别指定。如有多个reduce任务,则每个map任务都会对其输出进行分区(partition),因为每个reduce任务会建一个分区。相同键的记录都会被partition到同一个分区中。具体的分区方式由分区函数来控制,一般通过hash函数来分区。
我们把map任务和reduce任务之间的数据流称为shuffle,因为每个reduce任务的输入都来自多个map任务,因此,这个阶段比较复杂,而shuffle过程中的参数调整对job运行的总时间是有非常大的影响的,一般MapReduce的调优主要就是调整shuffle阶段的参数。
如下图:多个reduce任务的数据流
四、如何减少从map到reduce的数据量
集群上的可用带宽限制了MapReduce的作业数量,因为map的中间结果传递给reduce是要经过网络传输的,因此最重要的一点就是尽量减少map和reduce任务间传输的数据量。不过,Hadoop允许用户针对map任务的输出指定一个合并函数(combiner),用合并函数的输出作为reduce函数的输入,但需要注意,合并函数的运用不应该改变reduce函数的计算结果。
例如有两个map的输出分别是map1={0,20,10};map2={15,25},求最大值,我们可以对先每个map的数据的数据进行合并,合并完成之后再传输给reducer:
map1={0,20,10}->combiner->{20};
map2={15,25}->combiner->{25};
reducer->{25}
即 max(0,20,10,15,25)=max(max(0,20,10),max(15,25))=25
如下图:将combiner后的输出作为reducer的输入
但需要特别注意的是,并不是任何场景都是可以用combiner的,比如把上面的例子改成求平均值:
-
- combiner后的reducer的结果: avg(avg(0,20,10),avg(15,25))=avg(10,20)=15;
- 没有进行combiner的reducer结果: avg(0,20,10,15,25)=14;