2018-08-08 期 MapReduce实现单个商品支付金额最大的前N个用户排行(TopN)

package cn.sjq.mr.sort;

import java.io.FileOutputStream;

import java.io.IOException;

import java.util.Comparator;

import java.util.Random;

import java.util.TreeSet;

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* MapReduce实现单个商品支付金额最大的前N个用户排行(TopN)

* 输入数据:

* order.data1...10 10个订单文件,每个文件5000-10000条的购买记录,格式如下:

* orderid userid  payment productid

* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185

c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117

1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124

e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197

......

最终输出数据(TopN):

单个商品支付金额最大的前10个用户排行

用户ID 商品ID 支付总额

U10102000178 21010139 38084

U10102000171 21010130 37329

U10102000113 21010191 34700

U10102000102 21010124 34523

U10102000167 21010118 33939

U10102000184 21010156 33870

U10102000129 21010137 32839

U10102000115 21010124 32793

U10102000145 21010199 32630

U10102000123 21010149 32328

实现逻辑:本程序的实现逻辑同统计热销商品实现逻辑类似

* 实现逻辑:

* Mapper端:

* (1)实现数据分片,将读取的数据分片通过map方法处理后输出到Combiner

* (2)数据的输出格式<k2:userid+productid> <v2:payment>

* <k2>Text <v2>Intwritable

* U10102000139&21010185 <1008>

* U10102000150&21010185 <1357>

* U10102000139&21010185 <2310>

* ... ...

* Combiner端:

* (1)Combiner是一种特殊的Reducer,使用Combiner需要注意不要改变程序原有逻辑,且保障Mapper端和Reducer端的数据类型一致

* (2)这里使用Combiner主要是为了实现

* 1)每个用户每个商品支付金额总和

* 2)通过在Combiner端对每个用户下同一个商品进行payment求和,这块可以大大减少数据在网络中传输,同时提高Reducer程序的执行效率

* (3)处理后数据输出格式如下:

* <k2`> <v2`>

* U10102000139&21010185 <20202>

* U10102000150&21010176 <11422>

* U10102000139&21010154 <10132>

* ... ...

* 注意:这里输出为局部TopN排行

*

* Reducer端:

* (1)Reducer端主要对Combiner端输出的多个局部排行的TopN条数据进行全局排行汇总

* (2)由于最终输出只会到一个文件,因此需要保障Reducer Tasks任务数为1

* (3)通过Reducer处理后,最终输出为

* <k3> <v4>

* U10102000139&21010185 <39872>

* U10102000150&21010176 <21422>

* U10102000139&21010154 <10132>

* ... ...

* @author songjq

*

*/

