Mapper
// map的数量与数的分片有关系 public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = StringUtils.split(line, " "); for (String word : words) { context.write(new Text(word), new LongWritable(1)); } } }
Reducer
/**
 * Word-count reducer.
 *
 * <p>Adds up all counts received for a word and emits {@code (word, total)}.
 */
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Sums the counts for a single word and writes the total.
     *
     * @param key     the word
     * @param values  the counts emitted for this word
     * @param context sink for the {@code (word, total)} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            // Add the carried count instead of counting elements, so the total
            // stays correct when the incoming values are partial sums rather
            // than plain 1s (e.g. if this class is also used as a combiner).
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
Runner
public class WCRunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WCRunner.class); job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 设置reduce的数量,对应的会生成设置数量的文件,每个文件的内容是根据 // job.setPartitionerClass(HashPartitioner.class);中的Partitioner确定 job.setNumReduceTasks(10); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
public class WCRunner2 extends Configured implements Tool{ public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WCRunner2.class); job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { ToolRunner.run(new WCRunner2(), args); } }
执行: hadoop jar wc.jar com.easytrack.hadoop.mr.WCRunner2 /wordcount.txt /wc/output4
时间: 2024-10-29 19:07:15