hadoop核心逻辑shuffle代码分析-map端

首先要推荐一下:http://www.alidata.org/archives/1470

阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下。

不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本 shuffle的逻辑,整理成本文,为以后回顾所用。

首先用一张图展示下map的流程:

在上图中,我们假设此次mapreduce有多个mapper和2个reducer,p0 p1分别代表该数据应该分配到哪个reducer端。我将mapper的过程大致分为5个过程。

1.prepare Input。

Mapreduce程序都需要指定输入文件,输入的格式有很多种,最常见的是保存 在hdfs上的文本文件。在用户提交job到jobtrack(ResourceManager)前的job就会根据用户的输入文件计算出需要多少 mapper,多少reducer,mapper的输入InputSplit有多大,block块名称等。mapper在prepare input阶段只需要根据inputFormat类型创建对应的RecordReader打开对应的inputSplit分片即可。如果job配置了 combiner还需初始化combiner。代码见MapTask类run方法

2.mapper process

这里的mapper指用户使用或自己继承的mapper类,这也是所有初学mapreduce的同学首先看到的类。

[java] view plaincopy

  1. <span style="font-size:18px;">  /**
  2. * Called once for each key/value pair in the input split. Most applications
  3. * should override this, but the default is the identity function.
  4. */
  5. @SuppressWarnings("unchecked")
  6. protected void map(KEYIN key, VALUEIN value,
  7. Context context) throws IOException, InterruptedException {
  8. context.write((KEYOUT) key, (VALUEOUT) value);
  9. }
  10. </span>

可以看到mapper默认的map方法就是取出key,value并放到context对象中。context对象包装了一个内存中的buf,下面会介绍。

[java] view plaincopy

  1. <span style="font-size:18px;">public void run(Context context) throws IOException, InterruptedException {
  2. setup(context);
  3. while (context.nextKeyValue()) {
  4. map(context.getCurrentKey(), context.getCurrentValue(), context);
  5. }
  6. cleanup(context);
  7. }</span>

run方法就是mapper实际运行的过程:不停的从context的inputSplit对象中取出keyvalue对,通过map方法处理再保存到context包装的内存buf中。

3.buffer in memery
key value在写入context中后实际是写入MapOutputBuffer类中。在第一个阶段的初始化过程中,MapOutputBuffer类会根据配置文件初始化内存buffer,我们来看下都有哪些参数:

[java] view plaincopy

  1. <span style="font-size:18px;">partitions = job.getNumReduceTasks();
  2. rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
  3. //sanity checks
  4. final float spillper =
  5. job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  6. final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  7. indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
  8. INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  9. if (spillper > (float)1.0 || spillper <= (float)0.0) {
  10. throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
  11. "\": " + spillper);
  12. }
  13. if ((sortmb & 0x7FF) != sortmb) {
  14. throw new IOException(
  15. "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  16. }
  17. sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
  18. QuickSort.class, IndexedSorter.class), job);</span>

partition:mapper的数据需要分配到reduce端的个数,由用户的job指定,默认为1.

spillper:内存buf使用到此比例就会触发spill,将内存中的数据flush成一个文件。默认为0.8

sortmb:内存buf的大小,默认100MB

indexCacheMemoryLimit:内存index的大小。默认为1024*1024

sorter:对mapper输出的key的排序,默认是快排

内存buffer比较复杂,贴一张图介绍一下这块内存buf的结构:

当一对keyvalue写入时首先会从wrap
buf的右侧开始往左写,同时,会把一条keyvalue的meta信息(partition,keystart,valuestart)写入到最左边的
index区域。当wrap
buf大小达到spill的触发比例后会block写入,挖出一部分数据开始spill,直到spill完成后才能继续写,不过写入位置不会置零,而是类
似循环buf那样,在spill掉数据后可以重复利用内存中的buf区域。

这里单独讲一下partition:

[java] view plaincopy

  1. <span style="font-size:18px;">@Override
  2. public void write(K key, V value) throws IOException, InterruptedException {
  3. collector.collect(key, value,
  4. partitioner.getPartition(key, value, partitions));
  5. }</span>

在keyvalue对写入MapOutputBuffer时会调用
partitioner.getPartition方法计算partition即应该分配到哪个reducer,这里的partition只是在内存的
buf的index区写入一条记录而已,和下一个部分的partition不一样哦。看下默认的partitioner:HashPartition

[java] view plaincopy

  1. <span style="font-size:18px;">/** Use {@link Object#hashCode()} to partition. */
  2. public int getPartition(K key, V value,
  3. int numReduceTasks) {
  4. return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  5. }</span>

HashPartition只是把key hash后按reduceTask的个数取模,因此一般来说,不同的key分配到哪个reducer是随即的!所以,reducer内的所有数据是有序的,但reducer之间的数据却是乱序的!要想数据整体排序,要不只设一个reducer,要不使用TotalOrderPartitioner!

4.Partition Sort Store

