hadoop源码分析,map输出

Mapper  的输入官方文档如下

The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

mapper的输出是已经排序并且针对每个reducer划分开的,那么hadoop代码是如何划分的,这里将跟从代码分析。

还是根据官方示例WordCount的示例

第一次分析为了简化map的输出复杂情况,

只分析一个文档,并且其中只有10个‘单词‘,分别为“J", .."c", "b",  "a" ( 这里10个字母最好是乱序的,后面会看到其排序),

注释掉设置combine class的代码。

1. 单步跟踪map中的context.write(生产kvbuffer 和kvmeta)

可以追踪到最终实际是由org.apache.hadoop.mapred.MapTask.MapOutputBuffer.collect(K, V, int)

这里因为我们的output 只有10个Record 且每个大小都比较小,所以跳过了spill了处理以及combine处理,主要代码如下,

public synchronized void collect(K key, V value, final int partition  ) throws IOException {

{

...

keySerializer.serialize(key);

...

valSerializer.serialize(value);

....        kvmeta.put(kvindex + PARTITION, partition);        kvmeta.put(kvindex + KEYSTART, keystart);

kvmeta.put(kvindex + VALSTART, valstart);

kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));     ...

}

这里实际是将(K,V) 序列化到了byte数组org.apache.hadoop.mapred.MapTask.MapOutputBuffer.kvbuffer 中,

并将(K,V)在内存中的位置信息 以及 其partition(相同partition的record由同一个reducer处理) 消息 存在 kvmeta 中.

到此map的输出都存在了内存中

2. 通过查找kvmeta的代码索引, 找到消费kvbuffer和kvmeta代码,生产spillRecv到indexCacheList

可以找到在 org.apache.hadoop.mapred.MapTask.MapOutputBuffer.sortAndSpill() 中找到有使用,设置断点,看到如下,

private void sortAndSpill() throws IOException, ClassNotFoundException,    InterruptedException {     ...

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

     ...

     for (int i = 0; i < partitions; ++i) {

     ...

          if (combinerRunner == null) {

              // spill directly              DataInputBuffer key = new DataInputBuffer();

              while (spindex < mend &&

                  kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {

                ....

                 writer.append(key, value);

                ++spindex;

              }

           }      ...

          spillRec.putIndex(rec, i);

     }

     ...

indexCacheList.add(spillRec);

     ...}

这里有三个操作,

1. Sorter.sort :是以partition  和key  来排序的,目的是聚合相同partition的record, 并以key的顺序排列。

2. writer.append :  将序列化的record 写入输出流,这里写入到文件spill0.out

3. indexCacheList.add :  每个spillRec记录某个spill out文件中包含的partition信息。

3. 查找消费indexCacheList的代码,org.apache.hadoop.mapred.MapTask.MapOutputBuffer.mergeParts()

在此设置断点,可以看到这里我们只有一个spill文件,不需要merge,

这里只是唯一的spillRec 写入到到文件中, file.out.index

将spill0.out 重命名为file.out, 可以vim打开这个文件看到里面存在顺序号的字符。

    private void mergeParts() throws IOException, InterruptedException,     ClassNotFoundException {

...

sameVolRename(filename[0],

            mapOutputFile.getOutputFileForWriteInVolume(filename[0]));...

     indexCacheList.get(0).writeToFile(

            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);

...}

总结如下:

1. map的输出首先序列化到内存中kvbuffer,kvmeta

2. sortAndSpill 会将内存中的record写入到文件中

3. merge将spill出的文件merge问一个文件file.out,并将每个文件中partition的信息写入file.out.index

还没分析的情况:

map 输出大量数据,出现多个spill 文件的复杂情况的细节(1. 异步spill, 2. merge 多个文件)

时间: 2024-08-25 21:21:57

hadoop源码分析,map输出的相关文章

Hadoop源码分析—— Job任务的程序入口

这篇文章大致介绍Hadoop Job的程序是如何启动的. 通常用Java编写的Hadoop MapReduce程序是通过一个main方法作为程序的整个入口,如下: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res);} 可以看到这个Job任务的MapR

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

Hadoop源码分析(2)——Configuration类

这篇文章主要介绍Hadoop的系统配置类Configuration. 接着上一篇文章介绍,上一篇文章中Hadoop Job的main方法为: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res); } 其中ToolRunner.run方法传入的第一个变量

细水长流Hadoop源码分析(3)RPC Server初始化构造

声明:个人原创,转载请注明出处.文中引用了一些网上或书里的资料,如有不妥之处请告之. 本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分.Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路. 目录 4 RPC服务器(org.apache.hadoop,ipc.Server) 4.1 服务器初始化 4 RPC服务器(org.apache.hadoop,ipc.Serve

Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示 Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 这里将从源码分析 input <k1,v1>->map 的过程, Mapper 基

[hadoop]Hadoop源码分析-Context

学编程第一个肯定是hello world,Hadoop也不例外,它的hello world就是Wordcount,单词统计例子 1 package org.apache.hadoop.examples; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.P

hadoop源码分析(2):Map-Reduce的过程解析

一.客户端 Map-Reduce的过程首先是由客户端提交一个任务开始的. 提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的: public static RunningJob runJob(JobConf job) throws IOException { //首先生成一个JobClient对象 JobClient jc = new JobClient(job); …… //调用submitJob来提交一个任务 running = jc.submitJob(jo

[hadoop]Hadoop源码分析-Text

Text是Hadoop中的一个Writable类,定义了Hadoop中的其中的数据类型以及操作. This class stores text using standard UTF8 encoding. It provides methods to serialize, deserialize, and compare texts at byte level. The type of length is integer and is serialized using zero-compresse

Hadoop源码分析下载、最新最全资料分享

apache_hadoop源码,下载: http://archive.apache.org/dist/ Hadoop 工具下载: http://hadoop.apache.org/ Hadoop大数据最新最全资料下载地址: http://download.csdn.net/album/detail/3047