package cn.sjq.mr.sort;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.Random;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* MapReduce实现对单个用户支付金额最大的前N个商品排名
* 输入数据:
* order.data1...10 10个订单文件,每个文件5000-10000条的购买记录,格式如下:
* orderid userid payment productid
* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185
c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117
1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124
e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197
......
处理后最终输出的数据(假设N=5),输出排名前五的商品按升序排列:
U10102000100[Top5]
21010129 27530
21010132 28404
21010121 30183
21010176 31576
21010109 32166
U10102000101[Top5]
21010165 26643
21010150 26826
21010186 27288
21010163 32127
21010121 34032
U10102000102[Top5]
21010138 24253
21010116 24641
21010140 25890
21010174 27297
21010123 28498
U10102000103[Top5]
21010178 28687
21010104 29050
21010166 29114
21010159 33205
21010184 39350
......
U10102000198[Top5]
21010155 27789
21010186 28571
21010178 28947
21010159 31851
21010124 32765
U10102000199[Top5]
21010141 27624
21010112 27890
21010173 29424
21010138 30896
21010154 35204
实现逻辑:
(1)由于要实现对每个用户购买金额最大的前N个商品排名,因此这里将每一个用户输出到一个文件【假设100个用户,就有100个用户的排名文件】
(2)由于每个用户最终输出到一个文件,因此Reduce Tasks数量需要>=用户数
Mapper端(第一阶段):
(1)对读入的数据分片进行处理
(2)输出格式如下:
<k2> <v2>
userid&productId payment
自定义分区Partitioner:
目的是对每个用户分配独立的分区进行处理,处理后输出到独立文件
Reducer端(第一阶段):
(1)对Mapper传过来的用户商品进行求和
(2)对商品按照支付金额降序排序,且输出前TopN个用户购买金额最大的商品
(3)输出格式
<k4> <v4>
<userid,productId> <payment_total>
Mapper端(第二阶段)
(1)读取第一阶段Reducer输出用户排行文件
(2)处理后输出到第二阶段Reducer
Reducer端(第二阶段)
(1)接收第二阶段Mapper传递过来的数据
(2)将数据进行格式处理后输出到HDFS
* @author songjq
*
*/
public class SingleUserPaymentTopN {
/**
 * Test-data generator: writes 10 files (order.data1 .. order.data10), each with
 * 5000-10000 random order lines of the form
 *
 *   orderid,userid,payment,productid
 *   e.g. 53d419fa-0df4-4b6d-8214-dac158bf33e7,U10102000186,2008,21010100
 *
 * giving 50k-100k rows in total - enough input to exercise the TopN jobs.
 *
 * @author songjq
 */
public static class OrderData {
	public static void main(String[] args) throws Exception {
		// One shared Random is sufficient; the original allocated a new one
		// for every single value.
		Random random = new Random();
		for (int i = 0; i < 10; i++) {
			// try-with-resources closes the stream even if a write fails
			// (the original leaked the FileOutputStream on exception).
			try (FileOutputStream out =
					new FileOutputStream("D:\\test\\tmp\\userTopN\\order.data" + (i + 1))) {
				int lines = 5000 + random.nextInt(5000);
				for (int count = 0; count < lines; count++) {
					// UUID keeps order ids globally unique across all files.
					UUID uuid = UUID.randomUUID();
					// Payment amount in [1000, 3000).
					int payment = 1000 + random.nextInt(2000);
					// User id suffix in [100, 200) -> at most 100 distinct users.
					int userid = 100 + random.nextInt(100);
					// Product id suffix in [100, 200) -> at most 100 distinct products.
					int productId = 100 + random.nextInt(100);
					String orderdata =
							uuid + ",U10102000" + userid + "," + payment + ",21010" + productId + "\n";
					out.write(orderdata.getBytes());
				}
				out.flush();
			}
		}
	}
}
/**
 * Stage-1 Mapper: parses each CSV order line
 *
 *   orderid,userid,payment,productid
 *
 * and emits key = "userid&amp;productId", value = payment.
 *
 * @author songjq
 */
static class SingleUserPaymentTopNStepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	// Reused across map() calls to avoid a per-record allocation.
	private Text tkey = new Text();
	private LongWritable tvalue = new LongWritable();

	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		String[] order = v1.toString().split(",");
		// trim(): the sample data documented at the top of this file carries
		// spaces after the commas (e.g. ",1008, 21010185"); without trimming,
		// those spaces leak into the composite key and payment parsing fails.
		tkey.set(order[1].trim() + "&" + order[3].trim());
		// parseLong avoids the needless boxing of Long.valueOf.
		tvalue.set(Long.parseLong(order[2].trim()));
		context.write(tkey, tvalue);
	}
}
/**
 * Stage-1 Reducer:
 * (1) sums the payments per (userid &amp; productId) key;
 * (2) keeps only the N products with the largest totals seen by this reduce
 *     task (the custom partitioner routes one user per partition), sorted by
 *     total payment descending;
 * (3) emits "userid,productId,payment_total" lines from cleanup().
 *
 * @author songjq
 */
