Collaborative Filtering Algorithm (Tianchi Competition Problem)

I. Categories of recommendation algorithms

  1. Divided by the data they use:

    • Collaborative filtering: UserCF, ItemCF, ModelCF (a minimal in-memory ItemCF sketch follows this list)
    • Content-based recommendation: user content attributes and item content attributes
    • Social filtering: based on the user's social-network relationships
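
    For orientation, here is a minimal, single-machine sketch of the item-based collaborative-filtering (ItemCF) idea that the MapReduce pipeline in Part II implements: count how often items co-occur across users, then score candidate items for a user by combining the co-occurrence counts with that user's existing ratings. The class name and the rating values below are purely illustrative, not the competition code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal single-machine ItemCF sketch; names and ratings are illustrative only. */
public class ItemCFSketch {
    public static void main(String[] args) {
        // Hypothetical ratings: user -> (item -> score).
        Map<String, Map<String, Double>> ratings = new HashMap<>();
        ratings.put("u1", new HashMap<String, Double>());
        ratings.get("u1").put("b1", 4.0);
        ratings.get("u1").put("b2", 1.0);
        ratings.put("u2", new HashMap<String, Double>());
        ratings.get("u2").put("b1", 2.0);
        ratings.get("u2").put("b3", 3.0);
        ratings.put("u3", new HashMap<String, Double>());
        ratings.get("u3").put("b2", 5.0);
        ratings.get("u3").put("b3", 1.0);

        // Co-occurrence: how many users interacted with both item i and item j.
        Map<String, Map<String, Integer>> cooccur = new HashMap<>();
        for (Map<String, Double> items : ratings.values()) {
            List<String> list = new ArrayList<>(items.keySet());
            for (String i : list) {
                if (!cooccur.containsKey(i)) {
                    cooccur.put(i, new HashMap<String, Integer>());
                }
                for (String j : list) {
                    Integer old = cooccur.get(i).get(j);
                    cooccur.get(i).put(j, old == null ? 1 : old + 1);
                }
            }
        }

        // Recommendation score for u1: sum over rated items i of rating(u1, i) * cooccur(i, j).
        Map<String, Double> scores = new HashMap<>();
        for (Map.Entry<String, Double> rated : ratings.get("u1").entrySet()) {
            for (Map.Entry<String, Integer> co : cooccur.get(rated.getKey()).entrySet()) {
                Double old = scores.get(co.getKey());
                double add = rated.getValue() * co.getValue();
                scores.put(co.getKey(), old == null ? add : old + add);
            }
        }
        scores.keySet().removeAll(ratings.get("u1").keySet()); // never recommend items the user already has
        System.out.println(scores); // prints {b3=5.0} for this toy data
    }
}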

  2. Case study: the Tianchi Big Data Competition

    The competition releases the following data fields:

    Field        | Description                     | Extraction notes
    -------------|---------------------------------|-------------------------------------------
    user_id      | User identifier                 | Sampled & field-encrypted
    Time         | Time of the action              | Day-level precision, year hidden
    action_type  | The user's action on the brand  | Four action types: click, purchase, add-to-cart, favorite (click: 0, purchase: 1, favorite: 2, cart: 3)
    brand_id     | Numeric brand ID                | Sampled & field-encrypted

    The released data covers tens of millions of Tmall users and tens of thousands of Tmall brands, with behavior records spanning four months.
    The training data is provided on the Tianchi cluster in the table t_alibaba_bigdata_user_brand_total_1, whose columns are user_id, brand_id, type, and visit_datetime.

3. The four user action types (type) are encoded as:
     click: 0; purchase: 1; favorite: 2; add-to-cart: 3
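
A small, purely illustrative mapping sketch: the weights below mirror the ones used later in Step02's reducer (click 1.0, favorite/collect 2.0, add-to-cart 3.0, purchase/alipay 4.0); the class and method names are not part of the competition code.

/** Illustrative helper: weight assigned to each action type, matching Step02's reducer. */
public class ActionWeight {
    public static double weightOf(String action) {
        switch (action) {
            case "click":   return 1.0; // type code 0
            case "alipay":  return 4.0; // purchase, type code 1
            case "collect": return 2.0; // favorite, type code 2
            case "cart":    return 3.0; // add-to-cart, type code 3
            default: throw new IllegalArgumentException("unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        System.out.println(weightOf("cart")); // 3.0
    }
}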

II. Implementation approach and code

1. Deduplicate the raw data

package com.oracle.www.TianChi_compition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Deduplicate the raw data and drop the header row.
 */
public class Step01 {
    static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // The key is the byte offset of the line; offset 0 is the header row, so skip it.
            if (key.get() > 0) {
                context.write(value, NullWritable.get());
            }
        }
    }

    static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> value,
                Reducer<Text, NullWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Identical lines are grouped under the same key, so writing the key once removes duplicates.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args)
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Step01.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Delete the output directory if it already exists.
        Path outPath = new Path("hdfs://192.168.9.13:8020/deweight");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.9.13:8020/TianmaoData"));
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }
}

2. Build the co-occurrence matrix for all items (the Step03 job; it reads the rating matrix written by the Step02 job shown in step 3 below, so Step02 must run before it)

package com.oracle.www.TianChi_compition;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Build the item co-occurrence matrix.
 * Map side: emit <item1-item2, 1> for every pair of items in one user's record.
 * Reduce side: sum <item1-item2, 1, 1, 1...> into a co-occurrence count.
 */
public class Step03 {

    static class MyMapper extends Mapper<Text, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            ArrayList<String> itemList = new ArrayList<>();
            String line = value.toString();
            String[] datas = line.split("\t");
            for (String data : datas) {// collect the items this user has interacted with
                String[] item_mark = data.split(":");
                itemList.add(item_mark[0]);
            }

            // Emit every ordered pair (including an item paired with itself) with a count of 1.
            for (int i = 0; i < itemList.size(); i++) {
                for (int j = 0; j < itemList.size(); j++) {
                    k.set(itemList.get(i) + "-" + itemList.get(j));
                    v.set(1);
                    context.write(k, v);
                }
            }
        }
    }

    static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> value,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : value) {
                sum += val.get();
            }
            k.set(key.toString());
            v.set(sum);
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);

            job.setJarByClass(Step03.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // Delete the output directory if it already exists.
            Path outPath = new Path("hdfs://192.168.9.13:8020/implyCount");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);// recursive delete, so a non-empty directory is removed as well
            }
            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/gradeMarking"));
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
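
To make the map-side pair generation concrete, the small stand-alone snippet below mimics what Step03's mapper does for one hypothetical rating-matrix line ("b1:2.0\tb2:5.0"): it emits every ordered item pair, including each item paired with itself, and the reducer's sums then become the co-occurrence counts. The class name and input values are illustrative only.

import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for Step03's map logic on a single hypothetical input line. */
public class CooccurrencePairsDemo {
    public static void main(String[] args) {
        String value = "b1:2.0\tb2:5.0"; // one user's item:score list from the rating matrix
        List<String> items = new ArrayList<>();
        for (String data : value.split("\t")) {
            items.add(data.split(":")[0]); // keep only the item id
        }
        for (String i : items) {
            for (String j : items) {
                System.out.println(i + "-" + j + "\t1"); // b1-b1, b1-b2, b2-b1, b2-b2
            }
        }
    }
}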

3. Weight matrix (a rating matrix derived from each user's different actions on the same item)

package com.oracle.www.TianChi_compition;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Build the rating (weight) matrix.
 * Map side: split each record and emit <user, item + ":" + action>.
 * Reduce side: aggregate into <user, item1:score \t item2:score \t ...>.
 */
public class Step02 {
    static Text userId = new Text();
    static Text shopping_operate = new Text();

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Note: this assumes the deduplicated lines are ordered as brand_id \t user_id \t action.
            String line = value.toString();
            String[] datas = line.split("\t");
            userId.set(datas[1]);
            shopping_operate.set(datas[0] + ":" + datas[2]);
            context.write(userId, shopping_operate);
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        double click = 0;
        double collect = 0;
        double cart = 0;
        double alipay = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // shoppingOperate_counter: <item, <action, count>>
            HashMap<String, HashMap<String, Integer>> shoppingOperate_counter = new HashMap<>();
            String[] temp_str = null;
            String shoppingName = null;
            String shoppingOperate = null;
            HashMap<String, Integer> operate_counter = null;// inner map: action -> number of times it occurred
            for (Text val : value) {
                temp_str = val.toString().split(":");
                shoppingName = temp_str[0];
                shoppingOperate = temp_str[1];
                if (!shoppingOperate_counter.containsKey(shoppingName)) {// first time this item is seen: initialize its counter
                    operate_counter = new HashMap<>();
                    operate_counter.put(shoppingOperate, 1);
                    shoppingOperate_counter.put(shoppingName, operate_counter);
                } else {// the item is already known
                    operate_counter = shoppingOperate_counter.get(shoppingName);
                    if (!operate_counter.containsKey(shoppingOperate)) {// known item, new action type
                        operate_counter.put(shoppingOperate, 1);
                    } else {
                        operate_counter.put(shoppingOperate, operate_counter.get(shoppingOperate) + 1);
                    }
                }
            }
            // Walk shoppingOperate_counter and compute a score per item.
            // The action field is assumed to hold the strings click/collect/cart/alipay.
            Iterator<String> iter = shoppingOperate_counter.keySet().iterator();
            StringBuffer shopping_marking = new StringBuffer();
            while (iter.hasNext()) {
                click = 0;
                collect = 0;
                cart = 0;
                alipay = 0;
                shoppingName = iter.next();
                operate_counter = shoppingOperate_counter.get(shoppingName);
                Iterator<String> operateIter = operate_counter.keySet().iterator();
                int counter = 0;// number of distinct action types the user performed on this item
                while (operateIter.hasNext()) {
                    counter++;
                    shoppingOperate = operateIter.next();
                    if ("click".equals(shoppingOperate)) {
                        click = operate_counter.get(shoppingOperate);
                    } else if ("collect".equals(shoppingOperate)) {
                        collect = operate_counter.get(shoppingOperate);
                    } else if ("cart".equals(shoppingOperate)) {
                        cart = operate_counter.get(shoppingOperate);
                    } else {
                        alipay = operate_counter.get(shoppingOperate);
                    }
                }
                // Weighted score: click x1, collect x2, cart x3, alipay x4, averaged over the action types seen.
                double sum = click / counter * 1.0 + collect / counter * 2.0 + cart / counter * 3.0
                        + alipay / counter * 4.0;
                shopping_marking.append(shoppingName + ":" + sum + "\t");
            }
            v.set(shopping_marking.toString());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(Step02.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Write the rating matrix to /gradeMarking, which Step03 and Step04 read from.
            Path outPath = new Path("hdfs://192.168.9.13:8020/gradeMarking");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/deweight"));
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
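
As a worked example of the reducer's scoring formula, suppose (hypothetically) a user clicked a brand 3 times and bought it once. The reducer then sees two distinct action types (counter = 2) and computes click/counter*1 + alipay/counter*4 = 3.5:

/** Worked example of Step02's score formula with hypothetical counts. */
public class ScoreFormulaDemo {
    public static void main(String[] args) {
        double click = 3, collect = 0, cart = 0, alipay = 1;
        int counter = 2; // number of distinct action types observed (click and alipay)
        double sum = click / counter * 1.0 + collect / counter * 2.0 + cart / counter * 3.0
                + alipay / counter * 4.0;
        System.out.println(sum); // 3.5 -> the fragment "brand:3.5" would be appended for this user
    }
}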

4. Multiply the two matrices to obtain the three-dimensional (partial-product) matrix

package com.oracle.www.TianChi_compition;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Multiply the rating matrix by the co-occurrence matrix,
 * emitting partial products <user:item, score * count>.
 */
public class Step04 {
    static class MyMapper extends Mapper<Text, Text, Text, Text> {
        String parentName = null;
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            // Remember which input directory this split came from, so map() can tell the two matrices apart.
            FileSplit fs = (FileSplit) context.getInputSplit();
            parentName = fs.getPath().getParent().getName();
        }

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] datas = null;
            // Branch on the input directory.
            if (parentName.equals("gradeMarking")) {// rating matrix: key = user, value = item1:score \t item2:score ...
                datas = line.split("\t");
                for (String data : datas) {
                    String[] item_mark = data.split(":");
                    k.set(item_mark[0]);
                    v.set(key.toString() + ":" + item_mark[1]);
                    context.write(k, v);
                }
            } else {// co-occurrence matrix: key = item1-item2, value = count
                datas = key.toString().split("-");
                k.set(datas[1]);
                v.set(datas[0] + ":" + line);
                context.write(k, v);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        Text k = new Text();
        DoubleWritable v = new DoubleWritable();
        // Input group: <item x, user1:score1, user2:score2, ..., item1:count1, item2:count2, ...>
        // (each count is how often the two items appeared together)

        @Override
        protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            HashMap<String, Double> user_mark = new HashMap<>();
            HashMap<String, Double> item_counter = new HashMap<>();
            // Sort the values into a user:score map and an item:count map.
            String[] datas = null;
            for (Text val : value) {
                datas = val.toString().split(":");
                if (datas[0].startsWith("u")) {// user ids are assumed to start with "u"
                    user_mark.put(datas[0], Double.parseDouble(datas[1]));
                } else {
                    item_counter.put(datas[0], Double.parseDouble(datas[1]));
                }
            }

            // Cross the two maps and emit each partial product.
            String userName = null;
            double userMark = 0.0;
            String itemName = null;
            double iterCounter = 0;
            for (Entry<String, Double> entry1 : user_mark.entrySet()) {
                userName = entry1.getKey();
                userMark = entry1.getValue();
                for (Entry<String, Double> entry2 : item_counter.entrySet()) {
                    itemName = entry2.getKey();
                    iterCounter = entry2.getValue();
                    k.set(userName + ":" + itemName);
                    v.set(userMark * iterCounter);
                    context.write(k, v);
                }
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);

            job.setJarByClass(Step04.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // Delete the output directory if it already exists.
            Path outPath = new Path("hdfs://192.168.9.13:8020/mark&implyCount_multiply");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);// recursive delete, so a non-empty directory is removed as well
            }
            FileInputFormat.setInputPaths(job, new Path[] { new Path("hdfs://192.168.9.13:8020/gradeMarking"),
                    new Path("hdfs://192.168.9.13:8020/implyCount") });
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
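
The reducer above receives, for one item x, both every user's score for x and the co-occurrence count between x and every other item, and crosses the two sets. The sketch below traces that with hypothetical values: user u1 scored item b1 as 3.5, and b1 co-occurred twice with b2 and once with b3. Class name and numbers are illustrative only.

import java.util.HashMap;
import java.util.Map;

/** Illustrative trace of Step04's reduce step for a single item key ("b1") with hypothetical values. */
public class PartialProductDemo {
    public static void main(String[] args) {
        Map<String, Double> userMark = new HashMap<>();    // user -> score for b1
        Map<String, Double> itemCounter = new HashMap<>(); // other item -> co-occurrence count with b1
        userMark.put("u1", 3.5);
        itemCounter.put("b2", 2.0);
        itemCounter.put("b3", 1.0);

        for (Map.Entry<String, Double> u : userMark.entrySet()) {
            for (Map.Entry<String, Double> i : itemCounter.entrySet()) {
                // Emitted as <user:item, score * count>; Step05 later sums these per user:item.
                System.out.println(u.getKey() + ":" + i.getKey() + "\t" + (u.getValue() * i.getValue()));
            }
        }
    }
}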

5. Sum the partial products to obtain every user's recommendation score for every item (a two-dimensional matrix)

package com.oracle.www.TianChi_compition;

/*
 * Filter out items the user has already bought, then sum the partial products.
 */
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Step05 {
    static class MyMapper extends Mapper<Text, Text, Text, Text> {
        // boughtList records which users bought which items. A plain Map would not work here:
        // one user may have bought several items, and one item may have been bought by many users.
        ArrayList<String> boughtList = new ArrayList<>();
        BufferedReader br = null;

        // setup() fills boughtList from the deduplicated data shipped via the distributed cache.
        @Override
        protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            br = new BufferedReader(new FileReader("part-r-00000"));
            String line = null;
            String[] datas = null;
            while ((line = br.readLine()) != null) {
                datas = line.split("\t");
                if ("alipay".equals(datas[2])) {// only purchases count as "already bought"
                    boughtList.add(datas[1] + ":" + datas[0]);// stored as user:item, matching the map input key
                }
            }
        }

        // map() drops items the user has already bought so they are never recommended.
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // If this candidate item was already bought by this user, do not forward it to the reducer.
            if (!boughtList.contains(key.toString())) {
                context.write(key, value);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Sum all partial products for this user:item pair into the final recommendation score.
            double rank = 0.0;
            for (Text val : value) {
                rank += Double.parseDouble(val.toString());
            }
            k.set(key.toString().split(":")[0]);
            v.set(key.toString().split(":")[1] + ":" + rank);
            context.write(k, v);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Step05.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Ship the deduplicated data to every mapper via the distributed cache.
        job.addCacheFile(new URI("hdfs://192.168.9.13:8020/deweight/part-r-00000"));

        Path outPath = new Path("hdfs://192.168.9.13:8020/shoppingRecommend");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/mark&implyCount_multiply"));
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }
}
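
Step05's reducer simply sums all partial products that share the same user:item key (after the mapper has filtered out pairs the user already bought). A compact stand-alone equivalent of that aggregation, on hypothetical Step04 output lines:

import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in for Step05's reduce: sum partial products per user:item key. */
public class RecommendSumDemo {
    public static void main(String[] args) {
        // Hypothetical Step04 output lines: key \t partial product.
        String[] lines = { "u1:b2\t7.0", "u1:b2\t2.5", "u1:b3\t3.5" };
        Map<String, Double> rank = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split("\t");
            Double old = rank.get(parts[0]);
            double add = Double.parseDouble(parts[1]);
            rank.put(parts[0], old == null ? add : old + add);
        }
        // Re-emit as <user, item:rank>, matching Step05's output format.
        for (Map.Entry<String, Double> e : rank.entrySet()) {
            String user = e.getKey().split(":")[0];
            String item = e.getKey().split(":")[1];
            System.out.println(user + "\t" + item + ":" + e.getValue()); // e.g. u1  b2:9.5
        }
    }
}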

6. Sort by recommendation score in descending order (keep the ten highest-weighted items per user).

package com.oracle.www.TianChi_compition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Sort each user's candidates and keep the ten highest-ranked items.
 */
public class Step06 {
    // Split each input line and send <user, Sort(item, rank)> to the reduce side.
    static class MyMapper extends Mapper<Text, Text, Text, Sort> {
        Sort sort = null;

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Sort>.Context context)
                throws IOException, InterruptedException {
            sort = new Sort(value.toString().split(":")[0], Double.parseDouble(value.toString().split(":")[1]));
            context.write(key, sort);
        }
    }

    // The reduce side sorts one user's candidate items by rank and concatenates the top ten.
    static class MyReducer extends Reducer<Text, Sort, Text, Text> {
        ArrayList<Sort> list = new ArrayList<>();
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Sort> value, Reducer<Text, Sort, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            list.clear();
            // Hadoop reuses the value object while iterating, so each Sort must be copied
            // (here with BeanUtils.copyProperties(dest, orig)) before it is kept in the list.
            for (Sort sort : value) {
                Sort tempSort = new Sort();
                try {
                    BeanUtils.copyProperties(tempSort, sort);
                    list.add(tempSort);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }

            Collections.sort(list);
            for (int i = 0; i < list.size() && i < 10; i++) {
                sb.append(list.get(i));
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }

    static public class Sort implements WritableComparable<Sort> {
        private String shoppingName;
        private double shoppingRank;

        public Sort() {
        }

        public Sort(String shoppingName, double shoppingRank) {
            this.shoppingName = shoppingName;
            this.shoppingRank = shoppingRank;
        }

        public String getShoppingName() {
            return shoppingName;
        }

        public void setShoppingName(String shoppingName) {
            this.shoppingName = shoppingName;
        }

        public double getShoppingRank() {
            return shoppingRank;
        }

        public void setShoppingRank(double shoppingRank) {
            this.shoppingRank = shoppingRank;
        }

        @Override
        public String toString() {
            return shoppingName + ":" + shoppingRank + "\t";
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeDouble(shoppingRank);
            out.writeUTF(shoppingName);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.shoppingRank = in.readDouble();
            this.shoppingName = in.readUTF();
        }

        @Override
        public int compareTo(Sort o) {
            // Descending order by rank, so the best-scored items come first.
            if (this.getShoppingRank() < o.getShoppingRank()) {
                return 1;
            } else if (this.getShoppingRank() > o.getShoppingRank()) {
                return -1;
            }
            return 0;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);

            job.setJarByClass(Step06.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Sort.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            Path outPath = new Path("hdfs://192.168.9.13:8020/ShoppingRecommend_Sort");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }

            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/shoppingRecommend"));
            FileOutputFormat.setOutputPath(job, outPath);

            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
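
The six jobs are independent classes with hard-coded HDFS paths, so they have to be launched in dependency order: Step01 (dedup), Step02 (rating matrix), Step03 (co-occurrence), Step04 (multiplication), Step05 (aggregation and filtering), Step06 (sorting). A minimal driver sketch under that assumption is shown below; it simply calls each class's main in sequence, which works here because every step blocks on job.waitForCompletion(true). In practice JobControl or an external scheduler would be a more robust choice.

package com.oracle.www.TianChi_compition;

/*
 * Minimal sketch of a driver that runs the six jobs in dependency order.
 */
public class RecommendPipeline {
    public static void main(String[] args) throws Exception {
        Step01.main(args); // deduplicate raw data       -> /deweight
        Step02.main(args); // rating (weight) matrix     -> /gradeMarking
        Step03.main(args); // item co-occurrence matrix  -> /implyCount
        Step04.main(args); // matrix multiplication      -> /mark&implyCount_multiply
        Step05.main(args); // sum + filter bought items  -> /shoppingRecommend
        Step06.main(args); // sort, keep top 10 per user -> /ShoppingRecommend_Sort
    }
}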