程序示例: 日志信息(每行一条记录, 关键词与次数之间以制表符分隔): 二手车 1345; 二手房 3416; 洗衣机 2789。输入: N=2。输出(按次数从高到低): 二手房 洗衣机
Map函数如下:
import java.io.IOException; import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class TopNMapper extends Mapper<Object, Text, NullWritable, Text> { private TreeMap<IntWritable, Text> tm = new TreeMap<IntWritable, Text>(); private IntWritable mykey = new IntWritable(); private Text myvalue = new Text(); private int N = 10; @Override protected void map(Object key, Text value, Mapper<Object, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { String word = value.toString().split("\t")[0]; int num = Integer.parseInt(value.toString().split("\t")[1]); mykey.set(num); myvalue.set(word); tm.put(mykey, myvalue); if (tm.size() > N) tm.remove(tm.firstKey()); } @Override protected void cleanup( Mapper<Object, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { for (Map.Entry<IntWritable, Text> entry : tm.entrySet()) { Text value = new Text(entry.getKey() + " " + entry.getValue()); context.write(NullWritable.get(), value); } } }
Reduce函数如下:
import java.io.IOException; import java.util.TreeMap; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TopNReducer extends Reducer<NullWritable, Text, NullWritable, Text>{ private TreeMap<IntWritable, Text> tm = new TreeMap<IntWritable, Text>(); private IntWritable mykey = new IntWritable(); private Text myvalue = new Text(); private int N = 10; @Override protected void reduce(NullWritable key, Iterable<Text> values, Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { for (Text val : values) { String[] tmp = val.toString().split(" "); mykey.set(Integer.parseInt(tmp[0])); myvalue.set(tmp[1]); tm.put(mykey, myvalue); if (tm.size() > N) tm.remove(tm.firstKey()); } for (Text res : tm.descendingMap().values()) { context.write(NullWritable.get(), res); } } }
时间: 2024-10-15 13:58:07