在第四步中,partition是和sort一起做的,负责Spill的线程在拿到一段内存buf后会调用QuickSort的sort方法进行内存中的快排。

[java] view plaincopy

  1. <span style="font-size:18px;">sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);</span>

排序的算法是先按keyvalue记录的partition排序后按key的compare方法:

[java] view plaincopy

  1. <span style="font-size:18px;">public int compare(final int mi, final int mj) {
  2. final int kvi = offsetFor(mi % maxRec);
  3. final int kvj = offsetFor(mj % maxRec);
  4. final int kvip = kvmeta.get(kvi + PARTITION);
  5. final int kvjp = kvmeta.get(kvj + PARTITION);
  6. // sort by partition
  7. if (kvip != kvjp) {
  8. return kvip - kvjp;
  9. }
  10. // sort by key
  11. return comparator.compare(kvbuffer,
  12. kvmeta.get(kvi + KEYSTART),
  13. kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
  14. kvbuffer,
  15. kvmeta.get(kvj + KEYSTART),
  16. kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
  17. }</span>

因此,mapper输出的keyvalue首先是按partition聚合。而我们如果指定key的compare方法会在这里生效并进行排序。最后,一次spill的输出文件类似下图。

在对内存中的buf排序后开始写文件。

[java] view plaincopy

  1. <span style="font-size:18px;">for (int i = 0; i < partitions; ++i) {
  2. IFile.Writer<K, V> writer = null;
  3. try {
  4. long segmentStart = out.getPos();
  5. writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
  6. spilledRecordsCounter);
  7. if (combinerRunner == null) {
  8. // spill directly
  9. DataInputBuffer key = new DataInputBuffer();
  10. while (spindex < mend &&
  11. kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
  12. final int kvoff = offsetFor(spindex % maxRec);
  13. key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
  14. (kvmeta.get(kvoff + VALSTART) -
  15. kvmeta.get(kvoff + KEYSTART)));
  16. getVBytesForOffset(kvoff, value);
  17. writer.append(key, value);
  18. ++spindex;
  19. }
  20. } else {
  21. int spstart = spindex;
  22. while (spindex < mend &&
  23. kvmeta.get(offsetFor(spindex % maxRec)
  24. + PARTITION) == i) {
  25. ++spindex;
  26. }
  27. // Note: we would like to avoid the combiner if we‘ve fewer
  28. // than some threshold of records for a partition
  29. if (spstart != spindex) {
  30. combineCollector.setWriter(writer);
  31. RawKeyValueIterator kvIter =
  32. new MRResultIterator(spstart, spindex);
  33. combinerRunner.combine(kvIter, combineCollector);
  34. }
  35. }</span>

如果job没有定义combiner则直接写文件,如果有combiner则在这里进行combine。
在生成spill文件后还会将此次spillRecord的记录写在一个index文件中。

[java] view plaincopy

  1. <span style="font-size:18px;">Path indexFilename =
  2. mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
  3. * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  4. spillRec.writeToFile(indexFilename, job);</span>

[java] view plaincopy

  1. <span style="font-size:18px;">rec.startOffset = segmentStart;
  2. rec.rawLength = writer.getRawLength();
  3. rec.partLength = writer.getCompressedLength();
  4. spillRec.putIndex(rec, i);</span>

5.merge

当mapper执行完毕后,就进入merge阶段。首先看下相关的配置参数:

[java] view plaincopy

  1. <span style="font-size:18px;">int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);</span>

mergeFactor:同时merge的文件数。

merge阶段的目的是将多个spill生成的中间文件合并为一个输出文件,这里
的合并不同于combiner,无论有没有配置combiner这里的merge都会执行。merge阶段的输出是一个数据文件
MapFinalOutputFile和一个index文件。看下相关代码:

[java] view plaincopy

  1. <span style="font-size:18px;">RawKeyValueIterator kvIter = Merger.merge(job, rfs,
  2. keyClass, valClass, codec,
  3. segmentList, mergeFactor,
  4. new Path(mapId.toString()),
  5. job.getOutputKeyComparator(), reporter, sortSegments,
  6. null, spilledRecordsCounter, sortPhase.phase());
  7. //write merged output to disk
  8. long segmentStart = finalOut.getPos();
  9. Writer<K, V> writer =
  10. new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
  11. spilledRecordsCounter);
  12. if (combinerRunner == null || numSpills < minSpillsForCombine) {
  13. Merger.writeFile(kvIter, writer, reporter, job);
  14. } else {
  15. combineCollector.setWriter(writer);
  16. combinerRunner.combine(kvIter, combineCollector);
  17. }</span>


下merge的算法。每个spill生成的文件中keyvalue都是有序的,但不同的文件却是乱序的,类似多个有序文件的多路归并算法。Merger分
别取出需要merge的spillfile的最小的keyvalue,放入一个内存堆中,每次从堆中取出一个最小的值,并把此值保存到merge的输出文
件中。这里和hbase中scan的算法非常相似,在分布式系统中多路归并排序真是当红小生啊!

