MapReduce processing is divided into two phases: a map phase and a reduce phase. To count the number of occurrences of every word in a given file, the map phase splits each input line into words on commas and emits each word with an initial count of 1. Between the two phases, Hadoop's shuffle-and-sort step automatically groups together all the counts emitted for the same word.
The reduce phase then sums the counts for each word and writes the totals back out.
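For example, given an input file containing the two lines "hello,world" and "hello,hadoop", the data flows like this:

map output:     (hello,1) (world,1) (hello,1) (hadoop,1)
after shuffle:  (hadoop,[1]) (hello,[1,1]) (world,[1])
reduce output:  (hadoop,1) (hello,2) (world,1)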
The code is as follows:
package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // key2 holds the current word; value2 is the constant count 1 emitted for each occurrence.
    final Text key2 = new Text();
    final IntWritable value2 = new IntWritable(1);

    // key is the byte offset of the line within the file; value is the line itself.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        final String[] splited = value.toString().split(",");
        for (String word : splited) {
            key2.set(word);
            // Emit (word, 1) into the context for every occurrence.
            context.write(key2, value2);
        }
    }
}
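As a quick sanity check, the mapper can be exercised in isolation. The sketch below uses the (now-retired) Apache MRUnit library; the test class name, the MRUnit dependency, and the sample input line are assumptions for illustration, not part of the original project:

package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

// Hypothetical unit test: feed one comma-separated line and expect
// one (word, 1) pair per token, in order.
public class MyMapperTest {
    @Test
    public void emitsOnePairPerWord() throws Exception {
        MapDriver.newMapDriver(new MyMapper())
                .withInput(new LongWritable(0), new Text("hello,world,hello"))
                .withOutput(new Text("hello"), new IntWritable(1))
                .withOutput(new Text("world"), new IntWritable(1))
                .withOutput(new Text("hello"), new IntWritable(1))
                .runTest();
    }
}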
package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // value3 holds the total number of times a word occurred.
    final IntWritable value3 = new IntWritable(0);

    /**
     * key     the word
     * values  the collection of 1s emitted by the map method for this word
     * context the job context object
     */
    @Override
    protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // At this point, sum is the total number of occurrences of the word.
        // The word itself is the final output key; its total count is the final output value.
        value3.set(sum);
        context.write(key, value3);
    }
}
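Because this reduce function is a plain sum, which is associative and commutative, the same class can also serve as a combiner that pre-aggregates counts on the map side, shrinking the data sent over the network during the shuffle. One extra line in the driver below would enable it; treat this as an optional sketch, since how much it saves depends on the data:

// Optional addition to the driver: pre-aggregate (word, 1) pairs on the map side.
// Safe for word count because integer addition is associative and commutative.
job.setCombinerClass(MyReducer.class);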
package com.clq.hadoop2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapperReducer {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Input and output paths on HDFS.
        final String INPUT_PATH = "hdfs://ubuntu:9000/Input";
        final String OUTPUT_PATH = "hdfs://ubuntu:9000/output";

        // Create a Job object encapsulating everything needed at run time.
        final Job job = Job.getInstance(new Configuration(), "MapperReducer");
        // Required when the job is submitted as a jar.
        job.setJarByClass(MapperReducer.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        // Custom mapper class and its output key/value types.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Custom reducer class and the job's final output key/value types.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Plain-text input and output.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
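Assuming the three classes are packaged into a jar (the name wordcount.jar below is illustrative) and HDFS is reachable at ubuntu:9000 as in the paths above, the job could be submitted and its result inspected roughly like this:

# The output directory must not already exist, or the job will fail fast.
hadoop fs -rm -r hdfs://ubuntu:9000/output
hadoop jar wordcount.jar com.clq.hadoop2.MapperReducer
# Each reducer writes one part file; with a single reducer the result is part-r-00000.
hadoop fs -cat hdfs://ubuntu:9000/output/part-r-00000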