Map Task内部实现分析

转自:http://blog.csdn.net/androidlushangderen/article/details/41142795

上篇我刚刚学习完,Spilt的过程,还算比较简单的了,接下来学习的就是Map操作的过程了,Map和Reduce一样,是整个MapReduce的重要内容,所以,这一篇,我会好好的讲讲里面的内部实现过程。首先要说,MapTask,分为4种,可能这一点上有人就可能知道了,分别是Job-setup Task,Job-cleanup Task,Task-cleanup和Map Task。前面3个都是辅助性质的任务,不是本文分析的重点,我讲的就是里面的最最重要的MapTask。

MapTask的整个过程分为5个阶段:

Read----->Map------>Collect------->Spill------>Combine

来张时序图,简单明了:

在后面的代码分析中,你会看到各自方法的调用过程。

在分析整个过程之前,得先了解里面的一些内部结构,MapTask类作为Map Task的一个载体,他的类关系如下:

我们调用的就是里面的run方法,开启map任务,相应的代码:

[java] view plaincopyprint?

  1. /**
  2. * mapTask主要执行流程
  3. */
  4. @Override
  5. public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  6. throws IOException, ClassNotFoundException, InterruptedException {
  7. this.umbilical = umbilical;
  8. // start thread that will handle communication with parent
  9. //发送task任务报告,与父进程做交流
  10. TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
  11. jvmContext);
  12. reporter.startCommunicationThread();
  13. //判断用的是新的MapReduceAPI还是旧的API
  14. boolean useNewApi = job.getUseNewMapper();
  15. initialize(job, getJobID(), reporter, useNewApi);
  16. // check if it is a cleanupJobTask
  17. //map任务有4种,Job-setup Task, Job-cleanup Task, Task-cleanup Task和MapTask
  18. if (jobCleanup) {
  19. //这里执行的是Job-cleanup Task
  20. runJobCleanupTask(umbilical, reporter);
  21. return;
  22. }
  23. if (jobSetup) {
  24. //这里执行的是Job-setup Task
  25. runJobSetupTask(umbilical, reporter);
  26. return;
  27. }
  28. if (taskCleanup) {
  29. //这里执行的是Task-cleanup Task
  30. runTaskCleanupTask(umbilical, reporter);
  31. return;
  32. }
  33. //如果前面3个任务都不是,执行的就是最主要的MapTask,根据新老API调用不同的方法
  34. if (useNewApi) {
  35. runNewMapper(job, splitMetaInfo, umbilical, reporter);
  36. } else {
  37. //我们关注一下老的方法实现splitMetaInfo为Spilt分片的信息,由于上步骤的InputFormat过程传入的
  38. runOldMapper(job, splitMetaInfo, umbilical, reporter);
  39. }
  40. done(umbilical, reporter);
  41. }

在这里我研究的都是旧的API所以往runOldMapper里面跳。在这里我要插入一句,后面的执行都会围绕着一个叫Mapper的东西,就是用户执行map函数的一个代理称呼一样,他可以完全自己重写map的背后的过程,也可以用系统自带的mapp流程。

系统已经给了MapRunner的具体实现:

[java] view plaincopyprint?

  1. public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
  2. Reporter reporter)
  3. throws IOException {
  4. try {
  5. // allocate key & value instances that are re-used for all entries
  6. K1 key = input.createKey();
  7. V1 value = input.createValue();
  8. //从RecordReader中获取每个键值对,调用用户写的map函数
  9. while (input.next(key, value)) {
  10. // map pair to output
  11. //调用用户写的map函数
  12. mapper.map(key, value, output, reporter);
  13. if(incrProcCount) {
  14. reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
  15. SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
  16. }
  17. }
  18. } finally {
  19. //结束了关闭mapper
  20. mapper.close();
  21. }
  22. }

从这里我们可以看出Map的过程就是迭代式的重复的执行用户定义的Map函数操作。好了,有了这些前提,我们可以往里深入的学习了刚刚说到了runOldMapper方法,里面马上要进行的就是Map Task的第一个过程Read。

Read阶段的作业就是从RecordReader中读取出一个个key-value,准备给后面的map过程执行map函数操作。

