对于MapReduce的输入输出Hadoop的官网如下所示
Input and Output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
这里将从源码分析 input <k1,v1>->map 的过程,
- Mapper 基类定义
Mapper基类如下我们要重写其map函数,其输入为K1,V1, 输出K2,V2,
org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { .... protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } ....}
- 分析例子WordCount
官网例子默认是以LocalJobRunner 运行的,也就是所有mapper,reduce,都在我们启动的client进程里面执行,所以比较容易使用eclipse设置断点调试跟踪
1. 从我们的Map代码设置断点
跟踪调用栈,我们可以看到我们的map是其父类org.apache.hadoop.mapreduce.Mapper.run(Context)调用的, 这个函数不停的从context中读取K,V数据传入到我们定义的map函数处理。
public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
2. Mapper中contex的传入
在上一层调用org.apache.hadoop.mapred.MapTask.runNewMapper传入,代码如下,可以看到这个由 InputFormatClass, InputSplit 等信息构成
org.apache.hadoop.mapred.MapTask.runNewMapper(JobConf, TaskSplitIndex, TaskUmbilicalProtocol, TaskReporter){ org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); ... split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); ... org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext); .... try { input.initialize(split, mapperContext); mapper.run(mapperContext); ...}
跟踪mapperContext是在runNewMapper 中构造,其类型如下
org.apache.hadoop.mapreduce.lib.map.WrappedMapper.Context<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
对于context.nextKeyValue()的调用关系如下
-->mapContext.nextKeyValue()
//org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue()
-->reader.nextKeyValue()
//org.apache.hadoop.mapred.MapTask.NewTrackingRecordReader.nextKeyValue()
-->real.nextKeyValue();
最后这里的real是由 NewTrackingRecordReader 构造时传入的inputFormat 和split 得到,
NewTrackingRecordReader(...)... { .... this.real = inputFormat.createRecordReader(split, taskContext); ... }
inputFormat: 由于我们没有设置inputformat class 的类型,taskContext.getInputFormatClass() 会默认返回TextInputFormat.
TextInputFromat的createRecordReader(...)会返回一个org.apache.hadoop.mapreduce.lib.input.LineRecordReader 对象, 这个类的nextKeyValue()从所指定的Split中逐行读取。
这里总结一下
Mapper 是由JobConf 设置的InputFormatClass 获取的RecordReader 在split上读取K,V数据,如下图所示
下面我们将分析split是如何获取到的
3. spilt 是函数org.apache.hadoop.mapred.MapTask.runNewMapper函数入参splitIndex的消息构造而来,其就是MapTask的成员变量splitMetaInfo
我们在MapTask上设置断点,MapTask由如下函数org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable.run()构造,并且传入info.getSplitIndex()作为入参
public void run() { .... MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId, info.getSplitIndex(), 1); ....}
4. info 是由MapTaskRunnable的成员变量,其由org.apache.hadoop.mapred.LocalJobRunner.Job.run() 中读取systemJobDir得到的。
org.apache.hadoop.mapred.LocalJobRunner.Job.getMapTaskRunnables(TaskSplitMetaInfo[], JobID, Map<TaskAttemptID, MapOutputFile>)
可以看出其MapTask的数目等同taskInfo的数目(也就是InputSpilt的数目)
protected List<RunnableWithThrowable> getMapTaskRunnables( TaskSplitMetaInfo [] taskInfo, JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) { .... for (TaskSplitMetaInfo task : taskInfo) { list.add(new MapTaskRunnable(task, numTasks++, jobId, mapOutputFiles)); } .....
}
可以找到其通过systemJobDir 中的内容获取的
在org.apache.hadoop.mapred.LocalJobRunner.Job.run() 中代码如下
public void run() { ... TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); ... List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables( taskSplitMetaInfos, jobId, mapOutputFiles); .... }
到这里总结一下:
1. 每一个maptask 是一个org.apache.hadoop.mapred.MapTask对象(这里的例子由org.apache.hadoop.mapred.LocalJobRunner.Job.MapTaskRunnable封装以时间在本地运行), 其数目等同split的数目
2. split 在上面的例子中是由org.apache.hadoop.mapred.LocalJobRunner.Job.run() 通过读取目录systemJobDir 获取。
如下图所示
5. 下面分析systemJobDir的由来
systemJobDir是LocalJobRunner.Job的成员变量,其由构造函数传入,在构造函数上设置断点可以找到其由函数调用如下
org.apache.hadoop.mapreduce.Job.submit() //也就是我们WordCount main中创建的job
-->org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job, Cluster)
-->org.apache.hadoop.mapred.LocalJobRunner.submitJob(JobID, String, Credentials)
-->new org.apache.hadoop.mapred.LocalJobRunner.Job.Job(LocalJobRunner, JobID, String)
能够找到org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job, Cluster)
创建submitJobDir并在其中写入了split的消息
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
….
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
….
int maps = writeSplits(job, submitJobDir);
…
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
…
}
在writeSplits 中调用了writeNewSplits, 并通过InputFormatClass.getSplits产生split信息,
这里默认的TextInputFormat将分析job中配置的输入目录文件,依据文件大小和blocksize产生split.
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
到此我们回溯到了原始数据,也就是我们的FileInputFormat.addInputPath(job, input); 设置的input 路径。
总结如下
1. InputFormatClass 将输入划分为多个Split,
2. InpuFromatClass提供一个RecordReader用于从Split中读取(K,V)Record。
3. MapTask 的数量通Split的个数是相同的,这一点决定了map的并发性(例子中默认的LocalJobRunner使用了只有1个线程的线程池执行这些MapTask,所以不具有并发性)