2018-08-09期 MapReduce实现对单个用户支付金额最大的前N个商品排名

package cn.sjq.mr.sort;

import java.io.FileOutputStream;

import java.io.IOException;

import java.util.Comparator;

import java.util.Random;

import java.util.TreeSet;

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.jobcontrol.JobControl;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* MapReduce实现对单个用户支付金额最大的前N个商品排名

* 输入数据:

* order.data1...10 10个订单文件,每个文件5000-10000条的购买记录,格式如下:

* orderid userid  payment productid

* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185

c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117

1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124

e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197

......

处理后最终输出的数据(假设N=5),输出排名前五的商品按升序排列:

U10102000100[Top5]

21010129 27530

21010132 28404

21010121 30183

21010176 31576

21010109 32166

U10102000101[Top5]

21010165 26643

21010150 26826

21010186 27288

21010163 32127

21010121 34032

U10102000102[Top5]

21010138 24253

21010116 24641

21010140 25890

21010174 27297

21010123 28498

U10102000103[Top5]

21010178 28687

21010104 29050

21010166 29114

21010159 33205

21010184 39350

......

U10102000198[Top5]

21010155 27789

21010186 28571

21010178 28947

21010159 31851

21010124 32765

U10102000199[Top5]

21010141 27624

21010112 27890

21010173 29424

21010138 30896

21010154 35204

实现逻辑:

(1)由于要实现对每个用户购买金额最大的前N个商品排名,因此这里将每一个用户输出到一个文件【假设100个用户,就有100个用户的排名文件】

(2)由于每个用户最终输出到一个文件,因此Reduce Tasks数量需要>=用户数

Mapper端(第一阶段):

(1)对读入的数据分片进行处理

(2)输出格式如下:

<k2> <v2>

userid&productId payment

自定义分区Partitioner:

目的是对每个用户分配独立的分区进行处理,处理后输出到独立文件

Reuder端(第一阶段):

(1)对Mapper传过来的用户商品进行求和

(2)对商品按照支付金额降序排序,且输出前TopN个用户购买金额最大的商品

(2)输出格式

<k4> <k4>

<userid,productId> <payment_total>

Mapper端(第二阶段)

(1)读取第一阶段Reducer输出用户排行文件

(2)处理后输出到第二阶段Reducer

Reducer端(第二阶段)

(1)接收第二阶段Mapper传递过来的数据

(2)将数据进行格式处理后输出到HDFS

* @author songjq

*

*/

public class SingleUserPaymentTopN {

/**

* 数据来源:

* 利用Java代码构造简单的订单数据,这里构造多个数据文件,每个文件5000-10000行数据

* 构造数据格式:

*  orderid ,userid, payment, productid

53d419fa-0df4-4b6d-8214-dac158bf33e7,U10102000186, 2008, 210100

7a200107-1711-4f83-a09d-76b21ef37575,U10102000182, 1155, 210100

367d1d43-2a38-48a1-a3bc-9065d215f093,U10102000177, 1951, 210100

6082506e-0cfb-47e2-902b-f5cbceac4a21,U10102000121, 2619, 210100

通过该程序,我们就构造了10个order.data文件,且每个文件中数据为5000-10000行

如果要通过MapReduce来对payment进行TopN排行,那数据的数据量为5万-10万行,足以支持我们的测试。

* @author songjq

*

*/

public static class OrderData {

public static void main(String[] args) throws Exception {

for(int i=0;i<10;i++) {

FileOutputStream out = new FileOutputStream("D:\\test\\tmp\\userTopN\\order.data"+(i+1));

int lines = 5000+new Random().nextInt(5000);

int count = 0;

while(count<lines) {

//订单ID,采用UUID是为了防止生成在多个文件的中订单ID全局唯一

UUID uuid = UUID.randomUUID();

//商品支付金额

int payment = 1000+new Random().nextInt(2000);

//用户ID,随机构造100-200之间编号的用户,用户数<=100

int userid = 100+new Random().nextInt(100);

//产品ID,随机构造100-200之间编号的商品,商品数<=100

int productId = 100+new Random().nextInt(100);

String orderdata = uuid+",U10102000"+userid+","+payment+",21010"+productId+"\n";

out.write(orderdata.getBytes());

count++;

}

out.flush();

out.close();

}

}

}

/**

* Mapper端(第一阶段):

(1)对读入的数据分片进行处理

(2)输出格式如下:

<k2> <v2>

userid&productId payment

* @author songjq

*

*/

static class SingleUserPaymentTopNStepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text tkey = new Text();

private LongWritable tvalue = new LongWritable();

@Override

protected void map(LongWritable k1, Text v1,Context context)

throws IOException, InterruptedException {

String line = v1.toString();

String[] order = line.split(",");

tkey.set(order[1]+"&"+order[3]);

tvalue.set(Long.valueOf(order[2]));

context.write(tkey, tvalue);

}

}

