MapReduce 是一个计算模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于Key/Value pair 的数据集合,输出中间的基于Key/Value pair的数据集合,然后再创建一个Reduce 函数用来合并所有的具有相同中间Key值的中间Value值。其最主要的两个部分就是Map过程和Reduce过程。
一. Map 处理过程
1. Mapper 类的处理原理
Mapper 类的最主要的功能就是将输入的Key/Value pair 通过自己Map函数的处理,得到符合要求的Kay/Value pair。在前面分析Hadoop作业的输入的时候,MapReduce框架会为作业分割后的每一个Split分片都分配一个Mapper处理函数。
Mapper 的输入是通过InputSplit 的函数createRecordReader 创建的一个LineRecordReader 行记录读取器,该读取器针对输入分片Split 进行读取,每次读取一行,得到以偏移量为Key,行的内容为Value的Key/value pair 传给Mapper。
Mapper的输出Key/Value pair 是按照Key来分组的,使得具有相同Key的Key/Value 键值对放在一起,然后将他们分发给相同的Reducer。用户可以通过指定特定的RawComparator实现类来控制分组过程的执行。此外,用户也可以通过指定Partitioner实现类来控制Mapper的输出被分发给哪个具体的Reducer进行处理。
由于一般情况下,Mapper和Reducer都会分布在不同的主机上面,Mapper和Reducer之间的K-V 对的传递是依据http协议 通过网络传递的。为了减少数据在网络上传输量,一般会在Mapper端使用combiner实现类对数据进行处理。使用Reducer在Mapper端对Mapper的输出结果进行本地的聚集处理,从而减少了发送给Reducer的数据量。
2. Mapper类的源码分析
Mapper 类主要有五个方法:
protected void setup(Context context)//默认是空方法,在Mapper任务开始的时候会被调用一次。
protected void cleanup(Context context)//默认是空方法,在Mapper任务结束的时候会被调用一次。
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}//map函数,一般我们自定义的Mapper只会覆盖map函数来完成对Key/Value的处理
public void run(Context context) throws IOException, InterruptedException {
setup(context);//初始化
try {
while (context.nextKeyValue()) {//每次读取一行,得到相应的K-V 对
map(context.getCurrentKey(), context.getCurrentValue(), context);//对该K-V对进行map处理
}
} finally {
cleanup(context);//结束处理
}
}//该方法会在Hadoop运行mapper任务的时候自动被调用,我们可以覆写该方法得到一些更高级的操作
public Context()//一个Context类,该类只是继承了MapContext类,没有任何新方法。
Hadoop还为我们提供了若干个针对不同情况的Maooer实现子类,这里就不一一介绍了。
二. Reducer 处理过程
1. Reducer 概述
Reducer 的主要作用就是将Mapper 的输出结果中具有相同key的键值对进行进一步的reduce(规约),产生数量更少的键值对。Reducer 的数量可以通过Job 的setNumReduceTask 方法进行设置。Reducer 主要包括以下三个阶段:
a. Shuffle 阶段:利用Http 协议将所有Mapper的输出中与该Reducer 相关的数据复制到Reducer 主机上。
b. Sort 阶段:将来自不同的Mapper 的具有相同Key 的输出Key/Value 键值对按照Key进行排序
c. Reduce 阶段:为已经分好组的Key/list(Value) 调用一次Reduce方法,进行规约。
2. Reducer 源码分析
Reducer 的源码和Mapper 的源码差不多,功能也差不多。
protected void setup(Context context)//默认是空方法,在Reducer任务开始的时候会被调用一次。
protected void cleanup(Context context)//默认是空方法,在Reducer任务结束的时候会被调用一次。
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}//map函数,一般我们自定义的Reducer只会覆盖reduce函数来完成对Key/Value的处理
public void run(Context context) throws IOException, InterruptedException {
setup(context);//初始化
try {
while (context.nextKey()) {//每次读取一行,得到相应的K值
reduce(context.getCurrentKey(), context.getCurrentValue(), context);//对该K-V对进行reduce处理
}
} finally {
cleanup(context);//结束处理
}
}//该方法会在Hadoop运行Reducer任务的时候自动被调用,我们可以覆写该方法得到一些更高级的操作
public Context()//一个Context类,该类只是继承了ReduceContext类,没有任何新方法。
三. Partitioner 分区处理过程
Partitioner 分区处理过程在Mapper之后,Reducer之前执行。它主要的功能就是把Mapper输出的中间结果按照key分发给不同的Redcuer 任务进行处理。要保证Hadoop的负载均衡,Partitioner需要满足两个条件:平均分布和高效。
Partitioner 类是一个抽象类,其只有一个抽象函数:
public abstract int getPartition(KEY key, VALUE value, int numPartitions);//根据给定的键值对和分区的总数量(一般为Reduce任务的数量),返回该键值对所对应的分区号。
HashPartitioner 是Partitioner 的默认实现类,该类会使用Hash函数来对Mapper的输出进行分区处理:
public abstract int getPartition(KEY key, VALUE value, int numPartitions) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTask;
}