data:
3 3
3 2
3 2
2 2
2 1
1 1
---------------------
需求:
1 1
2 2
3 3
当第一列相同时候要第二列的最小值
package group; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * * 自定义分组 k2 * */ public class GroupApp { private static final String inputPaths = "hdfs://hadoop:9000/data"; private static final String OUT_PATH = "hdfs://hadoop:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); fileSystem.delete(new Path(OUT_PATH), true); Job job = new Job(conf, GroupApp.class.getSimpleName()); job.setJarByClass(GroupApp.class); FileInputFormat.setInputPaths(job, inputPaths); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(LongWritable.class); //设置自定义的分组键 job.setGroupingComparatorClass(MyGropComparator.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ @Override protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NewK2, LongWritable>.Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); context.write(new NewK2(Long.parseLong(split[0]),Long.parseLong(split[1])),new LongWritable(Long.parseLong(split[1]))); } } /** * * k2这时候没有相等的,意味着Reduce接收到6个分组 * */ public static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ @Override protected void reduce(NewK2 key, Iterable<LongWritable> values, org.apache.hadoop.mapreduce.Reducer<NewK2, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException { Long min = Long.MAX_VALUE; for (LongWritable longWritable : values) { if(longWritable.get()<min){ min = longWritable.get(); } } context.write(new LongWritable(key.frist),new LongWritable(min)); } } /** * * 自定义排序 * * */ public static class NewK2 implements WritableComparable<NewK2>{ long frist; long second; public NewK2(){} public NewK2(long frist, long second) { this.frist = frist; this.second = second; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.frist); out.writeLong(this.frist); } @Override public void readFields(DataInput in) throws IOException { this.frist = in.readLong(); this.second = in.readLong(); } /** * 做比较,先按照第一列进行排序,当第一列相同时,按照第二列进行排序 */ @Override public int compareTo(NewK2 o) { long minus = this.frist - o.frist; if(minus != 0){ //不相等 return (int)minus; } //第一列相等,让第二列进行处理 return (int)(this.second - o.second); } } /** * 自定义分组 * */ public static class MyGropComparator implements RawComparator<NewK2>{ @Override public int compare(NewK2 o1, NewK2 o2) { return 0; } /** * b1相当于this b2相当于o * s1 和s2 表示的是 从很长的字节数组中从哪个位置开始读取值, * l1和l2处理的长度 */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); } } }
时间: 2024-10-20 09:47:48