static class SingleUserPaymentTopNStepOneReducer extends Reducer<Text, LongWritable, NullWritable, Text> {
	// Ranking buffer (same idea as a Combiner): holds at most N entries of
	// {key, totalPayment}, ordered by total payment descending.
	private TreeSet<String[]> treeSet = null;
	// Global top-N size, read from the job configuration in setup().
	private Integer N = null;

	@Override
	protected void setup(Context context)
			throws IOException, InterruptedException {
		N = Integer.valueOf(context.getConfiguration().get("Global_N"));
		treeSet = new TreeSet<String[]>(new Comparator<String[]>() {
			@Override
			public int compare(String[] o1, String[] o2) {
				long payment1 = Long.parseLong(o1[1]);
				long payment2 = Long.parseLong(o2[1]);
				if (payment1 != payment2) {
					// Larger payment first (descending order).
					return payment1 > payment2 ? -1 : 1;
				}
				// BUG FIX: the original returned 0 on equal payments, which
				// makes TreeSet treat two *different* products with the same
				// total as duplicates and silently drop one of them.
				// Tie-break on the key to keep both.
				return o1[0].compareTo(o2[0]);
			}
		});
	}

	@Override
	protected void reduce(Text k3, Iterable<LongWritable> v3,
			Context ctx) throws IOException, InterruptedException {
		// Sum all payments for this user&product key.
		long total_payment = 0;
		for (LongWritable val : v3) {
			total_payment += val.get();
		}
		treeSet.add(new String[] { k3.toString(), String.valueOf(total_payment) });
		// Keep at most N entries: evict the smallest total.
		if (treeSet.size() > N) {
			treeSet.pollLast();
		}
	}

	@Override
	protected void cleanup(Context context)
			throws IOException, InterruptedException {
		// Emit "userid,productId,total"; the NullWritable key keeps the output
		// file value-only.
		for (String[] ary : treeSet) {
			String[] arr = ary[0].split("&");
			context.write(NullWritable.get(), new Text(arr[0] + "," + arr[1] + "," + ary[1]));
		}
	}
}
/**
 * Custom partitioner: routes every record of a given user to a dedicated
 * reduce task so each user's ranking lands in its own output file.
 *
 * @author songjq
 */
static class SingleUserPaymentTopNPartitioner extends Partitioner<Text, LongWritable> {
	@Override
	public int getPartition(Text k2, LongWritable v2, int reduceTasks) {
		/*
		 * k2 looks like "U10102000103&21010177". The generated user ids end in
		 * 100..199 (see OrderData), so the last two digits (00..99) identify
		 * the user uniquely when the job runs with 100 reduce tasks.
		 */
		String userid = k2.toString().split("&")[0];
		String lastTwoDigits = userid.substring(userid.length() - 2);
		// Integer.parseInt already accepts a leading zero ("07" -> 7), so the
		// original's manual zero-stripping was unnecessary. "% reduceTasks"
		// keeps the partition number in range even if the job is configured
		// with fewer than 100 reducers (Hadoop rejects out-of-range partitions).
		return Integer.parseInt(lastTwoDigits) % reduceTasks;
	}
}
/**
 * Stage-2 Mapper (no custom logic beyond regrouping): reads stage-1 output
 * lines "userid,productId,total_payment" and emits
 * key = userid, value = "productId\ttotal_payment".
 *
 * @author songjq
 */
static class SingleUserPaymentTopNStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
	// Output buffers, reused for every record.
	private Text outKey = new Text();
	private Text outValue = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] fields = value.toString().split(",");
		outKey.set(fields[0]);
		outValue.set(fields[1] + "\t" + fields[2]);
		context.write(outKey, outValue);
	}
}
/**
 * Stage-2 Reducer: for each user, writes a "userid[TopN]" header line
 * followed by that user's "productId\ttotal" lines, each prefixed with a tab.
 *
 * @author songjq
 */
