Apriori on MapReduce

Apiroi算法在Hadoop MapReduce上的实现

输入格式:

一行为一个Bucket

1 3 5 7 9 11 13 15 17 19 21 23 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 12 13 15 17 19 21 23 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 12 13 16 17 19 21 23 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 20 21 23 25 27 29 31 34 36 38 40 42 44 47 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 19 21 23 25 27 29 31 34 36 38 40 42 44 46 48 51 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 19 21 23 25 27 29 31 34 36 38 40 42 44 46 48 51 52 54 56 58 60 63 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 20 21 23 25 27 29 31 34 36 38 40 42 44 47 48 51 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 12 13 15 17 19 21 24 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 19 21 24 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 65 66 68 70 72 74
1 3 5 7 9 11 13 16 17 19 21 24 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 12 13 16 17 19 21 24 25 27 29 31 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 20 21 24 25 27 29 31 34 36 38 40 42 44 47 48 50 52 54 56 58 60 62 64 66 68 70 72 74
1 3 5 7 9 11 13 15 17 20 21 24 25 27 29 31 34 36 38 40 42 44 47 48 50 52 54 56 58 60 62 65 66 68 70 72 74
1 3 5 7 9 11 13 15 17 20 21 24 25 27 29 31 34 36 38 40 43 44 47 48 50 52 54 56 58 60 62 65 66 68 70 72 74 

输出格式:

<item1,item2,...itemK, frequency>

25    2860
29    3181
3    2839
34    3040
36    3099
40    3170
48    3013
5    2971
52    3185
56    3021

