shuffle机制
1:每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
2:写磁盘前,要partition,sort。如果有combiner,combine排序后数据。
3:等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
4:Reducer通过Http方式得到输出文件的分区。
5:TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
6:排序阶段合并map输出。然后走Reduce阶段。
TextInputFormat分片和读取分片数据
InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:
(1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split;
(2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用。
InputFormat有两个比较重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。这两个方法分别对应上面的两个功能。
InputSplit分片信息有两个特点:(1)是逻辑分片,只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,只记录一些元信息,比如起始位置、长度以及所在的节点列表等;(2)必须可序列化,分片信息要上传到HDFS文件,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。
RecordReader对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value,会作为MapTask的map方法的输入。