/**

* Reuder端(第一阶段):

(1)对Mapper传过来的用户商品进行求和

(2)对商品按照支付金额降序排序,且输出前TopN个用户购买金额最大的商品

(2)输出格式

<k4> <k4>

<userid,productId> <payment_total>

* @author songjq

*

*/

static class SingleUserPaymentTopNStepOneReducer extends Reducer<Text, LongWritable, NullWritable, Text>{

//实现思路和Combiner一致

private TreeSet<String[]> treeSet = null;

//全局前N条商品排名

private Integer N = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

//获取全局N

N = Integer.valueOf(context.getConfiguration().get("Global_N"));

//实例化treeSet,并对其内容按照商品购买次数进行排序

treeSet = new TreeSet<String[]>(new Comparator<String[]>() {

@Override

public int compare(String[] o1, String[] o2) {

long payment1 = Long.valueOf(o1[1]);

long payment2 = Long.valueOf(o2[1]);

int result = 0;

if(payment1>payment2) {

result = -1;

}else if(payment1<payment2) {

result = 1;

}

return result;

}

});

}

@Override

protected void reduce(Text k3, Iterable<LongWritable> v3,

Context ctx) throws IOException, InterruptedException {

long total_payment = 0;

for(LongWritable val:v3) {

total_payment += val.get();

}

String[] arys = {k3.toString(),String.valueOf(total_payment)};

treeSet.add(arys);

//超过N条记录,则将最后一条移除

if(treeSet.size()>N) {

treeSet.pollLast();

}

}

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

for(String[] ary:treeSet) {

String[] arr = ary[0].split("&");

context.write(NullWritable.get(), new Text(arr[0]+","+arr[1]+","+ary[1]));

}

}

}

/**

* 自定义分区Partitioner

* 目的是对每个用户分配独立的分区进行处理,处理后输出到独立文件

* @author songjq

*

*/

static class SingleUserPaymentTopNPartitioner extends Partitioner<Text, LongWritable>{

@Override

public int getPartition(Text k2, LongWritable v2, int reduceTasks) {

/*

* <k2>的数据格式为UserId&ProductId,比如U10102000103&21010177

* 在我们构造数据时UserId的生成机制为"固定部分"+"可变部分","U10102000"+"103"

* "可变部分"又由 100+new Random().nextInt(100),new Random().nextInt(100)表示在100以内随机生成整数

* 这样就把用户现在在了100以内,因此我们reduceTasks设置为100就能保证每个userID都有对于Reduce任务进行处理。

* 因此对于输入的每个UserId,只要将new Random().nextInt(100)这部分作为分区号返回即可。

*/

String[] split = k2.toString().split("&");

String userid = split[0];

String lastTwoNumber = userid.substring(userid.length()-2,userid.length());

if(lastTwoNumber.startsWith("0")) {

lastTwoNumber = lastTwoNumber.substring(1);

}

return Integer.valueOf(lastTwoNumber);

}

}

/**

* 第二阶段Mapper

* 该Mapper不需要Reducer

* @author songjq

*

*/

static class SingleUserPaymentTopNStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] split = line.split(",");

String userid = split[0];

String productId = split[1];

String total_payment = split[2];

tkey.set(userid);

tvalue.set(productId+"\t"+total_payment);

context.write(tkey, tvalue);

}

}

/**

* 第二阶段Reducer

* @author songjq

*

*/

