1. 需求
求每年的最高温度
2. 样例数据
(每行一条记录,格式为:年份<TAB>温度)
1995	10
1996	11
1995	16
1995	22
1996	26
1995	3
1996	7
1996	10
1996	20
1996	33
1995	21
1996	9
1995	31
1995	-13
1995	22
1997	-2
1997	28
1997	15
1995	8
3. 思路、代码
先将记录按年份升序、同一年份内按温度降序排序,再把同一年份的所有记录送入同一个 reducer 分组,这样每个分组的第一条记录就是该年份的最高温度。实现此方案的要点是:
a. 定义包括自然键(年份)和自然值(温度)的组合键。
b. 根据组合键对记录进行排序。
c. 针对组合键进行分区和分组时均只考虑自然键。
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 组合键,此例中用于辅助排序,包括年份和温度。 */ public class IntPair implements WritableComparable<IntPair> { private IntWritable first; private IntWritable second; public IntPair() { this.first = new IntWritable(); this.second = new IntWritable(); //若注释掉上面两行,使用时会发生异常 java.lang.NullPointerException at IntPair.readFields } public IntPair(int first, int second) { set(new IntWritable(first), new IntWritable(second)); } public IntPair(IntWritable first, IntWritable second) { set(first, second); } public void set(IntWritable first, IntWritable second) { this.first = first; this.second = second; } public IntWritable getFirst() { return first; } public IntWritable getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof IntPair) { IntPair ip = (IntPair) obj; return first.get() == ip.first.get() && second.get() == ip.second.get(); } return false; } @Override public String toString() { return first + "\t" + second; } public int compareTo(IntPair o) { int cmp = first.compareTo(o.first); if (cmp == 0) { cmp = second.compareTo(o.second); } return cmp; } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 2) { context.write(new IntPair(Integer.parseInt(val[0]), Integer.parseInt(val[1])), NullWritable.get()); } } } static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> { @Override protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); //仅输出第一行 } } //仅根据 first 分区 public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } //仅根据 first 分组 public static class GroupComparator extends WritableComparator { private static final 
IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator(); protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return INT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof IntPair && b instanceof IntPair) { return ((IntPair) a).getFirst().compareTo(((IntPair) b).getFirst()); } return super.compare(a, b); } } //根据组合键排序 public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof IntPair && b instanceof IntPair) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; int cmp = ip1.getFirst().compareTo(ip2.getFirst()); //升序(年份) if (cmp != 0) { return cmp; } return -ip1.getSecond().compareTo(ip2.getSecond()); //降序(温度) } return super.compare(a, b); } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Parameter number is wrong, please enter two parameters:<input> <output>"); System.exit(-1); } Path inputPath = new Path(otherArgs[0]); Path outputPath = new Path(otherArgs[1]); //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000"); Job job = Job.getInstance(conf, "MaxTemperatureUsingSecondarySort"); //job.setJar("F:/workspace/AssistRanking2/target/AssistRanking2-1.0-SNAPSHOT.jar"); job.setMapperClass(MaxTemperatureMapper.class); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(KeyComparator.class); //默认根据 Key 的 compareTo 
函数排序 job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setMapOutputKeyClass(IntPair.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }
4. 运行截图
注:本例源自 《Hadoop权威指南》第三版 8.2.4
时间: 2024-10-28 15:31:37