代码:

  1 package apriori;
  2
  3 import java.io.IOException;
  4 import java.util.Iterator;
  5 import java.util.StringTokenizer;
  6 import java.util.List;
  7 import java.util.ArrayList;
  8 import java.util.Collections;
  9 import java.util.Map;
 10 import java.util.HashMap;
 11 import java.io.*;
 12
 13 import org.apache.hadoop.conf.Configuration;
 14 import org.apache.hadoop.conf.Configured;
 15 import org.apache.hadoop.fs.Path;
 16 import org.apache.hadoop.fs.FileSystem;
 17 import org.apache.hadoop.io.Text;
 18 import org.apache.hadoop.io.IntWritable;
 19 import org.apache.hadoop.mapreduce.Job;
 20 import org.apache.hadoop.mapreduce.Mapper;
 21 import org.apache.hadoop.mapreduce.Mapper.Context;
 22 import org.apache.hadoop.mapreduce.Reducer;
 23 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 24 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 25 import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
 26 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 27 import org.apache.hadoop.util.Tool;
 28 import org.apache.hadoop.util.ToolRunner;
 29
 30 class AprioriPass1Mapper extends Mapper<Object,Text,Text,IntWritable>{
 31     private final static IntWritable one = new IntWritable(1);
 32     private Text number = new Text();
 33
 34     //第一次pass的Mapper只要把每个item映射为1
 35     public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
 36
 37         String[] ids = value.toString().split("[\\s\\t]+");
 38         for(int i = 0;i < ids.length;i++){
 39             context.write(new Text(ids[i]),one);
 40         }
 41     }
 42 }
 43
 44 class AprioriReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
 45     private IntWritable result = new IntWritable();
 46
 47     //所有Pass的job共用一个reducer,即统计一种itemset的个数,并筛选除大于s的
 48     public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
 49         int sum = 0;
 50
 51         int minSup = context.getConfiguration().getInt("minSup",5);
 52         for(IntWritable val : values){
 53             sum += val.get();
 54         }
 55         result.set(sum);
 56
 57         if(sum > minSup){
 58             context.write(key,result);
 59         }
 60     }
 61 }
 62
 63 class AprioriPassKMapper extends Mapper<Object,Text,Text,IntWritable>{
 64     private final static IntWritable one = new IntWritable(1);
 65     private Text item = new Text();
 66
 67     private List< List<Integer> > prevItemsets = new ArrayList< List<Integer> >();
 68     private List< List<Integer> > candidateItemsets = new ArrayList< List<Integer> >();
 69     private Map<String,Boolean> candidateItemsetsMap = new HashMap<String,Boolean>();
 70
 71
 72     //第一个以后的pass使用该Mapper,在map函数执行前会执行setup来从k-1次pass的输出中构建候选itemsets,对应于apriori算法
 73     @Override
 74     public void setup(Context context) throws IOException, InterruptedException{
 75         int passNum = context.getConfiguration().getInt("passNum",2);
 76         String prefix = context.getConfiguration().get("hdfsOutputDirPrefix","");
 77         String lastPass1 = context.getConfiguration().get("fs.default.name") + "/user/hadoop/chess-" + (passNum - 1) + "/part-r-00000";
 78         String lastPass = context.getConfiguration().get("fs.default.name") + prefix + (passNum - 1) + "/part-r-00000";
 79
 80         try{
 81             Path path = new Path(lastPass);
 82             FileSystem fs = FileSystem.get(context.getConfiguration());
 83             BufferedReader fis = new BufferedReader(new InputStreamReader(fs.open(path)));
 84             String line = null;
 85
 86             while((line = fis.readLine()) != null){
 87
 88                 List<Integer> itemset = new ArrayList<Integer>();
 89
 90                 String itemsStr = line.split("[\\s\\t]+")[0];
 91                 for(String itemStr : itemsStr.split(",")){
 92                     itemset.add(Integer.parseInt(itemStr));
 93                 }
 94
 95                 prevItemsets.add(itemset);
 96             }
 97         }catch (Exception e){
 98             e.printStackTrace();
 99         }
100
101         //get candidate itemsets from the prev itemsets
102         candidateItemsets = getCandidateItemsets(prevItemsets,passNum - 1);
103     }
104
105
106     public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
107         String[] ids = value.toString().split("[\\s\\t]+");
108
109         List<Integer> itemset = new ArrayList<Integer>();
110         for(String id : ids){
111             itemset.add(Integer.parseInt(id));
112         }
113
114         //遍历所有候选集合
115         for(List<Integer> candidateItemset : candidateItemsets){
116             //如果输入的一行中包含该候选集合,则映射1,这样来统计候选集合被包括的次数
117             //子集合,消耗掉了大部分时间
118             if(contains(candidateItemset,itemset)){
119                 String outputKey = "";
120                 for(int i = 0;i < candidateItemset.size();i++){
121                     outputKey += candidateItemset.get(i) + ",";
122                 }
123                 outputKey = outputKey.substring(0,outputKey.length() - 1);
124                 context.write(new Text(outputKey),one);
125             }
126         }
127     }
128
129     //返回items是否是allItems的子集
130     private boolean contains(List<Integer> items,List<Integer> allItems){
131
132         int i = 0;
133         int j = 0;
134         while(i < items.size() && j < allItems.size()){
135             if(allItems.get(j) > items.get(i)){
136                 return false;
137             }else if(allItems.get(j) == items.get(i)){
138                 j++;
139                 i++;
140             }else{
141                 j++;
142             }
143         }
144
145         if(i != items.size()){
146             return false;
147         }
148         return true;
149     }
150
151     //获取所有候选集合,参考apriori算法
152     private List< List<Integer> > getCandidateItemsets(List< List<Integer> > prevItemsets, int passNum){
153
154         List< List<Integer> > candidateItemsets = new ArrayList<List<Integer> >();
155
156         //上次pass的输出中选取连个itemset构造大小为k + 1的候选集合
157         for(int i = 0;i < prevItemsets.size();i++){
158             for(int j = i + 1;j < prevItemsets.size();j++){
159                 List<Integer> outerItems = prevItemsets.get(i);
160                 List<Integer> innerItems = prevItemsets.get(j);
161
162                 List<Integer> newItems = null;
163                 if(passNum == 1){
164                     newItems = new ArrayList<Integer>();
165                     newItems.add(outerItems.get(0));
166                     newItems.add(innerItems.get(0));
167                 }
168                 else{
169                     int nDifferent = 0;
170                     int index = -1;
171                     for(int k = 0; k < passNum && nDifferent < 2;k++){
172                         if(!innerItems.contains(outerItems.get(k))){
173                             nDifferent++;
174                             index = k;
175                         }
176                     }
177
178                     if(nDifferent == 1){
179                         //System.out.println("inner " + innerItems + " outer : " + outerItems);
180                         newItems = new ArrayList<Integer>();
181                         newItems.addAll(innerItems);
182                         newItems.add(outerItems.get(index));
183                     }
184                 }
185                 if(newItems == null){continue;}
186
187                 Collections.sort(newItems);
188
189                 //候选集合必须满足所有的子集都在上次pass的输出中,调用isCandidate进行检测,通过后加入到候选子集和列表
190                 if(isCandidate(newItems,prevItemsets) && !candidateItemsets.contains(newItems)){
191                     candidateItemsets.add(newItems);
192                     //System.out.println(newItems);
193                 }
194             }
195         }
196
197         return candidateItemsets;
198     }
199
200     private boolean isCandidate(List<Integer> newItems,List< List<Integer> > prevItemsets){
201
202         List<List<Integer>> subsets = getSubsets(newItems);
203
204         for(List<Integer> subset : subsets){
205             if(!prevItemsets.contains(subset)){
206                 return false;
207             }
208         }
209
210         return true;
211     }
212
213     private List<List<Integer>> getSubsets(List<Integer> items){
214
215         List<List<Integer>> subsets = new ArrayList<List<Integer>>();
216         for(int i = 0;i < items.size();i++){
217             List<Integer> subset = new ArrayList<Integer>(items);
218             subset.remove(i);
219             subsets.add(subset);
220         }
221
222         return subsets;
223     }
224 }
225
226 public class Apriori extends Configured implements Tool{
227
228     public static int s;
229     public static int k;
230
231     public int run(String[] args)throws IOException,InterruptedException,ClassNotFoundException{
232         long startTime = System.currentTimeMillis();
233
234         String hdfsInputDir = args[0];        //从参数1中读取输入数据
235         String hdfsOutputDirPrefix = args[1];    //参数2为输出数据前缀,和第pass次组成输出目录
236         s = Integer.parseInt(args[2]);        //阈值
237         k = Integer.parseInt(args[3]);        //k次pass
238
239         //循环执行K次pass
240         for(int pass = 1; pass <= k;pass++){
241             long passStartTime = System.currentTimeMillis();
242
243             //配置执行该job
244             if(!runPassKMRJob(hdfsInputDir,hdfsOutputDirPrefix,pass)){
245                 return -1;
246             }
247
248             long passEndTime = System.currentTimeMillis();
249             System.out.println("pass " + pass + " time : " + (passEndTime - passStartTime));
250         }
251
252         long endTime = System.currentTimeMillis();
253         System.out.println("total time : " + (endTime - startTime));
254
255         return 0;
256     }
257
258     private static boolean runPassKMRJob(String hdfsInputDir,String hdfsOutputDirPrefix,int passNum)
259             throws IOException,InterruptedException,ClassNotFoundException{
260
261             Configuration passNumMRConf = new Configuration();
262             passNumMRConf.setInt("passNum",passNum);
263             passNumMRConf.set("hdfsOutputDirPrefix",hdfsOutputDirPrefix);
264             passNumMRConf.setInt("minSup",s);
265
266             Job passNumMRJob = new Job(passNumMRConf,"" + passNum);
267             passNumMRJob.setJarByClass(Apriori.class);
268             if(passNum == 1){
269                 //第一次pass的Mapper类特殊对待,不许要构造候选itemsets
270                 passNumMRJob.setMapperClass(AprioriPass1Mapper.class);
271             }
272             else{
273                 //第一次之后的pass的Mapper类特殊对待,不许要构造候选itemsets
274                 passNumMRJob.setMapperClass(AprioriPassKMapper.class);
275             }
276             passNumMRJob.setReducerClass(AprioriReducer.class);
277             passNumMRJob.setOutputKeyClass(Text.class);
278             passNumMRJob.setOutputValueClass(IntWritable.class);
279
280             FileInputFormat.addInputPath(passNumMRJob,new Path(hdfsInputDir));
281             FileOutputFormat.setOutputPath(passNumMRJob,new Path(hdfsOutputDirPrefix + passNum));
282
283             return passNumMRJob.waitForCompletion(true);
284     }
285
286     public static void main(String[] args) throws Exception{
287         int exitCode = ToolRunner.run(new Apriori(),args);
288         System.exit(exitCode);
289     }
290 }
时间: 2024-09-27 04:32:59