[java] view plaincopyprint?

  1. //获取输入inputSplit信息
  2. InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
  3. splitIndex.getStartOffset());
  4. updateJobWithSplit(job, inputSplit);
  5. reporter.setInputSplit(inputSplit);
  6. //是否是跳过错误记录模式,获取RecordReader
  7. RecordReader<INKEY,INVALUE> in = isSkipping() ?
  8. new SkippingRecordReader<INKEY,INVALUE>(inputSplit, umbilical, reporter) :
  9. new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, reporter);

        后面的就是Map阶段,把值取出来之后,就要给Mapper去执行里面的run方法了,run方法里面会调用用户自己实现的map函数,之前也都是分析过了的。在用户编写的map的尾部,一般会调用collect.collect()方法,把处理后的key-value输出,这个时候,也就来到了collect阶段。

[java] view plaincopyprint?

  1. runner.run(in, new OldOutputCollector(collector, conf), reporter);

        之后进行的是Collect阶段主要的操作时什么呢,就是把一堆堆的key-value进行分区输出到环形缓冲区中,这是的数据仅仅放在内存中,还没有写到磁盘中。在collect这个过程中涉及的东西还比较多,看一下结构关系图;

里面有个partitioner的成员变量,专门用于获取key-value的的分区号,默认是通过key的哈希取模运算,得到分区号的,当然你可以自定义实现,如果不分区的话partition就是等于-1。

[java] view plaincopyprint?

  1. /**
  2. * Since the mapred and mapreduce Partitioners don‘t share a common interface
  3. * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
  4. * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
  5. * the configured partitioner should not be called. It‘s common for
  6. * partitioners to compute a result mod numReduces, which causes a div0 error
  7. */
  8. private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
  9. private final Partitioner<K,V> partitioner;
  10. private final MapOutputCollector<K,V> collector;
  11. private final int numPartitions;
  12. @SuppressWarnings("unchecked")
  13. OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
  14. numPartitions = conf.getNumReduceTasks();
  15. if (numPartitions > 0) {
  16. //如果分区数大于0,则反射获取系统配置方法,默认哈希去模,用户可以自己实现字节的分区方法
  17. //因为是RPC传来的,所以采用反射
  18. partitioner = (Partitioner<K,V>)
  19. ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
  20. } else {
  21. //如果分区数为0,说明不进行分区
  22. partitioner = new Partitioner<K,V>() {
  23. @Override
  24. public void configure(JobConf job) { }
  25. @Override
  26. public int getPartition(K key, V value, int numPartitions) {
  27. //分区号直接返回-1代表不分区处理
  28. return -1;
  29. }
  30. };
  31. }
  32. this.collector = collector;
  33. }
  34. .....


collect的代理调用实现方法如下,注意此时还不是真正调用:

[java] view plaincopyprint?

  1. .....
  2. @Override
  3. public void collect(K key, V value) throws IOException {
  4. try {
  5. //具体通过collect方法分区写入内存,调用partitioner.getPartition获取分区号
  6. //缓冲区为环形缓冲区
  7. collector.collect(key, value,
  8. partitioner.getPartition(key, value, numPartitions));
  9. } catch (InterruptedException ie) {
  10. Thread.currentThread().interrupt();
  11. throw new IOException("interrupt exception", ie);
  12. }
  13. }


这里的collector指的是上面代码中的MapOutputCollector对象,开放给用调用的是OldOutputCollector,但是我们看看代码:

[java] view plaincopyprint?

  1. interface MapOutputCollector<K, V> {
  2. public void collect(K key, V value, int partition
  3. ) throws IOException, InterruptedException;
  4. public void close() throws IOException, InterruptedException;
  5. public void flush() throws IOException, InterruptedException,
  6. ClassNotFoundException;
  7. }

他只是一个接口,真正的实现是谁呢?这个时候应该回头看一下代码:

[java] view plaincopyprint?

  1. private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  2. void runOldMapper(final JobConf job,
  3. final TaskSplitIndex splitIndex,
  4. final TaskUmbilicalProtocol umbilical,
  5. TaskReporter reporter
  6. ) throws IOException, InterruptedException,
  7. ClassNotFoundException {
  8. ...
  9. int numReduceTasks = conf.getNumReduceTasks();
  10. LOG.info("numReduceTasks: " + numReduceTasks);
  11. MapOutputCollector collector = null;
  12. if (numReduceTasks > 0) {
  13. //如果存在ReduceTask,则将数据存入MapOutputBuffer环形缓冲
  14. collector = new MapOutputBuffer(umbilical, job, reporter);
  15. } else {
  16. //如果没有ReduceTask任务的存在,直接写入把操作结果写入HDFS作为最终结果
  17. collector = new DirectMapOutputCollector(umbilical, job, reporter);
  18. }
  19. MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
  20. ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  21. try {
  22. runner.run(in, new OldOutputCollector(collector, conf), reporter);
  23. .....


分为2种情况当有Reduce任务时,collector为MapOutputBuffer,没有Reduce任务时为DirectMapOutputCollector,从这里也能明白,作者考虑的很周全呢,没有Reduce直接写入HDFS,效率会高很多。也就是说,最终的collect方法就是MapOutputBuffer的方法了。

因为collect的操作时将数据存入环形缓冲区,这意味着,用户对数据的读写都是在同个缓冲区上的,所以为了避免出现脏数据的现象,一定会做额外处理,这里作者用了和BlockingQueue类似的操作,用一个ReetrantLocj,获取2个锁控制条件,一个为spillDone

,一个为spillReady,同个condition的await,signal方法实现丢缓冲区的读写控制。

[java] view plaincopyprint?

  1. .....
  2. private final ReentrantLock spillLock = new ReentrantLock();
  3. private final Condition spillDone = spillLock.newCondition();
  4. private final Condition spillReady = spillLock.newCondition();
  5. .....


然后看collect的方法:

[java] view plaincopyprint?

  1. public synchronized void collect(K key, V value, int partition
  2. ) throws IOException {
  3. .....
  4. try {
  5. // serialize key bytes into buffer
  6. int keystart = bufindex;
  7. keySerializer.serialize(key);
  8. if (bufindex < keystart) {
  9. // wrapped the key; reset required
  10. bb.reset();
  11. keystart = 0;
  12. }
  13. // serialize value bytes into buffer
  14. final int valstart = bufindex;
  15. valSerializer.serialize(value);
  16. int valend = bb.markRecord();
  17. if (partition < 0 || partition >= partitions) {
  18. throw new IOException("Illegal partition for " + key + " (" +
  19. partition + ")");
  20. }
  21. ....

至于环形缓冲区的结构,不是本文的重点,结构设计还是比较复杂的,大家可以自行学习。当环形缓冲区内的数据渐渐地被填满之后,会出现"溢写"操作,就是把缓冲中的数据写到磁盘DISK中,这个过程就是后面的Spill阶段了。

Spill的阶段会时不时的穿插在collect的执行过程中。

[java] view plaincopyprint?

  1. ...
  2. if (kvstart == kvend && kvsoftlimit) {
  3. LOG.info("Spilling map output: record full = " + kvsoftlimit);
  4. startSpill();
  5. }


如果开头kvstart的位置等kvend的位置,说明转了一圈有到头了,数据已经满了的状态,开始spill溢写操作。

[java] view plaincopyprint?

  1. private synchronized void startSpill() {
  2. LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
  3. "; bufvoid = " + bufvoid);
  4. LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
  5. "; length = " + kvoffsets.length);
  6. kvend = kvindex;
  7. bufend = bufmark;
  8. spillReady.signal();
  9. }


会触发condition的信号量操作:

[java] view plaincopyprint?

  1. private synchronized void startSpill() {
  2. LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
  3. "; bufvoid = " + bufvoid);
  4. LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
  5. "; length = " + kvoffsets.length);
  6. kvend = kvindex;
  7. bufend = bufmark;
  8. spillReady.signal();
  9. }


就会跑到了SpillThead这个地方执行sortAndSpill方法:

