2018-08-07 期 MapReduce模拟实现热销商品排行

package cn.sjq.mr.sort;

import java.io.IOException;

import java.util.Comparator;

import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* MapReduce实现热销商品TopN排行

* 这里按照商品购买次数排名在前面的为热销商品

* 输入数据:

* order.data1...10 10个订单文件,每个文件5000-10000条的购买记录,格式如下:

* orderid userid  payment productid

* c13a009e-a950-42f6-8eab-8e28d1406fe0,U10102000139,1008, 21010185

c5d2a564-619c-4e1a-a350-7ce981bbc22d,U10102000191,1357, 21010117

1d762edd-0044-4773-a413-ab0440081b1e,U10102000150,2173, 21010124

e82c2848-6d6e-4fdf-8d7d-83059376846b,U10102000162,2310, 21010197

......

* 最终输出数据(TopN):

热销商品排行Top10

商品ID 销售数量

21010129 871

21010182 852

21010153 839

21010131 837

21010142 835

21010159 833

21010117 830

21010110 828

21010141 824

21010198 823

*

* 实现逻辑:

* Mapper端:

* (1)实现数据分片,将读取的数据分片通过map方法处理后输出到Combiner

* (2)数据的输出格式

* <k2>Text <v2>Intwritable

* 21010185 <1>

* 21010117 <1>

* 21010185 <1>

* ... ...

* Combiner端:

* (1)Combiner是一种特殊的Reducer,使用Combiner需要注意不要改变程序原有逻辑,且保障Mapper端和Reducer端的数据类型一致

* (2)这里使用Combiner主要是为了实现

* 1)每个商品购买次数求和

* 2)对于每个局部的Combiner任务,对接收到Mapper端输出的数据处理后进行局部TopN排行,这样可以避免不必要的数据传递到Reducer端,同时提高Reducer程序的执行效率

* (3)TopN中的N由Hadoop的configuration中set(K,V)来设置,这样可以保障运行在各个机器上的任务可以获取到这个全局唯一的N值

* (4)处理后数据输出格式如下:

* <k2`> <v2`>

* 21010185 <30>

* 21010117 <20>

* ... ...

* 注意:这里输出为局部TopN排行

*

* Reducer端:

* (1)Reducer端主要对Combiner端输出的多个局部排行的TopN条数据进行全局排行汇总

* (2)由于最终输出只会到一个文件,因此需要保障Reducer Tasks任务数为1

* (3)通过Reducer处理后,最终输出为

* <k3> <v4>

* 21010185 <30>

* 21010117 <20>

* ... ...

*

* @author songjq

*

*/

