以Wordcount程序为例,简单描述MapReduce程序的编程模型。
MapReduce程序组成
MapReduce程序一般分成三个部分:
- 一个程序主引导部分;
- 一个Map程序部分;
- 一个Reduce部分。
主引导部分用来设置MapReduce(以下简称 “ MR” )程序的一些非业务逻辑属性,例如最终生成jar包时指定MR框架执行该程序的入口、MR程序的map和reduce部分是哪个(一个jar包中可能封装了多个程序)、map和reduce部分的输出参数类型、输入数据的来源(路径)和输出数据的路径等。
Map程序部分和Reduce程序部分就是业务逻辑部分,只要按照MR框架要求的格式编写。对于这两个部分,分别有map( )和reduce( )两个关键的函数,这两个函数要么是重写继承其父类Mapper和Reducer的成员函数,要么是实现其所实现接口内定义的具体函数。
map( )和reduce( )方法
Mapper类和Reducer类在继承时要分别提供4个参数,分别代表当前模块的输入、输出数据的类型。在实现map、reduce方法时会用到输入参数(key-value)的类型。而这里的类型并不是Java自身的数据类型,而是hadoop自己的一套数据标准。
在hadoop中,由于要考虑对象序列化,而Java自带的对象序列化机制添加了大量的冗余——少量数据为保证网络传输的正确,冗余的占比已经很多。对于hadoop中的海量数据而言,如果仍采用Java自带的序列化机制,添加的冗余数据的量更大,不利于集群之间频繁且大量的网络通信,因此hadoop有自己的一套序列化标准。
主引导方法
最后用一个job类指定本次作业的其余设置。job中指定:
1. getInstance()获得job实例。
2. 设置主类即main函数所在jar包的位置,找到这个主函数后,hadoop才能知道到相应的类的依赖关系(即要告诉classload我的jar包中每个程序的主类在哪,紧跟着才能根据主类找到相应的map和reduce类)。
3. 设置map和reduce方法所在的类。
4. 设置map和reduce方法的输出参数的类型。
5. 设置源数据的路径。
6. 设置输出数据的目录地址路径。
7. 设置job提交给集群,waitforcompletion(true),布尔值表示执行过程是否展示在屏幕上,还是在后台运行。
Wordcount程序示例
package mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Wordcount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
/*
*指定jar包中job的起始入口
*/
job.setJarByClass(Wordcount.class);
/*
* 设置map和reduce方法所在的类
*/
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
/*
* reduce 没有相应的setReduceOutputKey这样的方法。故应先用整体的对map和reduce设置相同的参数
* 再对map进行输出参数的特定指定
*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
/*
* 输入输出路径设置,当然也可以从main函数参数输入
*/
FileInputFormat.setInputPaths(job, new Path("/wordcount_Test/input"));
FileOutputFormat.setOutputPath(job, new Path("/wordcount_Test/output2"));
/*
* 设置是否在shell中显示执行过程
* 同时该方法会调用启动yarn资源管理框架的方法
*/
job.waitForCompletion(true);
}
}
____________________________________________________________________
package mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
public class WordcountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
/*
* 其中key是map程序所读文件的每读一行的文件偏移量,而非业务逻辑上的数据所需的对应key-value
* 而四个参数的后两个则描述处理后的标准key-value结果,经map程序处理后形成序列化的类型数据,输出到reduce模块。
* 程序执行时map方法根据数据的特点,可能执行多次
* 但无论执行多少次。都要等全部输入数据被map处理完,在整体发送给reduce模块
*/
protected void map(LongWritable value, Text key, Context context)
throws IOException, InterruptedException {
//得到文本中的一行
String string = value.toString();
//这里可以用Java自带的分割方法,推荐用Hadoop提供的工具方法
String[] words = StringUtils.split(string, ‘ ‘);
//遍历的方式提交,缓存起来
for(String word:words){
context.write(new Text(word), new LongWritable(1));
}
}
}
____________________________________________________________________
package mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
/*
* Reducer后面的四个泛型参数中的前两个必须要和Mapper后面最后两个泛型类型一致
*/
protected void reduce(Text key, Iterable<LongWritable> values,Context context)
throws IOException, InterruptedException {
long count = 0;
for(LongWritable value:values){
count = count + value.get();
}
//reduce的结果仍输出给mapreduce框架
context.write(key, new LongWritable(count));
}
}
原文地址:https://www.cnblogs.com/fusiji/p/11409925.html