[java] view plaincopyprint?

  1. spillThreadRunning = true;
  2. try {
  3. while (true) {
  4. spillDone.signal();
  5. while (kvstart == kvend) {
  6. spillReady.await();
  7. }
  8. try {
  9. spillLock.unlock();
  10. //当缓冲区溢出时,写到磁盘中
  11. sortAndSpill();


sortAndSpill里面会对数据做写入文件操作写入之前还会有sort排序操作,数据多了还会进行一定的combine合并操作。

[java] view plaincopyprint?

  1. private void sortAndSpill() throws IOException, ClassNotFoundException,
  2. InterruptedException {
  3. ......
  4. try {
  5. // create spill file
  6. final SpillRecord spillRec = new SpillRecord(partitions);
  7. final Path filename =
  8. mapOutputFile.getSpillFileForWrite(numSpills, size);
  9. out = rfs.create(filename);
  10. final int endPosition = (kvend > kvstart)
  11. ? kvend
  12. : kvoffsets.length + kvend;
  13. //在写入操作前进行排序操作
  14. sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  15. int spindex = kvstart;
  16. IndexRecord rec = new IndexRecord();
  17. InMemValBytes value = new InMemValBytes();
  18. for (int i = 0; i < partitions; ++i) {
  19. IFile.Writer<K, V> writer = null;
  20. try {
  21. long segmentStart = out.getPos();
  22. writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
  23. spilledRecordsCounter);
  24. if (combinerRunner == null) {
  25. // spill directly
  26. DataInputBuffer key = new DataInputBuffer();
  27. while (spindex < endPosition &&
  28. kvindices[kvoffsets[spindex % kvoffsets.length]
  29. + PARTITION] == i) {
  30. final int kvoff = kvoffsets[spindex % kvoffsets.length];
  31. getVBytesForOffset(kvoff, value);
  32. key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
  33. (kvindices[kvoff + VALSTART] -
  34. kvindices[kvoff + KEYSTART]));
  35. //writer中写入键值对操作
  36. writer.append(key, value);
  37. ++spindex;
  38. }
  39. } else {
  40. int spstart = spindex;
  41. while (spindex < endPosition &&
  42. kvindices[kvoffsets[spindex % kvoffsets.length]
  43. + PARTITION] == i) {
  44. ++spindex;
  45. }
  46. // Note: we would like to avoid the combiner if we‘ve fewer
  47. // than some threshold of records for a partition
  48. //如果分区多的话,执行合并操作
  49. if (spstart != spindex) {
  50. combineCollector.setWriter(writer);
  51. RawKeyValueIterator kvIter =
  52. new MRResultIterator(spstart, spindex);
  53. //执行一次文件合并combine操作
  54. combinerRunner.combine(kvIter, combineCollector);
  55. }
  56. }
  57. ......
  58. //写入到文件中
  59. spillRec.writeToFile(indexFilename, job);
  60. } else {
  61. indexCacheList.add(spillRec);
  62. totalIndexCacheMemory +=
  63. spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  64. }
  65. LOG.info("Finished spill " + numSpills);
  66. ++numSpills;
  67. } finally {
  68. if (out != null) out.close();
  69. }
  70. }


       每次Spill的过程都会产生一堆堆的文件,在最后的时候就会来到了Combine阶段,也就是Map任务的最后一个阶段了,他的任务就是把所有上一阶段的任务产生的文件进行Merge操作,合并成一个文件,便于后面的Reduce的任务的读取,在代码的对应实现中是collect.flush()方法。

[java] view plaincopyprint?

  1. .....
  2. try {
  3. runner.run(in, new OldOutputCollector(collector, conf), reporter);
  4. //将collector中的数据刷新到内存中去
  5. collector.flush();
  6. } finally {
  7. //close
  8. in.close();                               // close input
  9. collector.close();
  10. }
  11. }


这里的collector的flush方法调用的就是MapOutputBuffer.flush方法,

[java] view plaincopyprint?

  1. public synchronized void flush() throws IOException, ClassNotFoundException,
  2. InterruptedException {
  3. ...
  4. // shut down spill thread and wait for it to exit. Since the preceding
  5. // ensures that it is finished with its work (and sortAndSpill did not
  6. // throw), we elect to use an interrupt instead of setting a flag.
  7. // Spilling simultaneously from this thread while the spill thread
  8. // finishes its work might be both a useful way to extend this and also
  9. // sufficient motivation for the latter approach.
  10. try {
  11. spillThread.interrupt();
  12. spillThread.join();
  13. } catch (InterruptedException e) {
  14. throw (IOException)new IOException("Spill failed"
  15. ).initCause(e);
  16. }
  17. // release sort buffer before the merge
  18. kvbuffer = null;
  19. //最后进行merge合并成一个文件
  20. mergeParts();
  21. Path outputPath = mapOutputFile.getOutputFile();
  22. fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
  23. }


至此,Map任务宣告结束了,整体流程还是真是有点九曲十八弯的感觉。分析这么一个比较庞杂的过程,我一直在想如何更好的表达出我的想法,欢迎MapReduce的学习者,提出意见,共同学习

时间: 2024-10-05 16:10:28

Map Task内部实现分析的相关文章

Task运行过程分析3——Map Task内部实现

Map Task内部实现 在Task运行过程分析2中提到,MapTask分为4种,分别是Job-setup Task.Job-cleanup Task.Task-cleanup Task和Map Task.其中,Job-setup Task和Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录:而Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务.本文

Task运行过程分析4——Map Task内部实现2

在Task运行过程分析3--MapTask内部实现中,我们分析了MapTask的Collect阶段,并且解读了环形缓冲区使得MapTask的Collect阶段和Spill阶段可并行执行...接下来分析Spill阶段和Combine阶段... Spill过程分析 Spill过程由SpillThread线程完成,SpillThread线程实际上是缓冲区kvbuffer的消费者 protected class SpillThread extends Thread { @Override public

${mapred.local.dir}选择策略--Map Task存放中间结果

上篇说了block在DataNode配置有多个${dfs.data.dir}时的存储策略,本文主要介绍TaskTracker在配置有多个${mapred.local.dir}时的选择策略. 1 mapred-site.xml 2 <property> 3 <name>mapred.local.dir</name> 4 <value>/mnt/localdir1/local,/mnt/localdir2/local,/mnt/localdir3/local&l

hadoop输入分片计算(Map Task个数的确定)

作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split.默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit).这里要注意,split只是逻辑上的概念,并不对文件做实际的切分.一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等.每个split

hadoop 分片与分块,map task和reduce task的理解

分块:Block HDFS存储系统中,引入了文件系统的分块概念(block),块是存储的最小单位,HDFS定义其大小为64MB.与单磁盘文件系统相似,存储在 HDFS上的文件均存储为多个块,不同的是,如果某文件大小没有到达64MB,该文件也不会占据整个块空间.在分布式的HDFS集群上,Hadoop系统保证一个块存储在一个datanode上. 把File划分成Block,这个是物理上真真实实的进行了划分,数据文件上传到HDFS里的时候,需要划分成一块一块,每块的大小由hadoop-default.

Map.Entry&lt;K,V&gt;分析

一.好处 你是否已经对每次从Map中取得关键字然后再取得相应的值感觉厌倦? 1 Set keys = map.keySet( ); 2 if(keys != null) { 3 Iterator iterator = keys.iterator( ); 4 while(iterator.hasNext( )) { 5 Object key = iterator.next( ); 6 Object value = map.get(key); 7 } 8 } 二.用法 使用Map.Entry类,你可

Hadoop-2.4.1学习之Map任务源码分析(下)

在Map任务源码分析(上)中,对MAP阶段的代码进行了学习,这篇文章文章将学习Map任务的SORT阶段.如果Reducer的数量不为0,则还需要进行SORT阶段,但从上面的学习中并未发现与MAP阶段执行完毕调用mapPhase.complete()类似的在SORT阶段执行完毕调用sortPhase.complete()的源码,那SORT阶段是在什么时候启动的?对于Map任务来说,有输入就有输出,输入由RecordReader负责,输出则由RecordWriter负责,当Reducer的数量不为0

MapReduce作业的map task和reduce task调度参数

MapReduce作业可以细分为map task和reduce task,而MRAppMaster又将map task和reduce task分为四种状态: 1.pending:刚启动但尚未向resourcemanager发送资源请求: 2.scheduled:已经向resourceManager发送资源请求,但尚未分配到资源: 3.assigned:已经分配到了资源且正在运行: 4.completed:已经运行完成. map task的生命周期为:scheduled -> assigned -

Spark技术内幕:Shuffle Map Task运算结果的处理

Shuffle Map Task运算结果的处理 这个结果的处理,分为两部分,一个是在Executor端是如何直接处理Task的结果的:还有就是Driver端,如果在接到Task运行结束的消息时,如何对Shuffle Write的结果进行处理,从而在调度下游的Task时,下游的Task可以得到其需要的数据. Executor端的处理 在解析BasicShuffle Writer时,我们知道ShuffleMap Task在Executor上运行时,最终会调用org.apache.spark.sche