static class SingleUserPaymentTopNStepTwoReducer extends Reducer<Text, Text, Text, Text> {
	// Reused output buffers; TextOutputFormat serializes each write
	// immediately, so reuse is safe and the emitted bytes are unchanged.
	private final Text header = new Text();
	private final Text emptyValue = new Text("");
	private final Text indent = new Text("\t");
	// Top-N size from the job configuration; only used in the header text.
	private String N = null;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		N = context.getConfiguration().get("Global_N");
	}

	@Override
	protected void reduce(Text k3, Iterable<Text> v3, Context ctx)
			throws IOException, InterruptedException {
		// Header line, e.g. "U10102000100[Top5]".
		header.set(k3.toString() + "[Top" + N + "]");
		ctx.write(header, emptyValue);
		for (Text val : v3) {
			ctx.write(indent, val);
		}
	}
}
@Test
public void SingleUserPaymentTopNJob() throws Exception {
	// ---------- Stage 1: per-user aggregation + top-N ----------
	Configuration conf1 = new Configuration();
	conf1.set("Global_N", "5");
	Job job1 = Job.getInstance(conf1, "stepOne");
	job1.setJarByClass(SingleUserPaymentTopN.class);
	// Mapper
	job1.setMapperClass(SingleUserPaymentTopNStepOneMapper.class);
	job1.setMapOutputKeyClass(Text.class);
	job1.setMapOutputValueClass(LongWritable.class);
	// Reducer. FIX: the reducer emits NullWritable keys, so declare the output
	// key class accordingly (the original declared Text).
	job1.setReducerClass(SingleUserPaymentTopNStepOneReducer.class);
	job1.setOutputKeyClass(NullWritable.class);
	job1.setOutputValueClass(Text.class);
	// Custom partitioner: one partition (and hence one output file) per user.
	job1.setPartitionerClass(SingleUserPaymentTopNPartitioner.class);
	// 100 reduce tasks >= number of distinct users, so every user id has a
	// dedicated reduce task.
	job1.setNumReduceTasks(100);
	// Input / output paths (stage-1 output is wiped before the run).
	FileInputFormat.setInputPaths(job1, "D:\\test\\tmp\\userTopN");
	job1.setInputFormatClass(TextInputFormat.class);
	Path outpath = new Path("D:\\test\\tmp\\SingleUserPaymentTopNout");
	outpath.getFileSystem(conf1).delete(outpath, true);
	FileOutputFormat.setOutputPath(job1, outpath);

	// ---------- Stage 2: merge per-user files into one ranking report ----------
	Configuration conf2 = new Configuration();
	conf2.set("Global_N", "5");
	Job job2 = Job.getInstance(conf2, "stepTwo");
	job2.setJarByClass(SingleUserPaymentTopN.class);
	// Mapper
	job2.setMapperClass(SingleUserPaymentTopNStepTwoMapper.class);
	job2.setMapOutputKeyClass(Text.class);
	job2.setMapOutputValueClass(Text.class);
	// Reducer: a single task produces one consolidated output file.
	job2.setReducerClass(SingleUserPaymentTopNStepTwoReducer.class);
	job2.setOutputKeyClass(Text.class);
	job2.setOutputValueClass(Text.class);
	job2.setNumReduceTasks(1);
	// Input: stage-1 output directory.
	FileInputFormat.setInputPaths(job2, "D:\\test\\tmp\\SingleUserPaymentTopNout");
	// BUG FIX: the original called job1.setInputFormatClass here, configuring
	// job1 a second time and leaving job2's input format at the default.
	job2.setInputFormatClass(TextInputFormat.class);
	// Output path. FIX: delete outpath2 via its own FileSystem handle (the
	// original resolved the FileSystem through outpath).
	Path outpath2 = new Path("D:\\test\\tmp\\SingleUserPaymentTopNout2");
	outpath2.getFileSystem(conf2).delete(outpath2, true);
	FileOutputFormat.setOutputPath(job2, outpath2);

	// Wrap both jobs as ControlledJobs and declare job2's dependency on job1.
	ControlledJob controlledJob1 = new ControlledJob(conf1);
	controlledJob1.setJob(job1);
	ControlledJob controlledJob2 = new ControlledJob(conf2);
	controlledJob2.setJob(job2);
	JobControl jobControl = new JobControl("index");
	controlledJob2.addDependingJob(controlledJob1);
	jobControl.addJob(controlledJob1);
	jobControl.addJob(controlledJob2);

	// JobControl runs in its own thread; poll until both jobs have finished.
	Thread thread = new Thread(jobControl);
	thread.start();
	while (!jobControl.allFinished()) {
		// BUG FIX: sleep() is static - invoke it on the Thread class, not via
		// an instance reference (the original wrote thread.sleep(500)).
		Thread.sleep(500);
	}
	// Stop the JobControl polling thread before exiting.
	jobControl.stop();
	int succeedSize = jobControl.getSuccessfulJobList().size();
	// Exit 0 only if both chained jobs succeeded, 1 otherwise.
	System.exit(succeedSize == 2 ? 0 : 1);
}
}
// 原文地址 (original article): http://blog.51cto.com/2951890/2156610