1、对于排名,一般都是很热衷的,那么如何实现在数据量多的情况下,得到所需要的数据呢,选取前几名的实际应用中,也会有许多,形成统一的算法实现,比着参考就可以了。
2、数据文件a.txt:
2
4
6
7
9
6
4
3、输出数据为(例如取前三名,前面为数据,后面为名次,名次可通过输入参数配置):
9 1
7 2
6 3
4、设计思路:
Map端:
1)因为是topK问题,自然想到可以自己设计key,重写比较方法,按自己所想要的比较方法实现即可。
2)map端输出,仅需输出最后的n个最大数即可,所以需要在最后的那个cleanup方法里实现,通过一个成员变量list保存数据。
3)最后输出此list中的数据即可
4)list为自定义的一个容器,在往里面添加元素的时候,会进行比较。
5)在这个例子中,没有使用list,而是直接采用map的suffle对key进行的排序。一个map的输出量会很多。下一篇优化里面使用了list
Reduce端:
1) 类似一个map,只针对key进行处理即可,输出前n的数值(此n个key已是经过suffle排过序的,取前n个即可,其它丢弃)。
5、程序实现:
package test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class TopK { public static class Map extends Mapper<Object, Text, MyKey, NullWritable>{ //此处可以进行优化,只需输出此map处理的最大的那几个值即可。 protected void map(Object key, Text value, Context context) throws java.io.IOException ,InterruptedException { try { int num = Integer.parseInt(value.toString()); context.write(new MyKey(num), NullWritable.get()); } catch (Exception e) { // TODO: handle exception return ; } }; } public static class Reduce extends Reducer<MyKey, NullWritable, Text, NullWritable>{ private static Text k = new Text(); //默认值,获取的个数 private static int count = 5; //开始值 private static int start = 1; //初始化时获取配置的参数 protected void setup(Context context) throws IOException ,InterruptedException { //取得配置的参数,需要获取几个值 count = Integer.parseInt(context.getConfiguration().get("top_num")); }; protected void reduce(MyKey key, Iterable<NullWritable> values, Context context) throws IOException ,InterruptedException { //因为key是排序过来的,所以输出count个值就可以了 if(start <= count){ k.set(key.getNum()+"\t"+start); context.write(k, NullWritable.get()); start++; } }; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length != 3){ System.err.println("Usage:TopK"); System.exit(2); } //参数3 为要获取的最大个数 conf.set("top_num", args[2]); Job job = new Job(conf, "TopK"); job.setJarByClass(TopK.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } private static class MyKey implements WritableComparable<MyKey>{ private int num; public int getNum() { return num; } public MyKey() { } public MyKey(int num) { super(); this.num = num; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub num = in.readInt(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(num); } @Override public int compareTo(MyKey o) { // TODO Auto-generated method stub //反序输出 return o.num - this.num; } } }
计算结果:
9 1
7 2
6 3
hadoop1-TopK问题实现,布布扣,bubuko.com
时间: 2025-01-01 09:58:43