使用hadoop实现关联商品统计

转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40184581

最近几天一直在看hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。

需求描述:

根据超市的销售清单,计算商品之间的关联程度(即统计同时买A商品和B商品的次数)。

数据格式:

超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:

需求分析:

采用hadoop中的mapreduce对该需求进行计算。

map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果如下图所示:

这里为了统计出和A、B两件商品想关联的商品,所以商品A、B之间的关系输出两条结果即 A-B、B-A。

reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:

通过map函数的处理,得到如下图所示的记录:

reduce中对map输出的value值进行分组计数,得到的结果如下图所示

将商品A B作为key,组合个数作为value输出,输出结果如下图所示:

对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现

代码实现:

关于代码就不做详细的介绍,具体参照代码之中的注释吧。

package com;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test extends Configured implements Tool{

	/**
	 * map类,实现数据的预处理
	 * 输出结果key为商品A value为关联商品B
	 * @author lulei
	 */
	public static class MapT extends Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
			String line = value.toString();
			if (!(line == null || "".equals(line))) {
				//分割商品
				String []vs = line.split(",");
				//两两组合,构成一条记录
				for (int i = 0; i < (vs.length - 1); i++) {
					if ("".equals(vs[i])) {//排除空记录
						continue;
					}
					for (int j = i+1; j < vs.length; j++) {
						if ("".equals(vs[j])) {
							continue;
						}
						//输出结果
						context.write(new Text(vs[i]), new Text(vs[j]));
						context.write(new Text(vs[j]), new Text(vs[i]));
					}
				}
			}
		}
	}

	/**
	 * reduce类,实现数据的计数
	 * 输出结果key 为商品A|B value为该关联次数
	 * @author lulei
	 */
	public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> {
		private int count;

		/**
		 * 初始化
		 */
		public void setup(Context context) {
			//从参数中获取最小记录个数
			String countStr = context.getConfiguration().get("count");
			try {
				this.count = Integer.parseInt(countStr);
			} catch (Exception e) {
				this.count = 0;
			}
		}
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
			String keyStr = key.toString();
			HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
			//利用hash统计B商品的次数
			for (Text value : values) {
				String valueStr = value.toString();
				if (hashMap.containsKey(valueStr)) {
					hashMap.put(valueStr, hashMap.get(valueStr) + 1);
				} else {
					hashMap.put(valueStr, 1);
				}
			}
			//将结果输出
			for (Entry<String, Integer> entry : hashMap.entrySet()) {
				if (entry.getValue() >= this.count) {//只输出次数不小于最小值的
					context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue()));
				}
			}
		}
	}

	@Override
	public int run(String[] arg0) throws Exception {
		// TODO Auto-generated method stub
		Configuration conf = getConf();
		conf.set("count", arg0[2]);

		Job job = new Job(conf);
		job.setJobName("jobtest");

		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(MapT.class);
		job.setReducerClass(ReduceT.class);

		FileInputFormat.addInputPath(job, new Path(arg0[0]));
		FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

		job.waitForCompletion(true);

		return job.isSuccessful() ? 0 : 1;

	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		if (args.length != 3) {
			System.exit(-1);
		}
		try {
			int res = ToolRunner.run(new Configuration(), new Test(), args);
			System.exit(res);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

上传运行:

将程序打包成jar文件,上传到机群之中。将测试数据也上传到HDFS分布式文件系统中。

命令运行截图如下图所示:

运行结束后查看相应的HDFS文件系统,如下图所示:

到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~

时间: 2024-10-12 21:46:57

使用hadoop实现关联商品统计的相关文章

实验二-3 Hadoop&amp;Paoding 中文词频统计

  参考教程 在Hadoop上使用庖丁解牛(较复杂,并未采用,可以之后试试) http://zhaolinjnu.blog.sohu.com/264905210.html Lucene3.3.Lucene3.4中文分词——庖丁解牛分词实例(屈:注意版本) http://www.360doc.com/content/13/0217/13/11619026_266124504.shtml 庖丁分词在hadoop上运行时的配置问题(采纳了一半,没有按照其所写配置dic属性文件) http://f.da

初学Hadoop之中文词频统计

1.安装eclipse 准备 eclipse-dsl-luna-SR2-linux-gtk-x86_64.tar.gz 安装 1.解压文件. 2.创建图标. ln -s /opt/eclipse/eclipse /usr/bin/eclipse #使符号链接目录 vim /usr/share/applications/eclipse.desktop #创建一个  Gnome 启动 添加如下代码: [Desktop Entry] Encoding=UTF-8 Name=Eclipse 4.4.2

【hadoop】 3002-mapreduce程序统计单词个数示例

一.新建文本文件wordcount.txt,并上传至hdfs服务器上 [[email protected] HDFSdemo]$ hadoop fs -cat /wc/wordcount.txt hello world hello China hello wenjie hello USA hello China hello China hello Japan [[email protected] HDFSdemo]$ hadoop fs -cat /wc/wordcount1.txt hello

流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计

小知识点: half:关机 yarn端口:8088 删除hdfs目录:hadoop fs -rm -r /wc/output namenode两个状态都是standby原因:zookeeper没有比hdfs先启动 现在来做一个流量统计的例子: 首先数据是这样一张表:见附件 统计:(代码) 1,flowbean: package cn.itcast.hadoop.mr.flowsum; import java.io.DataInput; import java.io.DataOutput; imp

初学Hadoop之WordCount分词统计

1.WordCount源码 将源码文件WordCount.java放到Hadoop2.6.0文件夹中. import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apach

使用hadoop实现ip地理位置统计~ip归属地和运营商

转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40585565 更多相关hadoop内容访问:http://blog.csdn.net/xiaojimanman/article/category/2640707 对于博客 http://blog.csdn.net/xiaojimanman/article/details/40372189 中的计算结果 key-value (ip,出现次数),统计下各个地区运营商下的IP个数,通

使用hadoop实现IP个数统计~并将结果写入数据库

转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40372189 hadoop源代码中的WordCount事例中实现了单词统计,但是输出到HDFS文件,线上程序想使用其计算结果还要再次写个程序,所以自己就研究一下关于MapReduce的输出问题,下面就通过一个简单的例子说明下如何将MapReduce的计算结果输出到数据库中. 需求描述: 分析网络服务器上的Apache日志,统计每个IP访问资源的次数,并将结果写入到mysql数据

利用Hadoop streaming 进行词频统计

创建一个文件夹 bin/hdfs dfs -mkdir /input 将要统计的文件上传到hadoopbin/hadoop fs -put /test.txt /input 利用hadoop进行词频统计bin/hadoop jar share/hadoop/tools/lib/Hadoop-streaming-2-9-2.jar –input /test.txt –output /user/results.txt –mapper /bin/cat -reducer /usr/bin/wc 删除r

实验二-2 Eclipse&amp;Hadoop 做英文词频统计进行集群测试

  创建目录上传英文测试文档(如果已有则无需配置).a.dfs上创建input目录 [email protected]:~/data/hadoop-2.5.2$bin/hadoop fs -mkdir -p input b.把hadoop目录下的README.txt拷贝到dfs新建的input里 [email protected]:~/data/hadoop-2.5.2$bin/hadoop fs -copyFromLocal README.txt input —————————————————