static class SingleUserPaymentTopNStepTwoReducer extends Reducer<Text, Text, Text, Text>{

private String N = null;

@Override

protected void setup(Context context) throws IOException, InterruptedException {

N = context.getConfiguration().get("Global_N");

}

@Override

protected void reduce(Text k3, Iterable<Text> v3, Context ctx)

throws IOException, InterruptedException {

ctx.write(new Text(k3.toString()+"[Top"+N+"]"), new Text(""));

for(Text val:v3) {

ctx.write(new Text("\t"), val);

}

}

}

@Test

public void SingleUserPaymentTopNJob() throws Exception {

// 构造第一阶段的基本job对象job1

Configuration conf1 = new Configuration();

conf1.set("Global_N", "5");

Job job1 = Job.getInstance(conf1,"stepOne");

job1.setJarByClass(SingleUserPaymentTopN.class);

//Mapper

job1.setMapperClass(SingleUserPaymentTopNStepOneMapper.class);

job1.setMapOutputKeyClass(Text.class);

job1.setMapOutputValueClass(LongWritable.class);

//Reducer

job1.setReducerClass(SingleUserPaymentTopNStepOneReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(Text.class);

//指定自定义分区

job1.setPartitionerClass(SingleUserPaymentTopNPartitioner.class);

//这里设置Reduce Tasks数量为100,>=订单中用户数量,保障每个用户都有一个对应的Reduce任务处理

job1.setNumReduceTasks(100);

//输入路径

FileInputFormat.setInputPaths(job1, "D:\\test\\tmp\\userTopN");

job1.setInputFormatClass(TextInputFormat.class);

//输出路径

Path outpath = new Path("D:\\test\\tmp\\SingleUserPaymentTopNout");

outpath.getFileSystem(conf1).delete(outpath, true);

FileOutputFormat.setOutputPath(job1, outpath);

// 构造第二阶段的基本job对象job2

Configuration conf2 = new Configuration();

conf2.set("Global_N", "5");

Job job2 = Job.getInstance(conf2,"stepTwo");

job2.setJarByClass(SingleUserPaymentTopN.class);

//Mapper

job2.setMapperClass(SingleUserPaymentTopNStepTwoMapper.class);

job2.setMapOutputKeyClass(Text.class);

job2.setMapOutputValueClass(Text.class);

//Reducer

job2.setReducerClass(SingleUserPaymentTopNStepTwoReducer.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(Text.class);

job2.setNumReduceTasks(1);

//输入路径

FileInputFormat.setInputPaths(job2, "D:\\test\\tmp\\SingleUserPaymentTopNout");

job1.setInputFormatClass(TextInputFormat.class);

//输出路径

Path outpath2 = new Path("D:\\test\\tmp\\SingleUserPaymentTopNout2");

outpath.getFileSystem(conf2).delete(outpath2, true);

FileOutputFormat.setOutputPath(job2, outpath2);

// ControlledJob是基本的job的封装

ControlledJob controlledJob1 = new ControlledJob(conf1);

// 将job1封装到controlledJob1中去

controlledJob1.setJob(job1);

ControlledJob controlledJob2 = new ControlledJob(conf2);

// 将job2封装到controlledJob2中去

controlledJob2.setJob(job2);

// 先构造一个job控制器

JobControl jobControl = new JobControl("index");

// 指定两个job之间的依赖关系

controlledJob2.addDependingJob(controlledJob1);

// 向job控制器中添加job

jobControl.addJob(controlledJob1);

jobControl.addJob(controlledJob2);

// 创建一个线程去启动jobControl

Thread thread = new Thread(jobControl);

thread.start();

// 如果job没有运行完,主线程就等等

while (!jobControl.allFinished()) {

thread.sleep(500);

}

int succeedSize = jobControl.getSuccessfulJobList().size();

//0正常退出 1异常退出

System.exit(succeedSize == 2 ? 0 : 1);

}

}

原文地址:http://blog.51cto.com/2951890/2156610

时间: 2024-11-09 02:56:27

2018-08-09期 MapReduce实现对单个用户支付金额最大的前N个商品排名的相关文章

2018-08-08 期 MapReduce实现单个商品支付金额最大的前N个用户排行(TopN)

package cn.sjq.mr.sort; import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.Random; import java.util.TreeSet; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache

2018-08-07 期 MapReduce模拟实现热销商品排行

package cn.sjq.mr.sort; import java.io.IOException; import java.util.Comparator; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoo

Bootstrap 3.2.0 源码试读 2014/08/09

第一部分 normalize.css 104至110行 code,    /* 编辑代码 */ kbd,    /* 键盘输入的文本 */ pre, samp {    /* 范例,sample的简写 */   font-family: monospace, monospace;    /* 这个地方应该是写错了,第二字体应该是serif */   font-size: 1em; } 设置字体的大小为1em,字体为monospace. 111至119行 button, input, optgro

MFC DAY06 07 08 09

一 切分窗口 1 类型 动态切分-程序在运行时,由用户拖动分隔条动态的切分窗口. 每一个视图窗口使用的是相同的视图类. 静态切分-在编码创建时已经完成窗口切分.每一个视图窗口 可以使用不同的视图类. 2 相关类 CSplitterWnd类-完成窗口切分的类. #include <afxext.h>//扩展窗口的头文件 3 使用 3.1 动态切分 3.1.1 在CMainFrame中定义切分窗口对象 3.1.2 通过使用CCreateContext结构指定使用的视图类 3.1.3 创建动态切分

饥饿疗法是目前唯一确信能够延缓衰老的办法:4星|《三联生活周刊》2018年3期

三联生活周刊·人类到底能活多久:抗衰老科学指南(2018年3期) 本期主题是抗衰老,科学记者袁越走访了全球抗衰老研究的顶级机构,把这个领域最前沿的进展深入浅出地展现出来,非常有价值.这一类报道也是国内比较稀缺的. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码,[]中是我根据上下文补充的信息: 1:2016年世界人均一次性能源消费量为1.87吨油当量,中国为2.25吨,相比十几年前还不足1吨有了飞跃,但只相当于经合组织(OECD)4.5吨的一半.OECD目前有34

沈阳当年对学校承认了他和高岩的性关系:3星|《三联生活周刊》2018年16期

三联生活周刊·教授的权力:高校内的不平等关系(2018年16期) 本期主题是高校教师性侵学生的调查与思考. 总体评价3星,有参考价值. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:自第二次世界大战以来,以色列制造的暗杀事件比任何西方国家都多.该国领导人甚至认为通过杀戮指定的目标保护其国家安全,危害无辜平民的生命是合情合理的.#52 2:旅游业难以聚集大量的财富,给其从业者带来的回报也有限,这就是为什么海南成了高消费的代名词,可当地人收入却普遍不高的原因.这也可以说是资源

新手C#string类常用函数的学习2018.08.04

ToLower()用于将字符串变为小写,注意字符串的不可变特性,需要重新赋值给另一个字符串变量. s = s.ToLower();//字符串具有不可变性,转换后需要重新赋值,不可仅有s.ToLower(); 这可以使用户的输入不区分大小写,例如验证码. ToUpper()用于将字符串全部变为大写,与上面类似. Trim()可以用于去掉两边的空格. string s1 = " a b c "; s1 = s1.Trim();//用于去除字符串两边的空格 Console.WriteLine

感染弓形虫的人更愿意创业:4星|《环球科学》2018年09月号

<环球科学>2018年09月号 有趣可信的美国科普杂志的中文版.本期我认为有趣的信息:1:感染弓形虫的人,更愿意创业:2:另一种理论认为暗物质并不存在,需要修改的是我们的引力方程:3:新泽西州发布了“蓝色英亩”项目,由州财政出钱购买一些频繁被洪水淹没的房屋4:白蚁巢使得土地更耐旱了:5:重做棉花糖实验,结果发现延迟满足和日后成就之间的关系要比原来认为的小得多. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:他们还发现,被弓形虫感染的学生选修商业的可能性,

Intel Digital Innovation Industry Summit(2018.08.17)

时间:2018.08.17地点:北京金隅喜来登大酒店 原文地址:https://www.cnblogs.com/xuefeng1982/p/10331638.html