Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示

Input and Output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

这里将从源码分析 input <k1,v1>->map 的过程,

  • Mapper 基类定义

Mapper基类如下我们要重写其map函数,其输入为K1,V1, 输出K2,V2,

org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {  ....  protected void map(KEYIN key, VALUEIN value,                      Context context) throws IOException, InterruptedException {    context.write((KEYOUT) key, (VALUEOUT) value);  }  ....}
 

官网例子默认是以LocalJobRunner 运行的,也就是所有mapper,reduce,都在我们启动的client进程里面执行,所以比较容易使用eclipse设置断点调试跟踪

1.  从我们的Map代码设置断点

跟踪调用栈,我们可以看到我们的map是其父类org.apache.hadoop.mapreduce.Mapper.run(Context)调用的, 这个函数不停的从context中读取K,V数据传入到我们定义的map函数处理。

  public void run(Context context) throws IOException, InterruptedException {    setup(context);    try {      while (context.nextKeyValue()) {        map(context.getCurrentKey(), context.getCurrentValue(), context);      }    } finally {      cleanup(context);    }  }

2.  Mapper中contex的传入

在上一层调用org.apache.hadoop.mapred.MapTask.runNewMapper传入,代码如下,可以看到这个由 InputFormatClass, InputSplit 等信息构成

org.apache.hadoop.mapred.MapTask.runNewMapper(JobConf, TaskSplitIndex, TaskUmbilicalProtocol, TaskReporter){    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);    ...    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),        splitIndex.getStartOffset());    ...    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =      new NewTrackingRecordReader<INKEY,INVALUE>        (split, inputFormat, reporter, taskContext);    ....    try {      input.initialize(split, mapperContext);      mapper.run(mapperContext);      ...}

跟踪mapperContext是在runNewMapper 中构造,其类型如下

org.apache.hadoop.mapreduce.lib.map.WrappedMapper.Context<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

对于context.nextKeyValue()的调用关系如下

-->mapContext.nextKeyValue()

//org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue()

-->reader.nextKeyValue()

//org.apache.hadoop.mapred.MapTask.NewTrackingRecordReader.nextKeyValue()

-->real.nextKeyValue();

最后这里的real是由 NewTrackingRecordReader 构造时传入的inputFormat 和split 得到,

NewTrackingRecordReader(...)... {      ....      this.real = inputFormat.createRecordReader(split, taskContext);      ...    }

inputFormat: 由于我们没有设置inputformat class 的类型,taskContext.getInputFormatClass() 会默认返回TextInputFormat.

TextInputFromat的createRecordReader(...)会返回一个org.apache.hadoop.mapreduce.lib.input.LineRecordReader 对象, 这个类的nextKeyValue()从所指定的Split中逐行读取。

这里总结一下

Mapper 是由JobConf 设置的InputFormatClass 获取的RecordReader 在split上读取K,V数据,如下图所示

下面我们将分析split是如何获取到的

3. spilt 是函数org.apache.hadoop.mapred.MapTask.runNewMapper函数入参splitIndex的消息构造而来,其就是MapTask的成员变量splitMetaInfo

我们在MapTask上设置断点,MapTask由如下函数org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable.run()构造,并且传入info.getSplitIndex()作为入参

      public void run() {       ....          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,            info.getSplitIndex(), 1);       ....}

4. info 是由MapTaskRunnable的成员变量,其由org.apache.hadoop.mapred.LocalJobRunner.Job.run() 中读取systemJobDir得到的。

org.apache.hadoop.mapred.LocalJobRunner.Job.getMapTaskRunnables(TaskSplitMetaInfo[], JobID, Map<TaskAttemptID, MapOutputFile>)

