三、MapReduce运行原理
1、Map过程简述:
1)读取数据文件内容,对每一行内容解析成<k1,v1>键值对,每个键值对调用一次map函数
2)编写映射函数处理逻辑,将输入的<k1,v1>转换成新的<k2,v2>
3)对输出的<k2,v2>按reducer个数和分区规则进行分区
4)不同的分区,按k2进行排序、分组,将相同的k2的value放到同一个集合中
5)(可选)将分组后的数据重新reduce归约
2、reduce处理过程:
1)对多个Map的输出,按不同分区通过网络将copy到不同的reduce节点
2)对多个map的输出进行排序,合并,编写reduce函数处理逻辑,将接收到的数据转化成<k3,v3>
3)将reduce节点输出的数据保存到HDFS上
说明:
1)Mapper Task 是逻辑切分。因为Maper记录的都是block的偏移量,是逻辑切分,但相对于内存中他确实是物理切分,因为每个Mapper都是记录的分片段之后的数据。
2)shuffle是物理切分。MapReduce的过程是俩过程需要用到Shuffle的,1个mapper的Shufflle,1个多个reduce的Shuffle,一般每个计算模型都要多次的reduce,所以要用到多次的Shuffle。.
MapReduce原理图
正常HDFS存储3份文件,Jar包默认写10份,NameNode通过心跳机制领取HDFS任务,运行完毕后JAR包会被删除。
Map端处理流程分析:
1) 每个输入分片会交给一个Map任务(是TaskTracker节点上运行的一个Java进程),默认情况下,系统会以HDFS的一个块大小作为一个分片(hadoop2默认128M,配置dfs.blocksize)。Map任务通过InputFormat将输入分片处理成可供Map处理的<k1,v1>键值对。
2) 通过自己的Map处理方法将<k1,v1>处理成<k2,v2>,输出结果会暂时放在一个环形内存缓冲(缓冲区默认大小100M,由mapreduce.task.io.sort.mb属性控制)中,当缓冲区快要溢出时(默认为缓冲区大小的80%,由mapreduce.map.sort.spill.percent属性控制),会在本地操作系统文件系统中创建一个溢出文件(由mapreduce.cluster.local.dir属性控制,默认${hadoop.tmp.dir}/mapred/local),保存缓冲区的数据。溢写默认控制为内存缓冲区的80%,是为了保证在溢写线程把缓冲区那80%的数据写到磁盘中的同时,Map任务还可以继续将结果输出到缓冲区剩余的20%内存中,从而提高任务执行效率。
3) 每次spill将内存数据溢写到磁盘时,线程会根据Reduce任务的数目以及一定的分区规则将数据进行分区,然后分区内再进行排序、分组,如果设置了Combiner,会执行规约操作。
4) 当map任务结束后,可能会存在多个溢写文件,这时候需要将他们合并,合并操作在每个分区内进行,先排序再分组,如果设置了Combiner并且spill文件大于mapreduce.map.combine.minspills值(默认值3)时,会触发Combine操作。每次分组会形成新的键值对<k2,{v2...}>。
5) 合并操作完成后,会形成map端的输出文件,等待reduce来拷贝。如果设置了压缩,则会将输出文件进行压缩,减少网络流量。是否进行压缩,mapreduce.output.fileoutputformat.compress,默认为false。设置压缩库,mapreduce.output.fileoutputformat.compress.codec,默认值org.apache.hadoop.io.compress.DefaultCodec。
Reduce端处理流程分析:
1) Reduce端会从AM那里获取已经执行完的map任务,然后以http的方法将map输出的对应数据拷贝至本地(拷贝最大线程数mapreduce.reduce.shuffle.parallelcopies,默认值5)。每次拷贝过来的数据都存于内存缓冲区中,当数据量大于缓冲区大小(由mapreduce.reduce.shuffle.input.buffer.percent控制,默认0.7)的一定比例(由mapreduce.reduce.shuffle.merge.percent控制,默认0.66)时,则将缓冲区的数据溢写到一个本地磁盘中。由于数据来自多个map的同一个分区,溢写时不需要再分区,但要进行排序和分组,如果设置了Combiner,还会执行Combine操作。溢写过程与map端溢写类似,输出写入可同时进行。
2) 当所有的map端输出该分区数据都已经拷贝完毕时,本地磁盘可能存在多个spill文件,需要将他们再次排序、分组合并,最后形成一个最终文件,作为Reduce任务的输入。此时标志Shuffle阶段结束,然后Reduce任务启动,将最终文件中的数据处理形成新的键值对<k3,v3>。
3) 将生成的数据<k3,v3>输出到HDFS文件中。
Map与Reduce执行过程图