package cn.sjq.mr.sort;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* MapReduce实现热销商品TopN排行
* 这里按照商品购买次数排名在前面的为热销商品
* 输入数据:
* order.data1...10 10个订单文件,每个文件5000-10000条的购买记录,格式如下:
* orderid userid payment productid
* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185
c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117
1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124
e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197
......
* 最终输出数据(TopN):
热销商品排行Top10
商品ID 销售数量
21010129 871
21010182 852
21010153 839
21010131 837
21010142 835
21010159 833
21010117 830
21010110 828
21010141 824
21010198 823
*
* 实现逻辑:
* Mapper端:
* (1)实现数据分片,将读取的数据分片通过map方法处理后输出到Combiner
* (2)数据的输出格式
* <k2>Text <v2>Intwritable
* 21010185 <1>
* 21010117 <1>
* 21010185 <1>
* ... ...
* Combiner端:
* (1)Combiner是一种特殊的Reducer,使用Combiner需要注意不要改变程序原有逻辑,且保障Mapper端和Reducer端的数据类型一致
* (2)这里使用Combiner主要是为了实现
* 1)每个商品购买次数求和
* 2)对于每个局部的Combiner任务,对接收到Mapper端输出的数据处理后进行局部TopN排行,这样可以避免不必要的数据传递到Reducer端,同时提高Reducer程序的执行效率
* (3)TopN中的N由Hadoop的configuration中set(K,V)来设置,这样可以保障运行在各个机器上的任务可以获取到这个全局唯一的N值
* (4)处理后数据输出格式如下:
* <k2`> <v2`>
* 21010185 <30>
* 21010117 <20>
* ... ...
* 注意:这里输出为局部TopN排行
*
* Reducer端:
* (1)Reducer端主要对Combiner端输出的多个局部排行的TopN条数据进行全局排行汇总
* (2)由于最终输出只会到一个文件,因此需要保障Reducer Tasks任务数为1
* (3)通过Reducer处理后,最终输出为
* <k3> <v4>
* 21010185 <30>
* 21010117 <20>
* ... ...
*
* @author songjq
*
*/
public class HotProductTopN {
/**
* Mapper端:
* (1)实现数据分片,将读取的数据分片通过map方法处理后输出到Combiner
* (2)数据的输出格式
* <k2>Text <v2>Intwritable
* 21010185 <1>
* 21010117 <1>
* 21010185 <1>
* ... ...
* @author songjq
*
*/
static class HotProductTopNMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text tkey = new Text();
private IntWritable tvalue = new IntWritable();
/*
* 读取文件分片,并处理后输出到Combiner
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//读入一行数据
String line = v1.toString();
//分词处理
String[] order = line.split(",");
if(null!=order && order.length == 4) {
//商品ID
String productId = order[3];
tkey.set(productId);
tvalue.set(1);
//通过context将数据传递到Combiner
context.write(tkey, tvalue);
}else {
return;
}
}
}
/**
* * Combiner端:
* (1)Combiner是一种特殊的Reducer,使用Combiner需要注意不要改变程序原有逻辑,且保障Mapper端和Reducer端的数据类型一致
* (2)这里使用Combiner主要是为了实现
* 1)每个商品购买次数求和
* 2)对于每个局部的Combiner任务,对接收到Mapper端输出的数据处理后进行局部TopN排行,这样可以避免不必要的数据传递到Reducer端,同时提高Reducer程序的执行效率
* (3)TopN中的N由Hadoop的configuration中set(K,V)来设置,这样可以保障运行在各个机器上的任务可以获取到这个全局唯一的N值
* (4)处理后数据输出格式如下:
* <k2`> <v2`>
* 21010185 <30>
* 21010117 <20>
* ... ...
* 注意:这里输出为局部TopN排行
* @author songjq
*
*/
static class HotProductTopNCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
private TreeSet<String[]> treeSet = null;
//全局前N条商品排名
private Integer N = null;
/*
* 初始化方法,在reduce方法调用前执行,只会被执行一次
* 通过该方法,我们可以获取全局N变量的值,且可以初始化TopN的treeset集合。
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取全局N
N = Integer.valueOf(context.getConfiguration().get("Global_N"));
//实例化treeSet,并对其内容按照商品购买次数进行排序
treeSet = new TreeSet<String[]>(new Comparator<String[]>() {
@Override
public int compare(String[] o1, String[] o2) {
Integer count1 = Integer.valueOf(o1[1]);
Integer count2 = Integer.valueOf(o2[1]);
int result = 0;
if(count1>count2) {
result = -1;
}else if(count1<count2) {
result = 1;
}
return result;
}
});
}
/*
* 对相同的ProductId求和,并将其加到treeSet集合,treeSet只存放排名TopN的N条商品
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void reduce(Text k3_, Iterable<IntWritable> v3_, Context ctx)
throws IOException, InterruptedException {
//商品次数求和
Integer count = 0;
for(IntWritable val:v3_) {
count += val.get();
}
//将商品放入treeSet集合
String[] arys = {k3_.toString(),count.toString()};
treeSet.add(arys);
//treeSet记录超过N条,就删除最后一条数据
if(treeSet.size()>N) {
treeSet.pollLast();
}
}
/*
* cleanup在reduce调用结束后执行
* 这里利用cleanup方法将treeSet集合中数据写出去
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for(String[] ary:treeSet) {
context.write(new Text(ary[0]), new IntWritable(Integer.valueOf(ary[1])));
}
}
}
/**
* * Reducer端:
* (1)Reducer端主要对Combiner端输出的多个局部排行的TopN条数据进行全局排行汇总
* (2)由于最终输出只会到一个文件,因此需要保障Reducer Tasks任务数为1
* (3)通过Reducer处理后,最终输出为
* <k3> <v4>
* 21010185 <30>
* 21010117 <20>
* ... ...
* @author songjq
*
*/
static class HotProductTopNReducer extends Reducer<Text, IntWritable, Text, Text>{
//实现思路和Combiner一致
//存放TopN记录 HashMap<"ProductId", count>
private TreeSet<String[]> treeSet = null;
//全局前N条商品排名
private Integer N = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//获取全局N
N = Integer.valueOf(context.getConfiguration().get("Global_N"));
//实例化treeSet,并对其内容按照商品购买次数进行排序
treeSet = new TreeSet<String[]>(new Comparator<String[]>() {
@Override
public int compare(String[] o1, String[] o2) {
Integer count1 = Integer.valueOf(o1[1]);
Integer count2 = Integer.valueOf(o2[1]);
int result = 0;
if(count1>count2) {
result = -1;
}else if(count1<count2) {
result = 1;
}
return result;
}
});
}
/*
* 对Combiner输出的数据进行全局排行
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3,
Context ctx) throws IOException, InterruptedException {
//汇总Combiner任务输出过来的商品次数
int count = 0;
for(IntWritable val:v3) {
count+=val.get();
}
String[] arys = {k3.toString(),String.valueOf(count)};
treeSet.add(arys);
//treeSet超过N条记录,则删除最后一个节点
if(treeSet.size()>N) {
treeSet.pollLast();
}
}
/*
* reduce方法结束后执行,这里将treeSet结果集写到HDFS
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)
*/
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
context.write(new Text("热销商品排行Top"+N), new Text());
context.write(new Text("商品ID"), new Text("销售数量"));
for(String[] ary:treeSet) {
context.write(new Text(ary[0]), new Text(ary[1]));
}
}
}
/**
* 提交任务Job
* @throws Exception
*/
@Test
public void HotProductTopNJob() throws Exception {
Configuration conf = new Configuration();
conf.set("Global_N", "10");
Job job = Job.getInstance(conf);
job.setJarByClass(HotProductTopN.class);
//Mapper
job.setMapperClass(HotProductTopNMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//Combiner
job.setCombinerClass(HotProductTopNCombiner.class);
//Reducer
job.setReducerClass(HotProductTopNReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//必须设置为1
job.setNumReduceTasks(1);
//输入路径
FileInputFormat.setInputPaths(job, "D:\\test\\tmp\\userTopN");
job.setInputFormatClass(TextInputFormat.class);
//输出路径
Path outpath = new Path("D:\\test\\tmp\\TopNout");
outpath.getFileSystem(conf).delete(outpath, true);
FileOutputFormat.setOutputPath(job, outpath);
job.waitForCompletion(true);
}
}
原文地址:http://blog.51cto.com/2951890/2155538