剖析Mapreduce作业运行机制:原理如下图:
原理图的解释的可以分为以下几个部分
1、客户端提交一个mapreduce的jar包给JobClient
2、JocClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)
3、JobClient将jar包写入到HDFS当中(path=hdfs上的地址(这个地址是有第二步的JobTracker返回的)+JobId)
将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门 为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个 作业启动多少个map任务等信息
4、开始提交任务(任务的描述信息:包括jobid,jar存放的位置,配置信息等等)
JobClient调用JobTracker的submitJob()方法提交任务
5、JobTracker进行初始化任务
JobTracker会把提交的作业放在一个内部队列中,交由作业调度器来进行调度,任务的初始化包括创建一个表示运行的作业的对象——封装任务和记录信息,以便跟踪 任务的状态和信息。
6、读取HDFS上要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个切片启动一个MapperTask任务
7、TaskTracker通过心跳机制领取任务(任务的描述信息)
map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的的TaskTracker上。同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。TaskTracker每隔一段时间会给 JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户
8、下载所需的jar,配置文件
9、TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
map函数端的执行过程:
a:每个输入分片会让一个map任务处理,默认情况下,以HDFS的一个快的大小为一个分片,map输出的结果暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort。spill。percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区的数据写入这个文件
b:在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据,这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分配到很少数据,其实分区就是对数据进行hash的过程,然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combiner操作,这样做的目的是让尽可能少的数据写入到磁盘。
c:当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并,合并的过程会不断的进行排序和Combiner操作,目的有两个:1、尽量减少每次写入磁盘的数据量,2、尽量减少下一次复制阶段网络传输的数据量,最后合并成一个已分区已排序的文件,为了减少网络传输的数据量,可以将数据进行雅俗,只要将mapred.compress.map.out设置为true就行了
d:将分区中的数据拷贝给相对应的reduce任务,有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。
reduce函数端执行过程
a:reduce会收到不同map任务传来的数据,并且每个map传来的数据都是有序的,如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.buffer.percent决定),则对数据合并后溢写到磁盘中。
b:伴随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间,其实不管在map端还是reduce端,MapReduce都是反复的执行排序,合并操作,
c:合并的过程中会产生许多的中间文件(写入磁盘了),但MaoReduce会让写入磁盘的数据尽可能的少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
10、将结果写入到HDFS当中