/*** * MapReduce Module * @author nele * */ public class ModuleMapReduce extends Configured implements Tool { // map class /** * * @author nele * TODO */ public static class ModuleMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO } } // reduce class /*** * * @author nele * TODO */ public static class ModuleReducer extends Reducer<LongWritable, Text, LongWritable, Text> { @Override //TODO public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // TODO } } // run method public int run(String[] args) throws Exception { Configuration conf = super.getConf(); // create job Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // set input path Path inPath = new Path(args[0]); FileInputFormat.addInputPath(job, inPath); // map //TODO job.setMapperClass(ModuleMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // conbile job.setCombinerClass(ModuleReducer.class); // reduce //TODO job.setReducerClass(ModuleReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // output Path outPath = new Path(args[1]); FileOutputFormat.setOutputPath(job, outPath); // submit return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { // args = new String[] { // "hdfs://bigdata5:8020/user/nele/data/input/wc.txt", // "hdfs://bigdata5:8020/user/nele/data/output/output3" }; Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new ModuleMapReduce(), args); System.exit(status); } }
时间: 2024-11-29 11:12:22