public class UserPaymentTopN {

/**

* 数据来源:

* 利用Java代码构造简单的订单数据,这里构造多个数据文件,每个文件5000-10000行数据

* 构造数据格式:

*  orderid ,userid, payment, productid

53d419fa-0df4-4b6d-8214-dac158bf33e7,U10102000186, 2008, 210100

7a200107-1711-4f83-a09d-76b21ef37575,U10102000182, 1155, 210100

367d1d43-2a38-48a1-a3bc-9065d215f093,U10102000177, 1951, 210100

6082506e-0cfb-47e2-902b-f5cbceac4a21,U10102000121, 2619, 210100

通过该程序,我们就构造了10个order.data文件,且每个文件中数据为5000-10000行

如果要通过MapReduce来对payment进行TopN排行,那数据的数据量为5万-10万行,足以支持我们的测试。

* @author songjq

*

*/

public static class OrderData {

public static void main(String[] args) throws Exception {

for(int i=0;i<10;i++) {

FileOutputStream out = new FileOutputStream("D:\\test\\tmp\\userTopN\\order.data"+(i+1));

int lines = 5000+new Random().nextInt(5000);

int count = 0;

while(count<lines) {

//订单ID,采用UUID是为了防止生成在多个文件的中订单ID全局唯一

UUID uuid = UUID.randomUUID();

//商品支付金额

int payment = 1000+new Random().nextInt(2000);

//用户ID,随机构造100-200之间编号的用户,用户数<=100

int userid = 100+new Random().nextInt(100);

//产品ID,随机构造100-200之间编号的商品,商品数<=100

int productId = 100+new Random().nextInt(100);

String orderdata = uuid+",U10102000"+userid+","+payment+",21010"+productId+"\n";

out.write(orderdata.getBytes());

count++;

}

out.flush();

out.close();

}

}

}

/**

* Mapper端:

* (1)实现数据分片,将读取的数据分片通过map方法处理后输出到Combiner

* (2)数据的输出格式<k2:userid+productid> <v2:payment>

* <k2>Text <v2>Intwritable

* U10102000139&21010185 <1008>

* U10102000150&21010185 <1357>

* U10102000139&21010185 <2310>

* ... ...

* @author songjq

*

*/

static class UserPaymentTopNMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text tkey = new Text();

private LongWritable tvalue = new LongWritable();

/*

* 读取文件分片,并处理后输出到Combiner

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

//读入一行数据

String line = v1.toString();

//分词处理

String[] order = line.split(",");

if(null!=order && order.length == 4) {

//用户编码ID

String userid = order[1];

//商品ID

String productId = order[3];

//商品金额

long payment = Long.valueOf(order[2]);

//<k2>

tkey.set(userid+"&"+productId);

//<v2>

tvalue.set(payment);

//通过context将数据传递到Combiner

context.write(tkey, tvalue);

}else {

return;

}

}

}

/**

* Combiner端:

* (1)Combiner是一种特殊的Reducer,使用Combiner需要注意不要改变程序原有逻辑,且保障Mapper端和Reducer端的数据类型一致

* (2)这里使用Combiner主要是为了实现

* 1)每个用户每个商品支付金额总和

* 2)通过在Combiner端对每个用户下同一个商品进行payment求和,这块可以大大减少数据在网络中传输,同时提高Reducer程序的执行效率

* (3)处理后数据输出格式如下:

* <k2`> <v2`>

* U10102000139&21010185 <20202>

* U10102000150&21010176 <11422>

* U10102000139&21010154 <10132>

* ... ...

* 注意:这里输出为局部TopN排行

* @author songjq

*

*/

static class UserPaymentTopNCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{

@Override

protected void reduce(Text k3_, Iterable<LongWritable> v3_, Context ctx)

throws IOException, InterruptedException {

//商品次数求和

long count = 0;

for(LongWritable val:v3_) {

count += val.get();

}

ctx.write(k3_, new LongWritable(count));

}

}

/**

* Reducer端:

* (1)Reducer端主要对Combiner端输出的多个局部排行的TopN条数据进行全局排行汇总

* (2)由于最终输出只会到一个文件,因此需要保障Reducer Tasks任务数为1

* (3)通过Reducer处理后,最终输出为

* <k3> <v4>

* U10102000139&21010185 <39872>

* U10102000150&21010176 <21422>

* U10102000139&21010154 <10132>

* ... ...

* @author songjq

*

*/

static class UserPaymentTopNReducer extends Reducer<Text, LongWritable, Text, Text>{

//实现思路和Combiner一致

private TreeSet<String[]> treeSet = null;

//全局前N条商品排名

private Integer N = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

//获取全局N

N = Integer.valueOf(context.getConfiguration().get("Global_N"));

//实例化treeSet,并对其内容按照商品购买次数进行排序

treeSet = new TreeSet<String[]>(new Comparator<String[]>() {

@Override

public int compare(String[] o1, String[] o2) {

long payment1 = Long.valueOf(o1[1]);

long payment2 = Long.valueOf(o2[1]);

int result = 0;

if(payment1>payment2) {

result = -1;

}else if(payment1<payment2) {

result = 1;

}

return result;

}

});

}

/*

* 对Combiner输出的数据进行全局排行

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void reduce(Text k3, Iterable<LongWritable> v3,

Context ctx) throws IOException, InterruptedException {

//汇总Combiner任务输出过来的商品次数

long payment_total = 0;

for(LongWritable val:v3) {

payment_total+=val.get();

}

String[] arys = {k3.toString(),String.valueOf(payment_total)};

treeSet.add(arys);

//treeSet超过N条记录,则删除最后一个节点

if(treeSet.size()>N) {

treeSet.pollLast();

}

}

/*

* reduce方法结束后执行,这里将treeSet结果集写到HDFS

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

context.write(new Text("单个商品支付金额最大的前"+N+"个用户排行"), new Text());

context.write(new Text("用户ID\t\t\t商品ID\t\t"), new Text("支付总额"));

for(String[] ary:treeSet) {

String[] arr = ary[0].split("&");

context.write(new Text(arr[0]+"\t"+arr[1]), new Text(ary[1]));

}

}

}

/**

* 提交任务Job

* @throws Exception

*/

@Test

public void UserPaymentTopNJob() throws Exception {

Configuration conf = new Configuration();

conf.set("Global_N", "10");

Job job = Job.getInstance(conf);

job.setJarByClass(UserPaymentTopN.class);

//Mapper

job.setMapperClass(UserPaymentTopNMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

//Combiner

job.setCombinerClass(UserPaymentTopNCombiner.class);

//Reducer

job.setReducerClass(UserPaymentTopNReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

//必须设置为1

job.setNumReduceTasks(1);

//输入路径

FileInputFormat.setInputPaths(job, "D:\\test\\tmp\\userTopN");

job.setInputFormatClass(TextInputFormat.class);

//输出路径

Path outpath = new Path("D:\\test\\tmp\\UserPaymentTopNout");

outpath.getFileSystem(conf).delete(outpath, true);

FileOutputFormat.setOutputPath(job, outpath);

job.waitForCompletion(true);

}

}

原文地址:http://blog.51cto.com/2951890/2156408

时间: 2024-08-30 05:55:40

2018-08-08 期 MapReduce实现单个商品支付金额最大的前N个用户排行(TopN)的相关文章

2018-08-09期 MapReduce实现对单个用户支付金额最大的前N个商品排名

package cn.sjq.mr.sort; import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.Random; import java.util.TreeSet; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache

2018-08-07 期 MapReduce模拟实现热销商品排行

package cn.sjq.mr.sort; import java.io.IOException; import java.util.Comparator; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoo

2014/08/08 – Backbonejs

[来自: Backbone.js 开发秘笈 第5章] Event API: (function ($) { //define ------------------------- var obj = {}; var obj2 = { commonEvent: function () { window.document.title = new Date().toString(); } }; //扩展对象包含事件 _.extend(obj, Backbone.Events); _.extend(obj

饥饿疗法是目前唯一确信能够延缓衰老的办法:4星|《三联生活周刊》2018年3期

三联生活周刊·人类到底能活多久:抗衰老科学指南(2018年3期) 本期主题是抗衰老,科学记者袁越走访了全球抗衰老研究的顶级机构,把这个领域最前沿的进展深入浅出地展现出来,非常有价值.这一类报道也是国内比较稀缺的. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码,[]中是我根据上下文补充的信息: 1:2016年世界人均一次性能源消费量为1.87吨油当量,中国为2.25吨,相比十几年前还不足1吨有了飞跃,但只相当于经合组织(OECD)4.5吨的一半.OECD目前有34

沈阳当年对学校承认了他和高岩的性关系:3星|《三联生活周刊》2018年16期

三联生活周刊·教授的权力:高校内的不平等关系(2018年16期) 本期主题是高校教师性侵学生的调查与思考. 总体评价3星,有参考价值. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:自第二次世界大战以来,以色列制造的暗杀事件比任何西方国家都多.该国领导人甚至认为通过杀戮指定的目标保护其国家安全,危害无辜平民的生命是合情合理的.#52 2:旅游业难以聚集大量的财富,给其从业者带来的回报也有限,这就是为什么海南成了高消费的代名词,可当地人收入却普遍不高的原因.这也可以说是资源

小程序购物车下架商品实时显示,只为更好用户体验!

单商户小程序V1.8.4版本更新说明更新时间:2018年9月10号 一. 更新功能清单1.新增仅支持自提功能设置,用户下单仅可选到店自提!2.小程序前台优化购物车页面,已删除或已下架的商品,其状态展示在购物车提醒用户:3.商家后台总览页面新增常用入口:4.商家后台DIY活动组件增加是否显示参与人数设置:5.商加后台新增搜索自定义分销商品功能:6.商城后台添加服务商品处新增服务商品提交订单页的温馨提示自定义:二. 更新功能详细说明1. 增加仅支持自提功能设置,用户下单仅可选到店自提! 优化目的:很

2018.01.08 python

感想:我的内心毫无波澜 难题:不会英语是硬伤 # 成绩条目类 class Score: lesson_name = "课程名" score = 0 # 分数 # 当成绩单初始化时,需要提供这个成绩单的两个属性的值 def __init__(self,lesson_name,score): self.lesson_name = lesson_name # 初始化对象的时候,就把课程名提供给成绩单 self.score = score # 同理, 也需要提供成绩的值 # 1.获取当前成绩数

2018.07.08

我得小企鹅生病了,可是我却没钱给他买药. 现实中好多事情都是这样无能为力的. 当初保研陷入两难,如今读博也是,工作也是. 世界上本身就不存在两全齐美的. 刚刚给后导师打电话道歉,他说  你爱去哪儿去哪儿.不能开题别怪我. 可是,江苏对我来说就像个心结一样,我是真的想去看看啊. 虽然,他的任何情绪都与我无关. 这次回来,真的要好好努力啊,我得研究生生活怎么会这样呢...... 原文地址:https://www.cnblogs.com/guojincheng666895/p/9281284.html

记 2018/11/08 面向对象基础学习 1#

一.类与对象概述 1.为了把日常生活中实物用学习语言描述出来 2.如何描述现实事物 属性:就是该事物的描述信息(名词) 行为:就是该事物能够做什么(动词) 3.Java中最基本单位是类 成员变量:事物的属性 成员方法:事物的行为 4.定义类其实就是定义类的成员(成员变量和成员方法 ) 成员变量 和在类中,方法外 成员方法 去掉satatic 5.类和对象的概念 类: 是一组相关的属性和行为的集合 对象: 是该类事物的具体体现 类定义实例: class Student String name; i