这里merge时不同的partition的key是不会比较的,只有相同的partition的keyvalue才会进行排序和合并。最后的输出文件类似下图。

如果用户定义了combiner,在merge的过程中也会进行combine,
因为虽然第四步中combine过但那只是部分输入的combine,在merge时仍然需要combine。这里有人问了,既然这里有
combiner,为啥在spill输出时还要combine纳,我认为是因为每次combine都会大大减少输出文件的大小,spill时就
combine能减少一定的IO操作。

在merge完后会把不同partition的信息保存进一个index文件以便之后reducer来拉自己部分的数据。

[java] view plaincopy

  1. <span style="font-size:18px;">// record offsets
  2. rec.startOffset = segmentStart;
  3. rec.rawLength = writer.getRawLength();
  4. rec.partLength = writer.getCompressedLength();
  5. spillRec.putIndex(rec, parts);</span>

最后,我们再对mapper过程中的要点总结一下:

1.对map输出<key,value>的分区(partition)是在写入内存buf前就做好的了,方法是对key的hash。我们可以通过继承Partitioner类自己实现分区,将自己想要的数据分到同一个reducer中。

2.写入内存buf速度是非常快的,但spill过程会block写入。因此,对内存buf相关参数的调优是mapreduce调优的重点之一。

3.对数据的排序是基于MapOutKey排序的,因此,我们可以重载对应的方法实现customize的排序顺序

4.combine在spill和merge中都是进行。多次的combine会减少mapreduce中的IO操作,如果使用得当会很好的提高性能。但需要注意的是要深刻理解combine的意义,比如平均值就不适合用combine。

hadoop核心逻辑shuffle代码分析-map端

时间: 2024-07-29 17:51:16

hadoop核心逻辑shuffle代码分析-map端的相关文章

hadoop核心逻辑shuffle代码分析-map端 (转)

一直对书和各种介绍不太满意, 终于看到一篇比较好的了,迅速转载. 首先要推荐一下:http://www.alidata.org/archives/1470 阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下. 不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本 shu

Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载

原文地址:http://yanbohappy.sinaapp.com/?p=110 最新版本的Hadoop代码中已经默认了Protocol buffer(以下简称PB,http://code.google.com/p/protobuf/)作为RPC的默认实现,原来的WritableRpcEngine已经被淘汰了.来自cloudera的Aaron T. Myers在邮件中这样说的“since PB can provide support for evolving protocols in a co

Hadoop on Mac with IntelliJ IDEA - 10 陆喜恒. Hadoop实战(第2版)6.4.1(Shuffle和排序)Map端 内容整理

下午对着源码看陆喜恒. Hadoop实战(第2版)6.4.1  (Shuffle和排序)Map端,发现与Hadoop 1.2.1的源码有些出入.下面作个简单的记录,方便起见,引用自书本的语句都用斜体表示. 依书本,从MapTask.java开始.这个类有多个内部类: 从书的描述可知,collect()并不在MapTask类,而在MapOutputBuffer类,其函数功能是 1.定义输出内存缓冲区为环形结构2.定义输出内存缓冲区内容到磁盘的操作 在collect函数中将缓冲区的内容写出时会调用s

Hadoop2.4.1 MapReduce通过Map端shuffle(Combiner)完成数据去重

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWrita

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

hadoop map端join

map端的联结比reduce端的联结实现起来复杂,而且限制也多,一般我们将小表置于内存中, 对于大表的一个纪录我们在内存中查找即可. 改例子摘自hadoop基础教程, 我们实现sales和accounts的联结, 其中sales记录的顾客的销售信息,accounts纪录的是用户的账户信息,我们的目的是统计每个用户消费的次数和消费总额. 数据如下: sales.txt 002 12.29   2004-07-02 004 13.42   2005-12-20 003 499.99  2010-12

Hadoop 2.0 Yarn代码:心跳驱动服务分析

当RM(ResourcesManager)和NM(NodeManager)陆续将所有模块服务启动,最后启动是NodeStatusUpdater,NodeStatusUpdater将用Hadoop RPC远程调用ResourcesTrackerService中的函数,进行资源是初始化等操作,为将要运行的Job做好准备.以下主要分析在Job提交之前 RM与NM在心跳的驱动下操作. AD: hadoop-yarn-server-resourcemanager下的包 org.apache.hadoop.

微信公众号抢现金红包活动的核心代码分析

红包使用说明及规则,请仔细阅读 (1)必须是认证过的服务号,开通了微信支付功能:在商家后台充足够多的钱来发红包. (2)发送频率规则◆ 每分钟发送红包数量不得超过1800个:◆ 北京时间0:00-8:00不触发红包赠送:(如果以上规则不满足您的需求,请发邮件至[email protected]获取升级指引) (3)红包规则◆ 单个红包金额介于[1.00元,200.00元]之间:◆ 同一个红包只能发送给一个用户:(如果以上规则不满足您的需求,请发邮件至[email protected]获取升级指引