MultipleOutputs:
MultipleOutputs writes data to multiple output files with customized names; it can be used in both the map phase and the reduce phase.
http://www.lichun.cc/blog/2013/11/how-to-use-hadoop-multipleoutputs/
public static class MyMap extends Mapper<LongWritable, Text, Text, DoubleWritable> { MultipleOutputs<Text, DoubleWritable> mos; public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { mos.write(map_out_file, NullWritable.get(), new Text(name)); } @Override public void setup(Context context) { mos = new MultipleOutputs<Text, DoubleWritable>(context); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); } }
Complete example:
package a5p2;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * MapReduce job that computes a per-student average in the map phase and the
 * average of those per-student averages in the reduce phase.
 *
 * <p>Each input line is tokenized on whitespace: the first token is the student
 * name and every following token is a numeric score. Results are written to
 * two named outputs through {@link MultipleOutputs}.
 */
public class ClassAvg2 {

    /** Named output that receives one "name average" line per student. */
    public static final String map_out_file = "mapOutFileIndividualStudentAverage";

    /** Named output that receives the "Class average: ..." line. */
    public static final String reduce_out_file = "reduceOutFileClassAverage";

    /**
     * Emits (name, average) for each input line and mirrors the same
     * information as text into the {@code map_out_file} named output.
     */
    public static class AvgMap extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        /** Writer for the named outputs; assigned in {@link #setup}. */
        MultipleOutputs<Text, DoubleWritable> mos;

        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        /**
         * Parses one line and emits the student's average score.
         *
         * @param inKey   input key supplied by the input format (unused)
         * @param inValue one line of text: a name followed by scores
         * @param context task context used to emit the (name, average) pair
         * @throws NumberFormatException if a score token is not a valid number
         */
        @Override
        public void map(LongWritable inKey, Text inValue, Context context)
                throws IOException, InterruptedException {
            StringTokenizer myToken = new StringTokenizer(inValue.toString());
            if (!myToken.hasMoreTokens()) {
                // Blank line: there is no name token to read, so skip it.
                return;
            }
            String name = myToken.nextToken();

            int cnt = 0;
            double sum = 0;
            while (myToken.hasMoreTokens()) {
                // Parse as double; going through float would add rounding
                // error before the value is widened into the double sum.
                sum += Double.parseDouble(myToken.nextToken());
                cnt++;
            }
            if (cnt == 0) {
                // A name without scores would produce 0.0 / 0 = NaN, which
                // would then turn the class average into NaN as well.
                return;
            }

            double avg = sum / cnt;
            context.write(new Text(name), new DoubleWritable(avg));
            mos.write(map_out_file, NullWritable.get(), new Text(name + " " + avg));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    /**
     * Averages every value it receives in one reduce call and writes the
     * result to the {@code reduce_out_file} named output.
     */
    public static class AvgReduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        /** Writer for the named outputs; assigned in {@link #setup}. */
        MultipleOutputs<Text, DoubleWritable> mos;

        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        /**
         * Computes the mean of the incoming per-student averages.
         *
         * @param key      key of the group (not used in the output)
         * @param inValues per-student averages belonging to this group
         * @param context  task context (no record is emitted through it)
         */
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> inValues, Context context)
                throws IOException, InterruptedException {
            double classSum = 0;
            int cnt = 0;
            for (DoubleWritable dw : inValues) {
                classSum += dw.get();
                cnt++;
            }
            double classAvg = classSum / cnt;

            // The result goes only to the named output; nothing is written
            // through context.write in this reducer.
            mos.write(reduce_out_file, NullWritable.get(),
                    new Text("Class average: " + classAvg));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    /**
     * Grouping comparator that reports every pair of keys as equal, so that
     * all map output records handled by a reduce task fall into one group.
     */
    public static class AvgGroupComparator implements RawComparator<Text> {

        @Override
        public int compare(Text t1, Text t2) {
            return 0;
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return 0;
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: ClassAvg2 <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // NOTE(review): this constructor is deprecated on newer Hadoop releases in
        // favor of Job.getInstance(conf, name); kept because the target Hadoop
        // version is not visible from this file.
        Job job = new Job(conf, "class avg");
        job.setJarByClass(ClassAvg2.class);

        // mapper
        job.setMapperClass(AvgMap.class);
        job.setGroupingComparatorClass(AvgGroupComparator.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // reducer
        job.setReducerClass(AvgReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // The all-equal grouping comparator yields one group per reduce task,
        // so a single class average requires exactly one reduce task.
        job.setNumReduceTasks(1);

        // input and output
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        MultipleOutputs.addNamedOutput(job, map_out_file, TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, reduce_out_file, TextOutputFormat.class,
                NullWritable.class, Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
时间: 2024-10-11 00:42:16