public class HotProductTopN {

/**

* Mapper端:

* (1)实现数据分片,将读取的数据分片通过map方法处理后输出到Combiner

* (2)数据的输出格式

* <k2>Text <v2>Intwritable

* 21010185 <1>

* 21010117 <1>

* 21010185 <1>

* ... ...

* @author songjq

*

*/

static class HotProductTopNMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

private Text tkey = new Text();

private IntWritable tvalue = new IntWritable();

/*

* 读取文件分片,并处理后输出到Combiner

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

//读入一行数据

String line = v1.toString();

//分词处理

String[] order = line.split(",");

if(null!=order && order.length == 4) {

//商品ID

String productId = order[3];

tkey.set(productId);

tvalue.set(1);

//通过context将数据传递到Combiner

context.write(tkey, tvalue);

}else {

return;

}

}

}

/**

*  * Combiner端:

* (1)Combiner是一种特殊的Reducer,使用Combiner需要注意不要改变程序原有逻辑,且保障Mapper端和Reducer端的数据类型一致

* (2)这里使用Combiner主要是为了实现

* 1)每个商品购买次数求和

* 2)对于每个局部的Combiner任务,对接收到Mapper端输出的数据处理后进行局部TopN排行,这样可以避免不必要的数据传递到Reducer端,同时提高Reducer程序的执行效率

* (3)TopN中的N由Hadoop的configuration中set(K,V)来设置,这样可以保障运行在各个机器上的任务可以获取到这个全局唯一的N值

* (4)处理后数据输出格式如下:

* <k2`> <v2`>

* 21010185 <30>

* 21010117 <20>

* ... ...

* 注意:这里输出为局部TopN排行

* @author songjq

*

*/

static class HotProductTopNCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

private TreeSet<String[]> treeSet = null;

//全局前N条商品排名

private Integer N = null;

/*

* 初始化方法,在reduce方法调用前执行,只会被执行一次

* 通过该方法,我们可以获取全局N变量的值,且可以初始化TopN的treeset集合。

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

protected void setup(Context context) throws IOException, InterruptedException {

//获取全局N

N = Integer.valueOf(context.getConfiguration().get("Global_N"));

//实例化treeSet,并对其内容按照商品购买次数进行排序

treeSet = new TreeSet<String[]>(new Comparator<String[]>() {

@Override

public int compare(String[] o1, String[] o2) {

Integer count1 = Integer.valueOf(o1[1]);

Integer count2 = Integer.valueOf(o2[1]);

int result = 0;

if(count1>count2) {

result = -1;

}else if(count1<count2) {

result = 1;

}

return result;

}

});

}

/*

* 对相同的ProductId求和,并将其加到treeSet集合,treeSet只存放排名TopN的N条商品

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void reduce(Text k3_, Iterable<IntWritable> v3_, Context ctx)

throws IOException, InterruptedException {

//商品次数求和

Integer count = 0;

for(IntWritable val:v3_) {

count += val.get();

}

//将商品放入treeSet集合

String[] arys = {k3_.toString(),count.toString()};

treeSet.add(arys);

//treeSet记录超过N条,就删除最后一条数据

if(treeSet.size()>N) {

treeSet.pollLast();

}

}

/*

* cleanup在reduce调用结束后执行

* 这里利用cleanup方法将treeSet集合中数据写出去

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void cleanup(Context context) throws IOException, InterruptedException {

for(String[] ary:treeSet) {

context.write(new Text(ary[0]), new IntWritable(Integer.valueOf(ary[1])));

}

}

}

/**

*  * Reducer端:

* (1)Reducer端主要对Combiner端输出的多个局部排行的TopN条数据进行全局排行汇总

* (2)由于最终输出只会到一个文件,因此需要保障Reducer Tasks任务数为1

* (3)通过Reducer处理后,最终输出为

* <k3> <v4>

* 21010185 <30>

* 21010117 <20>

* ... ...

* @author songjq

*

*/

static class HotProductTopNReducer extends Reducer<Text, IntWritable, Text, Text>{

//实现思路和Combiner一致

//存放TopN记录  HashMap<"ProductId", count>

private TreeSet<String[]> treeSet = null;

//全局前N条商品排名

private Integer N = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

//获取全局N

N = Integer.valueOf(context.getConfiguration().get("Global_N"));

//实例化treeSet,并对其内容按照商品购买次数进行排序

treeSet = new TreeSet<String[]>(new Comparator<String[]>() {

@Override

public int compare(String[] o1, String[] o2) {

Integer count1 = Integer.valueOf(o1[1]);

Integer count2 = Integer.valueOf(o2[1]);

int result = 0;

if(count1>count2) {

result = -1;

}else if(count1<count2) {

result = 1;

}

return result;

}

});

}

/*

* 对Combiner输出的数据进行全局排行

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void reduce(Text k3, Iterable<IntWritable> v3,

Context ctx) throws IOException, InterruptedException {

//汇总Combiner任务输出过来的商品次数

int count = 0;

for(IntWritable val:v3) {

count+=val.get();

}

String[] arys = {k3.toString(),String.valueOf(count)};

treeSet.add(arys);

//treeSet超过N条记录,则删除最后一个节点

if(treeSet.size()>N) {

treeSet.pollLast();

}

}

/*

* reduce方法结束后执行,这里将treeSet结果集写到HDFS

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#cleanup(org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

context.write(new Text("热销商品排行Top"+N), new Text());

context.write(new Text("商品ID"), new Text("销售数量"));

for(String[] ary:treeSet) {

context.write(new Text(ary[0]), new Text(ary[1]));

}

}

}

/**

* 提交任务Job

* @throws Exception

*/

@Test

public void HotProductTopNJob() throws Exception {

Configuration conf = new Configuration();

conf.set("Global_N", "10");

Job job = Job.getInstance(conf);

job.setJarByClass(HotProductTopN.class);

//Mapper

job.setMapperClass(HotProductTopNMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//Combiner

job.setCombinerClass(HotProductTopNCombiner.class);

//Reducer

job.setReducerClass(HotProductTopNReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

//必须设置为1

job.setNumReduceTasks(1);

//输入路径

FileInputFormat.setInputPaths(job, "D:\\test\\tmp\\userTopN");

job.setInputFormatClass(TextInputFormat.class);

//输出路径

Path outpath = new Path("D:\\test\\tmp\\TopNout");

outpath.getFileSystem(conf).delete(outpath, true);

FileOutputFormat.setOutputPath(job, outpath);

job.waitForCompletion(true);

}

}

原文地址:http://blog.51cto.com/2951890/2155538

时间: 2024-08-28 11:19:46

2018-08-07 期 MapReduce模拟实现热销商品排行的相关文章

【转】【Android UI设计与开发】第07期:底部菜单栏(二)Fragment的详细介绍和使用方法

原始地址:http://blog.csdn.net/yangyu20121224/article/category/1431917/1 由于TabActivity在Android4.0以后已经被完全弃用,那么我就不再浪费口水继续讲解它了,取而代之的是Fragment.Fragment是Android3.0新增的概念,Fragment翻译成中文是碎片的意思,不过却和Activity十分的相似,这一篇我花大量的篇幅来详细的讲解Fragment的介绍和使用方法. 一.Fragment的基础知识介绍  

Bootstrap 3.2.0 源码试读 2014/08/07

第一部分 normalize.css 70至72行 small {   font-size: 80%; } 设置small标签的字体大小为父容器字体的80%. 73至79行 sub, sup {   position: relative;   font-size: 75%;   line-height: 0;   vertical-align: baseline; } sup {   top: -.5em; } sub {   bottom: -.25em; } 先设置上标sup及下标sub,位

饥饿疗法是目前唯一确信能够延缓衰老的办法:4星|《三联生活周刊》2018年3期

三联生活周刊·人类到底能活多久:抗衰老科学指南(2018年3期) 本期主题是抗衰老,科学记者袁越走访了全球抗衰老研究的顶级机构,把这个领域最前沿的进展深入浅出地展现出来,非常有价值.这一类报道也是国内比较稀缺的. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码,[]中是我根据上下文补充的信息: 1:2016年世界人均一次性能源消费量为1.87吨油当量,中国为2.25吨,相比十几年前还不足1吨有了飞跃,但只相当于经合组织(OECD)4.5吨的一半.OECD目前有34

沈阳当年对学校承认了他和高岩的性关系:3星|《三联生活周刊》2018年16期

三联生活周刊·教授的权力:高校内的不平等关系(2018年16期) 本期主题是高校教师性侵学生的调查与思考. 总体评价3星,有参考价值. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:自第二次世界大战以来,以色列制造的暗杀事件比任何西方国家都多.该国领导人甚至认为通过杀戮指定的目标保护其国家安全,危害无辜平民的生命是合情合理的.#52 2:旅游业难以聚集大量的财富,给其从业者带来的回报也有限,这就是为什么海南成了高消费的代名词,可当地人收入却普遍不高的原因.这也可以说是资源

cocopods 镜像源更新 时间 2018年07月份

注意:本次更镜像源地址新时间为2018年07月份左右 如果您阅读本文章的时间与此时间相差太久.则可能没有参考价值. pod 的国内镜像源由原来的 https://gems.ruby-china.org 变更为  https://gems.ruby-china.com 变更的时间应该在 2018年07月份左右.如果你的pod无法更新很可能需要修改镜像源. 在命令行中输入以下代码更换数据源 1. 更换数据源 $ gem sources --add https://gems.ruby-china.co

新手C#string类常用函数的学习2018.08.04

ToLower()用于将字符串变为小写,注意字符串的不可变特性,需要重新赋值给另一个字符串变量. s = s.ToLower();//字符串具有不可变性,转换后需要重新赋值,不可仅有s.ToLower(); 这可以使用户的输入不区分大小写,例如验证码. ToUpper()用于将字符串全部变为大写,与上面类似. Trim()可以用于去掉两边的空格. string s1 = " a b c "; s1 = s1.Trim();//用于去除字符串两边的空格 Console.WriteLine

2018-08-08 期 MapReduce实现单个商品支付金额最大的前N个用户排行(TopN)

package cn.sjq.mr.sort; import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.Random; import java.util.TreeSet; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache

Intel Digital Innovation Industry Summit(2018.08.17)

时间:2018.08.17地点:北京金隅喜来登大酒店 原文地址:https://www.cnblogs.com/xuefeng1982/p/10331638.html

2019.08.07学习整理

2019.08.07学习整理 字符编码 1.什么是字符编码 字符编码是将人类的字符编码成计算机能识别的数字,这种转换必须遵循一套固定的标准,该标准无非是人类字符与数字的对应关系,称之为字符编码表. 2.字符编码发展史与分类 计算机由美国人发明,最早的字符编码为ASCII,只规定了英文字母数字和一些特殊字符与数字的对应关系.最多只能用 8 位来表示(一个字节),即:2**8 = 256,所以,ASCII码最多只能表示 256 个符号. 当然我们编程语言都用英文没问题,ASCII够用,但是在处理数据