1.新建一个WCMapper继承Mapper
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//接收数据V1
String line = value.toString();
//切分数据
String[] wordsStrings = line.split(" ");
//循环
for (String w: wordsStrings) {
//出现一次,记一个一,输出
context.write(new Text(w), new LongWritable(1));
}
}
}
2.新建一个WCReducer继承Reducer
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
//接收数据
//Text k3 = k2;
//定义一个计算器
long counter = 0;
//循环v2s
for (LongWritable i : v2s)
{
counter += i.get();
}
//输出
context.write(key, new LongWritable(counter));
}
}
3.WordCount类实现Main方法
/*
* 1.分析具体的业力逻辑,确定输入输出数据样式
* 2.自定义一个类,这个类要继承import org.apache.hadoop.mapreduce.Mapper;
* 重写map方法,实现具体业务逻辑,将新的kv输出
* 3.自定义一个类,这个类要继承import org.apache.hadoop.mapreduce.Reducer;
* 重写reduce,实现具体业务逻辑
* 4.将自定义的mapper和reducer通过job对象组装起来
*/
public class WordCount {
public static void main(String[] args) throws Exception {
// 构建Job对象
Job job = Job.getInstance(new Configuration());
// 注意:main方法所在的类
job.setJarByClass(WordCount.class);
// 设置Mapper相关属性
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("/words.txt"));
// 设置Reducer相关属性
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path("/wcount619"));
// 提交任务
job.waitForCompletion(true);
}
}
4.打包为wc.jar,并上传到linux,并在Hadoop下运行
hadoop jar /root/wc.jar