Apriori on MapReduce的相关文章

使用Apriori算法和FP-growth算法进行关联分析(Python版)

===================================================================== <机器学习实战>系列博客是博主阅读<机器学习实战>这本书的笔记也包含一些其他python实现的机器学习算法 算法实现均采用python github 源码同步:https://github.com/Thinkgamer/Machine-Learning-With-Python ==================================

Spark下的FP-Growth和Apriori(频繁项集挖掘并行化算法)

频繁项集挖掘是一个关联式规则挖掘问题.关联挖掘是数据挖掘中研究最早也是最活跃的领域,其中频繁模式的挖掘是关联挖掘的核心和基础,是产生关联规则挖掘的基础.频繁项集最经典的应用就是超市的购物篮分析. 首先要理解频繁项集中的以下概念. 频繁项:在多个集合中,频繁出现的元素项. 频繁项集:在一系列集合中每项都含有某些相同的元素,这些元素形成一个子集,满足一定阀值就是频繁项集. K项集:K个频繁项组成的一个集合. 支持度:包含频繁项集(F)的集合的数目. 可信度:频繁项与某项的并集的支持度与频繁项集支持度

#研发解决方案#基于Apriori算法的Nginx+Lua+ELK异常流量拦截方案

郑昀 基于杨海波的设计文档 创建于2015/8/13 最后更新于2015/8/25 关键词:异常流量.rate limiting.Nginx.Apriori.频繁项集.先验算法.Lua.ELK 本文档适用人员:技术人员 提纲: 所谓异常流量 如何识别异常流量 Apriori如何工作 如何让 Nginx 拦截可疑 IP 0x00,所谓异常流量 有害的异常流量大概分为以下几种: 僵尸网络中的节点对主站发起无目的的密集访问: 黑客.白帽子或某些安全公司为了做漏洞扫描,对主站各个 Web 工程发起字典式