可以看出其MapTask的数目等同taskInfo的数目(也就是InputSpilt的数目)

 protected List<RunnableWithThrowable> getMapTaskRunnables(        TaskSplitMetaInfo [] taskInfo, JobID jobId,        Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {     ....      for (TaskSplitMetaInfo task : taskInfo) {        list.add(new MapTaskRunnable(task, numTasks++, jobId,            mapOutputFiles));      }     .....

}

可以找到其通过systemJobDir 中的内容获取的

在org.apache.hadoop.mapred.LocalJobRunner.Job.run()  中代码如下

public void run() {      ...        TaskSplitMetaInfo[] taskSplitMetaInfos =           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

        ...                List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(            taskSplitMetaInfos, jobId, mapOutputFiles);                      ....    }
 
 

到这里总结一下:

1. 每一个maptask 是一个org.apache.hadoop.mapred.MapTask对象(这里的例子由org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable封装以时间在本地运行), 其数目等同split的数目

2. split 在上面的例子中是由org.apache.hadoop.mapred.LocalJobRunner.Job.run() 通过读取目录systemJobDir 获取。

如下图所示

 
5. 下面分析systemJobDir的由来
systemJobDir是LocalJobRunner.Job的成员变量,其由构造函数传入,在构造函数上设置断点可以找到其由函数调用如下
org.apache.hadoop.mapreduce.Job.submit() //也就是我们WordCount main中创建的job
    -->org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job, Cluster)
          -->org.apache.hadoop.mapred.LocalJobRunner.submitJob(JobID, String, Credentials)
             -->new org.apache.hadoop.mapred.LocalJobRunner.Job.Job(LocalJobRunner, JobID, String)
能够找到org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job, Cluster)
创建submitJobDir并在其中写入了split的消息
 

JobStatus submitJobInternal(Job job, Cluster cluster)
  throws ClassNotFoundException, InterruptedException, IOException {

….
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    ….
    int maps = writeSplits(job, submitJobDir);
    …

status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
     …

}

 
在writeSplits 中调用了writeNewSplits, 并通过InputFormatClass.getSplits产生split信息, 
这里默认的TextInputFormat将分析job中配置的输入目录文件,依据文件大小和blocksize产生split.

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

到此我们回溯到了原始数据,也就是我们的FileInputFormat.addInputPath(job, input); 设置的input 路径。

总结如下

1. InputFormatClass 将输入划分为多个Split,

2. InpuFromatClass提供一个RecordReader用于从Split中读取(K,V)Record。

3. MapTask 的数量通Split的个数是相同的,这一点决定了map的并发性(例子中默认的LocalJobRunner使用了只有1个线程的线程池执行这些MapTask,所以不具有并发性)

时间: 2024-11-03 03:47:26

Hadoop源码分析之Map输入的相关文章

hadoop源码分析,map输出

Mapper  的输入官方文档如下 The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementi

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

Hadoop源码分析(2)——Configuration类

这篇文章主要介绍Hadoop的系统配置类Configuration. 接着上一篇文章介绍,上一篇文章中Hadoop Job的main方法为: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res); } 其中ToolRunner.run方法传入的第一个变量

Hadoop源码分析—— Job任务的程序入口

这篇文章大致介绍Hadoop Job的程序是如何启动的. 通常用Java编写的Hadoop MapReduce程序是通过一个main方法作为程序的整个入口,如下: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res);} 可以看到这个Job任务的MapR

细水长流Hadoop源码分析(3)RPC Server初始化构造

声明:个人原创,转载请注明出处.文中引用了一些网上或书里的资料,如有不妥之处请告之. 本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分.Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路. 目录 4 RPC服务器(org.apache.hadoop,ipc.Server) 4.1 服务器初始化 4 RPC服务器(org.apache.hadoop,ipc.Serve

[hadoop]Hadoop源码分析-Context

学编程第一个肯定是hello world,Hadoop也不例外,它的hello world就是Wordcount,单词统计例子 1 package org.apache.hadoop.examples; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.P

[hadoop]Hadoop源码分析-Text

Text是Hadoop中的一个Writable类,定义了Hadoop中的其中的数据类型以及操作. This class stores text using standard UTF8 encoding. It provides methods to serialize, deserialize, and compare texts at byte level. The type of length is integer and is serialized using zero-compresse

【转】【java源码分析】Map中的hash算法分析

全网把Map中的hash()分析的最透彻的文章,别无二家. 2018年05月09日 09:08:08 阅读数:957 你知道HashMap中hash方法的具体实现吗?你知道HashTable.ConcurrentHashMap中hash方法的实现以及原因吗?你知道为什么要这么实现吗?你知道为什么JDK 7和JDK 8中hash方法实现的不同以及区别吗?如果你不能很好的回答这些问题,那么你需要好好看看这篇文章.文中涉及到大量代码和计算机底层原理知识.绝对的干货满满.整个互联网,把hash()分析的

Hadoop源码分析下载、最新最全资料分享

apache_hadoop源码,下载: http://archive.apache.org/dist/ Hadoop 工具下载: http://hadoop.apache.org/ Hadoop大数据最新最全资料下载地址: http://download.csdn.net/album/detail/3047