在复杂的MapReduce处理中,往往需要将处理过程分解成多个简单的Job来执行:前一个Job的输出作为后一个Job的输入,Job之间存在依赖关系。以上一篇中的求平均数为例,可以分解成三个步骤:
1. 求Sum
2. 求Count
3. 计算平均数
每个步骤看成一个Job,其中Job3必须等待Job1、Job2完成,并将Job1、Job2的输出结果作为输入。下面的代码演示了如何将这3个Job串起来:
1 package yjmyzz.mr.job.link; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.DoubleWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import yjmyzz.util.HDFSUtil; 14 15 import java.io.IOException; 16 17 18 public class Avg2 { 19 20 private static final Text TEXT_SUM = new Text("SUM"); 21 private static final Text TEXT_COUNT = new Text("COUNT"); 22 private static final Text TEXT_AVG = new Text("AVG"); 23 24 //计算Sum 25 public static class SumMapper 26 extends Mapper<LongWritable, Text, Text, LongWritable> { 27 28 public long sum = 0; 29 30 public void map(LongWritable key, Text value, Context context) 31 throws IOException, InterruptedException { 32 sum += Long.parseLong(value.toString()); 33 } 34 35 protected void cleanup(Context context) throws IOException, InterruptedException { 36 context.write(TEXT_SUM, new LongWritable(sum)); 37 } 38 39 } 40 41 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 42 43 public long sum = 0; 44 45 public void reduce(Text key, Iterable<LongWritable> values, Context context) 46 throws IOException, InterruptedException { 47 for (LongWritable v : values) { 48 sum += v.get(); 49 } 50 context.write(TEXT_SUM, new LongWritable(sum)); 51 } 52 53 } 54 55 //计算Count 56 public static class CountMapper 57 extends Mapper<LongWritable, Text, Text, LongWritable> { 58 59 public long count = 0; 60 61 public void map(LongWritable key, Text value, Context context) 62 throws IOException, InterruptedException { 63 count += 1; 64 } 65 66 protected void cleanup(Context context) throws IOException, InterruptedException { 67 context.write(TEXT_COUNT, 
new LongWritable(count)); 68 } 69 70 } 71 72 public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 73 74 public long count = 0; 75 76 public void reduce(Text key, Iterable<LongWritable> values, Context context) 77 throws IOException, InterruptedException { 78 for (LongWritable v : values) { 79 count += v.get(); 80 } 81 context.write(TEXT_COUNT, new LongWritable(count)); 82 } 83 84 } 85 86 //计算Avg 87 public static class AvgMapper 88 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 89 90 public long count = 0; 91 public long sum = 0; 92 93 public void map(LongWritable key, Text value, Context context) 94 throws IOException, InterruptedException { 95 String[] v = value.toString().split("\t"); 96 if (v[0].equals("COUNT")) { 97 count = Long.parseLong(v[1]); 98 } else if (v[0].equals("SUM")) { 99 sum = Long.parseLong(v[1]); 100 } 101 } 102 103 protected void cleanup(Context context) throws IOException, InterruptedException { 104 context.write(new LongWritable(sum), new LongWritable(count)); 105 } 106 107 } 108 109 110 public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> { 111 112 public long sum = 0; 113 public long count = 0; 114 115 public void reduce(LongWritable key, Iterable<LongWritable> values, Context context) 116 throws IOException, InterruptedException { 117 sum += key.get(); 118 for (LongWritable v : values) { 119 count += v.get(); 120 } 121 } 122 123 protected void cleanup(Context context) throws IOException, InterruptedException { 124 context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count)); 125 } 126 127 } 128 129 130 public static void main(String[] args) throws Exception { 131 132 Configuration conf = new Configuration(); 133 134 String inputPath = "/input/duplicate.txt"; 135 String maxOutputPath = "/output/max/"; 136 String countOutputPath = "/output/count/"; 137 String avgOutputPath = "/output/avg/"; 138 139 //删除输出目录(可选,省得多次运行时,总是报OUTPUT目录已存在) 
140 HDFSUtil.deleteFile(conf, maxOutputPath); 141 HDFSUtil.deleteFile(conf, countOutputPath); 142 HDFSUtil.deleteFile(conf, avgOutputPath); 143 144 Job job1 = Job.getInstance(conf, "Sum"); 145 job1.setJarByClass(Avg2.class); 146 job1.setMapperClass(SumMapper.class); 147 job1.setCombinerClass(SumReducer.class); 148 job1.setReducerClass(SumReducer.class); 149 job1.setOutputKeyClass(Text.class); 150 job1.setOutputValueClass(LongWritable.class); 151 FileInputFormat.addInputPath(job1, new Path(inputPath)); 152 FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath)); 153 154 155 Job job2 = Job.getInstance(conf, "Count"); 156 job2.setJarByClass(Avg2.class); 157 job2.setMapperClass(CountMapper.class); 158 job2.setCombinerClass(CountReducer.class); 159 job2.setReducerClass(CountReducer.class); 160 job2.setOutputKeyClass(Text.class); 161 job2.setOutputValueClass(LongWritable.class); 162 FileInputFormat.addInputPath(job2, new Path(inputPath)); 163 FileOutputFormat.setOutputPath(job2, new Path(countOutputPath)); 164 165 166 Job job3 = Job.getInstance(conf, "Average"); 167 job3.setJarByClass(Avg2.class); 168 job3.setMapperClass(AvgMapper.class); 169 job3.setReducerClass(AvgReducer.class); 170 job3.setMapOutputKeyClass(LongWritable.class); 171 job3.setMapOutputValueClass(LongWritable.class); 172 job3.setOutputKeyClass(Text.class); 173 job3.setOutputValueClass(DoubleWritable.class); 174 175 //将job1及job2的输出为做job3的输入 176 FileInputFormat.addInputPath(job3, new Path(maxOutputPath)); 177 FileInputFormat.addInputPath(job3, new Path(countOutputPath)); 178 FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath)); 179 180 //提交job1及job2,并等待完成 181 if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) { 182 System.exit(job3.waitForCompletion(true) ? 0 : 1); 183 } 184 185 } 186 187 188 }
输入文本在上一篇可以找到,上面这段代码的主要思路:
1. Sum和Count均采用相同的输入/input/duplicate.txt,然后将各自的处理结果分别输出到/output/max/及/output/count/下(注意:目录名虽然叫max,但其中存放的是Sum的结果)
2. Avg从/output/max/及/output/count/获取结果作为输入,然后根据Key值(SUM或COUNT)的不同,分别拿到sum和count的值,最终计算出平均数并输出到/output/avg/下
时间: 2024-10-09 08:29:39