我们主要来学习MapTask的内部实现。
整体执行流程
如上图示,MapTask的整个处理流程分五个阶段:
●read阶段:通过RecordReader从InputSplit分片中将数据解析成一个个key/value。
●map阶段:将由RecordReader解析出的key/value交给map()方法处理,并生成一个个新的key/value。
●collect阶段:将map()中新生成key/value由OutpCollector.collect()写入内存中的环形数据缓冲区。
●spill阶段:当环形缓冲区达到一定阀值后,会将数据写到本地磁盘上,生成一个spill文件。在写文件之前,会先将数据进行一次本地排序,必要的时候(按配置要求)还会对数据进行压缩。
●combine阶段:当所有数据处理完后,将所有的临时的spill文件进行一次合并,最终之生成一个数据文件。
接下来我们会对该流程中最重要的collect、spill和combine三个阶段进行更深入的学习。
Collect过程
前阶段的map中新生成key/value对后,会调用OutpCollector.collect(key,value),在该方法内部,先调用Partitioner.getPartition()获取该记录的分区号,然后将<key,value,partition>传给MapOutputBuffer.collect()作进一步的处理。
MapOutputBuffer内部使用了一个内部的环形的缓冲区来暂时保存用户的输出数据,当缓冲区使用率达到一定阀值后,由SpillThread线程将缓冲区中的数据spill到本地磁盘上,当所有的数据处理完毕后,对所有的文件进行合并,最终只生成一个文件。该数据缓冲区直接用想到MapTask的写效率。
环形缓冲区使得collect阶段和spill阶段可以并行处理。
MapOutputBuffer内部采用了两级索引结构,涉及三个环形的内存缓冲区,分别是kvoffsets、kvindices和kvbuffer,这个环形缓冲区的大小可以通过io.sot.mb来设置,默认大小是100MB,图示如下:
kvoffsets即偏移量索引数组,用于保存key/value在kvindices中的偏移量。一个key/value对在kvoffsets数组中占一个int的大小,而在kvindices数组中站3个int的大小(如上图示,包括分区号partition,key的起始位置和value的起始位置)。
当kvoffsets的使用率超过io.sort.spill.percent(默认为80%)后,便会触发SpillTread线程将数据spill到磁盘上。
kvindices即文职索引数组,用于保存实际的key/value在数据缓冲区kvbuffer中的起始位置。
kvbuffer即数据局缓冲区,用于实际保存key/value,默认情况下可使用io.sort.mb的95%,当该缓冲区使用率使用率超过io.sort.spill.percent后,便会触发SpillTread线程将数据spill到磁盘上。
Spill过程
在collect阶段的执行过程中,当内存中的环形数据缓冲区中的数据达到一定发之后,便会触发一次Spill操作,将部分数据spill到本地磁盘上。SpillThread线程实际上是kvbuffer缓冲区的消费者,主要代码如下:
Java代码
- spillLock.lock();
- while(true){
- spillDone.sinnal();
- while(kvstart == kvend){
- spillReady.await();
- }
- spillDone.unlock();
- //排序并将缓冲区kvbuffer中的数据spill到本地磁盘上
- sortAndSpill();
- spillLock.lock;
- //重置各个指针,为下一下spill做准备
- if(bufend < bufindex && bufindex < bufstart){
- bufvoid = kvbuffer.length;
- }
- vstart = vend;
- bufstart = bufend;
- }
- spillLock.unlock();
sortAndSpill()方法中的内部流程是这样的:
第一步,使用用快速排序算法对kvbuffer[bufstart,bufend)中的数据排序,先对partition分区号排序,然后再按照key排序,经过这两轮排序后,数据就会以分区为单位聚集在一起,且同一分区内的数据按key有序;
第二步,按分区大小由小到大依次将每个分区中的数据写入任务的工作目录下的临时文件中,如果用户设置了Combiner,则写入文件之前,会对每个分区中的数据做一次聚集操作,比如<key1,val1>和<key1,val2>合并成<key1,<val1,val2>>;
第三步,将分区数据的元信息写到内存索引数据结构SpillRecord中。分区的元数据信息包括临时文件中的偏移量、压缩前数据的大小和压缩后数据的大小。
Combine过程
当任务的所有数据都处理完后,MapTask会将该任务所有的临时文件年合并成一个大文件,同时生成相应的索引文件。在合并过程中,是以分区文单位进行合并的。
让每个Task最终生成一个文件,可以避免同时打开大量文件和对小文件产生随机读带来的开销。