[ML&amp;DL] 频繁项集Apriori算法

频繁项集Apriori算法 Reference 数据挖掘十大算法之Apriori详解 Apriori算法详解之[一.相关概念和核心步骤] 关联分析之Apriori算法 haha 算法理解部分主要是前两个链接,写的很靠谱.在实际中再配合上hadoop的mapreduce.

MapReduce实现手机上网流量分析

一.问题背景 现在的移动刚一通话就可以在网站上看自己的通话记录,以前是本月只能看上一个月.不过流量仍然是只能看上一月的. 目的就是找到用户在一段时间内的上网流量. 本文并没有对时间分组. 二.数据集分析 可以看出实际数据集并不是每个字段都有值,但是还好,完整地以tab隔开了,数据格式还是不错的,我们需要的上行下行数据都有,没有缺失值.其实这个需要在程序中处理,如果不在的话 该怎么办. 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196

mapreduce和spark的原理及区别

Mapreduce和spark是数据处理层两大核心,了解和学习大数据必须要重点掌握的环节,根据自己的经验和大家做一下知识的分享. 首先了解一下Mapreduce,它最本质的两个过程就是Map和Reduce,Map的应用在于我们需要数据一对一的元素的映射转换,比如说进行截取,进行过滤,或者任何的转换操作,这些一对一的元素转换就称作是Map:Reduce主要就是元素的聚合,就是多个元素对一个元素的聚合,比如求Sum等,这就是Reduce. Mapreduce是Hadoop1.0的核心,Spark出现

基于 Eclipse 的 MapReduce 开发环境搭建

文 / vincentzh 原文连接:http://www.cnblogs.com/vincentzh/p/6055850.html 上周末本来要写这篇的,结果没想到上周末自己环境都没有搭起来,运行起来有问题的呢,拖到周一才将问题解决掉.刚好这周也将之前看的内容复习了下,边复习边码代码理解,印象倒是很深刻,对看过的东西理解也更深入了. 目录 1.概述 2.环境准备 3.插件配置 4.配置文件系统连接 5.测试连接 6.代码编写与执行 7.问题梳理 7.1 console 无日志输出问题 7.2

mongodb aggregate and mapReduce

Aggregate MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果.有点类似sql语句中的 count(*) 语法如下: db.collection.aggregate() db.collection.aggregate(pipeline,options) db.runCommand({ aggregate: "<collection>", pipeline: [ <stage>, <...&g

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且: 1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑): 2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次. 而TaskImpl中存在一个成员变