(转) 基于MapReduce的ItemBase推荐算法的共现矩阵实现(一)

 

转自:http://zengzhaozheng.blog.51cto.com/8219051/1557054

一、概述

这2个月研究根据用户标签情况对用户的相似度进行评估,其中涉及一些推荐算法知识,在这段时间研究了一遍《推荐算法实践》和《Mahout in action》,在这里主要是根据这两本书的一些思想和自己的一些理解对分布式基于ItemBase的推荐算法进行实现。其中分两部分,第一部分是根据共现矩阵的方式来简单的推算出用户的推荐项,第二部分则是通过传统的相似度矩阵的方法来实践ItemBase推荐算法。这篇blog主要记录第一部分的内容,并且利用MapReduce进行实现,下一篇blog则是记录第二部分的内容和实现。

二、算法原理

协同推荐算法,作为众多推荐算法中的一种已经被广泛的应用。其主要分为2种,第一种就是基于用户的协同过滤,第二种就是基于物品的协同过滤。

所谓的itemBase推荐算法简单直白的描述就是:用户A喜欢物品X1,用户B喜欢物品X2,如果X1和X2相似则,将A之前喜欢过的物品推荐给B,或者B之前喜欢过的物品推荐给A。这种算法是完全依赖于用户的历史喜欢物品的;所谓的UserBase推荐算法直白地说就是:用户A喜欢物品X1,用户B喜欢物品X2,如果用户A和用户B相似则将物品X1推荐给用户B,将物品X2推荐给用户A。简单的示意图:

至于选择哪种要看自己的实际情况,如果用户量比物品种类多得多那么就采用ItemBase的协同过滤推荐算法,如果是用户量比物品种类少的多则采用UserBase的协同顾虑推荐算,这样选择的一个原因是为了让物品的相似度矩阵或者用户相似度矩阵或者共现矩阵的规模最小化。

三、数据建模

基本的算法上面已经大概说了一下,对于算法来说,对数据建模使之运用在算法之上是重点也是难点。这小节主要根据自己相关项目的经验和《推荐引擎实践》的一些观点来讨论一些。分开2部分说,一是根据共现矩阵推荐、而是根据相似度算法进行推荐。

(1)共现矩阵方式:

第一步:转换成用户向量

1[102:0.1,103:0.2,104:0.3]:表示用户1喜欢的物品列表,以及他们对应的喜好评分。

2[101:0.1,102:0.7,105:0.9]:表示用户2喜欢的物品列表,以及他们对应的喜好评分。

3[102:0.1,103:0.7,104:0.2]:表示用户3喜欢的物品列表,以及他们对应的喜好评分。

第二步:计算共现矩阵

简单地说就是将同时喜欢物品x1和x2的用户数组成矩阵。

第三步:

生成用户对物品的评分矩阵

第四步:物品共现矩阵和用户对物品的评分矩阵相乘得到推荐结果

举个例子计算用户1的推荐列表过程:

用户1对物品101的总评分计算:

1*0+1*0.1+0*0.2+0*0.3+1*0=0.1

用户1对物品102的总评分计算:

1*0+3*0.1+1*0.2+2*0.3+2*0=1.1

用户1对物品103的总评分计算:

0*0+1*0.1+1*0.2+1*0.3+0*0=0.6

用户1对物品104的总评分计算:

0*0+2*0.1+1*0.2+2*0.3+1*0=1.0

用户1对物品105的总评分计算:

1*0+2*0.1+0*0.2+1*0.3+2*0=0.5

从而得到用户1的推荐列表为1[101:0.1,102:1.1,103:0.6,104:1.0,105:0.5]再经过排序得到最终推荐列表1[102:1.1,104:1.0,103:0.6,105:0.5,101:0.1]。

(2)通过计算机物品相似度方式计算用户的推荐向量。

通过计算机物品相似度方式计算用户的推荐向量和上面通过共现矩阵的方式差不多,就是将物品相似度矩阵代替掉共现矩阵和用户对物品的评分矩阵相乘,然后在计算推荐向量。

计算相似度矩阵:

在计算之前我们先了解一下物品相似度相关的计算方法。

对于计算物品相似度的算法有很多,要根据自己的数据模型进行选择。基于皮尔逊相关系数计算、欧几里德定理(实际上是算两点距离)、基于余弦相似度计算斯皮尔曼相关系数计算、基于谷本系数计算、基于对数似然比计算。其中谷本系数和对数似然比这两种方式主要是针对那些没有指名对物品喜欢度的数据模型进行相似度计算,也就是mahout中所指的Boolean数据模型。下面主要介绍2种,欧几里德和余弦相似度算法。

现在关键是怎么将现有数据转化成对应的空间向量模型使之适用这些定理,这是个关键点。下面我以欧几里德定理作为例子看看那如何建立模型:

第一步:将用户向量转化为物品向量

用户向量:

1[102:0.1,103:0.2,104:0.3]

2[101:0.1,102:0.7,105:0.9]

3[102:0.1,103:0.7,104:0.2]

转为为物品向量:

101[2:0.1]

102[1:0.1,2:0.7,3:0.1]

103[1:0.2,3:0.7]

104[1:0.3,3:0.2]

105[2:0.9]

第二步:

那么物品相似度计算为:

第三步:

最终得到物品相似度矩阵为:(这里省略掉没有意义的自关联相似度)

第四步:物品相似度矩阵和用户对物品的评分矩阵相乘得到推荐结果:

举个例子计算用户1的类似推荐列表过程:

用户1对物品101的总评分计算:

1*0+1*0.6186429+0*0.6964322+0*0.7277142+1*0.55555556=1.174198

用户1对物品102的总评分计算:

1*0.6186429+3*0+1*0.5188439+2*0.5764197+2*0.8032458=3.896818

用户1对物品103的总评分计算:

0*0.6964322+1*0.5188439+1*0+1*0.662294+0*0.463481=1.181138

用户1对物品104的总评分计算:

0*0.7277142+2*0.5764197+1*0.662294+2*0+1*0.5077338=2.322867

用户1对物品105的总评分计算:

1*0.55555556+2*0.8032458+0*0.463481+1*0.5077338=2.669780

四、共现矩阵方式的MapReduce实现

这里主要是利用MapReduce结合Mahout连的一些数据类型对共现矩阵方式的推荐方法进行实现,至于相似度矩阵方式进行推荐的在下一篇blog写。这里采用Boolean数据模型,即用户是没有对喜欢的物品进行初始打分的,我们在程序中默认都为1。

先看看整个MapReduce的数据流向图:

具体代码实现:HadoopUtil

  1. package com.util;
  2. import java.io.IOException;
  3. import java.util.Arrays;
  4. import java.util.Iterator;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.FileSystem;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.fs.PathFilter;
  9. import org.apache.hadoop.io.Writable;
  10. import org.apache.hadoop.mapreduce.InputFormat;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.JobContext;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.OutputFormat;
  15. import org.apache.hadoop.mapreduce.Reducer;
  16. import org.apache.mahout.common.iterator.sequencefile.PathType;
  17. import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
  18. import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
  19. import org.slf4j.Logger;
  20. import org.slf4j.LoggerFactory;
  21. publicfinalclassHadoopUtil{
  22. privatestaticfinalLogger log =LoggerFactory.getLogger(HadoopUtil.class);
  23. privateHadoopUtil(){}
  24. publicstaticJob prepareJob(String jobName,
  25. String[] inputPath,
  26. String outputPath,
  27. Class<?extendsInputFormat> inputFormat,
  28. Class<?extendsMapper> mapper,
  29. Class<?extendsWritable> mapperKey,
  30. Class<?extendsWritable> mapperValue,
  31. Class<?extendsOutputFormat> outputFormat,Configuration conf)throwsIOException{
  32. Job job =newJob(newConfiguration(conf));
  33. job.setJobName(jobName);
  34. Configuration jobConf = job.getConfiguration();
  35. if(mapper.equals(Mapper.class)){
  36. thrownewIllegalStateException("Can‘t figure out the user class jar file from mapper/reducer");
  37. }
  38. job.setJarByClass(mapper);
  39. job.setInputFormatClass(inputFormat);
  40. job.setInputFormatClass(inputFormat);
  41. StringBuilder inputPathsStringBuilder =newStringBuilder();
  42. for(String p : inputPath){
  43. inputPathsStringBuilder.append(",").append(p);
  44. }
  45. inputPathsStringBuilder.deleteCharAt(0);
  46. jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());
  47. job.setMapperClass(mapper);
  48. job.setMapOutputKeyClass(mapperKey);
  49. job.setMapOutputValueClass(mapperValue);
  50. job.setOutputKeyClass(mapperKey);
  51. job.setOutputValueClass(mapperValue);
  52. jobConf.setBoolean("mapred.compress.map.output",true);
  53. job.setNumReduceTasks(0);
  54. job.setOutputFormatClass(outputFormat);
  55. jobConf.set("mapred.output.dir", outputPath);
  56. return job;
  57. }
  58. publicstaticJob prepareJob(String jobName,
  59. String[] inputPath,
  60. String outputPath,
  61. Class<?extendsInputFormat> inputFormat,
  62. Class<?extendsMapper> mapper,
  63. Class<?extendsWritable> mapperKey,
  64. Class<?extendsWritable> mapperValue,
  65. Class<?extendsReducer> reducer,
  66. Class<?extendsWritable> reducerKey,
  67. Class<?extendsWritable> reducerValue,
  68. Class<?extendsOutputFormat> outputFormat,
  69. Configuration conf)throwsIOException{
  70. Job job =newJob(newConfiguration(conf));
  71. job.setJobName(jobName);
  72. Configuration jobConf = job.getConfiguration();
  73. if(reducer.equals(Reducer.class)){
  74. if(mapper.equals(Mapper.class)){
  75. thrownewIllegalStateException("Can‘t figure out the user class jar file from mapper/reducer");
  76. }
  77. job.setJarByClass(mapper);
  78. }else{
  79. job.setJarByClass(reducer);
  80. }
  81. job.setInputFormatClass(inputFormat);
  82. StringBuilder inputPathsStringBuilder =newStringBuilder();
  83. for(String p : inputPath){
  84. inputPathsStringBuilder.append(",").append(p);
  85. }
  86. inputPathsStringBuilder.deleteCharAt(0);
  87. jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());
  88. job.setMapperClass(mapper);
  89. if(mapperKey !=null){
  90. job.setMapOutputKeyClass(mapperKey);
  91. }
  92. if(mapperValue !=null){
  93. job.setMapOutputValueClass(mapperValue);
  94. }
  95. jobConf.setBoolean("mapred.compress.map.output",true);
  96. job.setReducerClass(reducer);
  97. job.setOutputKeyClass(reducerKey);
  98. job.setOutputValueClass(reducerValue);
  99. job.setOutputFormatClass(outputFormat);
  100. jobConf.set("mapred.output.dir", outputPath);
  101. return job;
  102. }
  103. publicstaticJob prepareJob(String jobName,String[] inputPath,
  104. String outputPath,Class<?extendsInputFormat> inputFormat,
  105. Class<?extendsMapper> mapper,
  106. Class<?extendsWritable> mapperKey,
  107. Class<?extendsWritable> mapperValue,
  108. Class<?extendsReducer> combiner,
  109. Class<?extendsReducer> reducer,
  110. Class<?extendsWritable> reducerKey,
  111. Class<?extendsWritable> reducerValue,
  112. Class<?extendsOutputFormat> outputFormat,Configuration conf)
  113. throwsIOException{
  114. Job job =newJob(newConfiguration(conf));
  115. job.setJobName(jobName);
  116. Configuration jobConf = job.getConfiguration();
  117. if(reducer.equals(Reducer.class)){
  118. if(mapper.equals(Mapper.class)){
  119. thrownewIllegalStateException(
  120. "Can‘t figure out the user class jar file from mapper/reducer");
  121. }
  122. job.setJarByClass(mapper);
  123. }else{
  124. job.setJarByClass(reducer);
  125. }
  126. job.setInputFormatClass(inputFormat);
  127. StringBuilder inputPathsStringBuilder =newStringBuilder();
  128. for(String p : inputPath){
  129. inputPathsStringBuilder.append(",").append(p);
  130. }
  131. inputPathsStringBuilder.deleteCharAt(0);
  132. jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());
  133. job.setMapperClass(mapper);
  134. if(mapperKey !=null){
  135. job.setMapOutputKeyClass(mapperKey);
  136. }
  137. if(mapperValue !=null){
  138. job.setMapOutputValueClass(mapperValue);
  139. }
  140. jobConf.setBoolean("mapred.compress.map.output",true);
  141. job.setCombinerClass(combiner);
  142. job.setReducerClass(reducer);
  143. job.setOutputKeyClass(reducerKey);
  144. job.setOutputValueClass(reducerValue);
  145. job.setOutputFormatClass(outputFormat);
  146. jobConf.set("mapred.output.dir", outputPath);
  147. return job;
  148. }
  149. publicstaticString getCustomJobName(String className,JobContext job,
  150. Class<?extendsMapper> mapper,
  151. Class<?extendsReducer> reducer){
  152. StringBuilder name =newStringBuilder(100);
  153. String customJobName = job.getJobName();
  154. if(customJobName ==null|| customJobName.trim().isEmpty()){
  155. name.append(className);
  156. }else{
  157. name.append(customJobName);
  158. }
  159. name.append(‘-‘).append(mapper.getSimpleName());
  160. name.append(‘-‘).append(reducer.getSimpleName());
  161. return name.toString();
  162. }
  163. publicstaticvoiddelete(Configuration conf,Iterable<Path> paths)throwsIOException{
  164. if(conf ==null){
  165. conf =newConfiguration();
  166. }
  167. for(Path path : paths){
  168. FileSystem fs = path.getFileSystem(conf);
  169. if(fs.exists(path)){
  170. log.info("Deleting {}", path);
  171. fs.delete(path,true);
  172. }
  173. }
  174. }
  175. publicstaticvoiddelete(Configuration conf,Path... paths)throwsIOException{
  176. delete(conf,Arrays.asList(paths));
  177. }
  178. publicstaticlong countRecords(Path path,Configuration conf)throwsIOException{
  179. long count =0;
  180. Iterator<?> iterator =newSequenceFileValueIterator<Writable>(path,true, conf);
  181. while(iterator.hasNext()){
  182. iterator.next();
  183. count++;
  184. }
  185. return count;
  186. }
  187. publicstaticlong countRecords(Path path,PathType pt,PathFilter filter,Configuration conf)throwsIOException{
  188. long count =0;
  189. Iterator<?> iterator =newSequenceFileDirValueIterator<Writable>(path, pt, filter,null,true, conf);
  190. while(iterator.hasNext()){
  191. iterator.next();
  192. count++;
  193. }
  194. return count;
  195. }
  196. }

先看看写的工具类:

第一步:处理原始输入数据

处理原始数据的SourceDataToItemPrefsJob作业的mapper:SourceDataToItemPrefsMapper

  1. package com.mapper;
  2. import java.io.IOException;
  3. import java.util.regex.Matcher;
  4. import java.util.regex.Pattern;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.mahout.math.VarLongWritable;
  9. /**
  10. * mapper输入格式:userID:itemID1 itemID2 itemID3....
  11. * mapper输出格式:<userID,itemID>
  12. * @author 曾昭正
  13. */
  14. publicclassSourceDataToItemPrefsMapperextendsMapper<LongWritable,Text,VarLongWritable,VarLongWritable>{
  15. //private static final Logger logger = LoggerFactory.getLogger(SourceDataToItemPrefsMapper.class);
  16. privatestaticfinalPattern NUMBERS =Pattern.compile("(\\d+)");
  17. privateString line =null;
  18. @Override
  19. protectedvoid map(LongWritable key,Text value,Context context)
  20. throwsIOException,InterruptedException{
  21. line = value.toString();
  22. if(line ==null)return;
  23. // logger.info("line:"+line);
  24. Matcher matcher = NUMBERS.matcher(line);
  25. matcher.find();//寻找第一个分组,即userID
  26. VarLongWritable userID =newVarLongWritable(Long.parseLong(matcher.group()));//这个类型是在mahout中独立进行封装的
  27. VarLongWritable itemID =newVarLongWritable();
  28. while(matcher.find()){
  29. itemID.set(Long.parseLong(matcher.group()));
  30. // logger.info(userID + " " + itemID);
  31. context.write(userID, itemID);
  32. }
  33. }
  34. }

处理原始数据的SourceDataToItemPrefsJob作业的reducer:SourceDataToItemPrefsMapper

  1. package com.reducer;
  2. import java.io.IOException;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import org.apache.mahout.math.RandomAccessSparseVector;
  5. import org.apache.mahout.math.VarLongWritable;
  6. import org.apache.mahout.math.Vector;
  7. import org.apache.mahout.math.VectorWritable;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. /**
  11. * reducer输入:<userID,Iterable<itemID>>
  12. * reducer输出:<userID,VecotrWriable<index=itemID,valuce=pres>....>
  13. * @author 曾昭正
  14. */
  15. publicclassSourceDataToUserVectorReducerextendsReducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable>{
  16. privatestaticfinalLogger logger =LoggerFactory.getLogger(SourceDataToUserVectorReducer.class);
  17. @Override
  18. protectedvoid reduce(VarLongWritable userID,Iterable<VarLongWritable> itemPrefs,Context context)
  19. throwsIOException,InterruptedException{
  20. /**
  21. * DenseVector,它的实现就是一个浮点数数组,对向量里所有域都进行存储,适合用于存储密集向量。
  22. RandomAccessSparseVector 基于浮点数的 HashMap 实现的,key 是整形 (int) 类型,value 是浮点数 (double) 类型,它只存储向量中不为空的值,并提供随机访问。
  23. SequentialAccessVector 实现为整形 (int) 类型和浮点数 (double) 类型的并行数组,它也只存储向量中不为空的值,但只提供顺序访问。
  24. 用户可以根据自己算法的需求选择合适的向量实现类,如果算法需要很多随机访问,应该选择 DenseVector 或者 RandomAccessSparseVector,如果大部分都是顺序访问,SequentialAccessVector 的效果应该更好。
  25. 介绍了向量的实现,下面我们看看如何将现有的数据建模成向量,术语就是“如何对数据进行向量化”,以便采用 Mahout 的各种高效的聚类算法。
  26. */
  27. Vector userVector =newRandomAccessSparseVector(Integer.MAX_VALUE,100);
  28. for(VarLongWritable itemPref : itemPrefs){
  29. userVector.set((int)itemPref.get(),1.0f);//RandomAccessSparseVector.set(index,value),用户偏好类型为boolean类型,将偏好值默认都为1.0f
  30. }
  31. logger.info(userID+" "+newVectorWritable(userVector));
  32. context.write(userID,newVectorWritable(userVector));
  33. }
  34. }

第二步:将SourceDataToItemPrefsJob作业的reduce输出结果组合成共现矩阵

UserVectorToCooccurrenceJob作业的mapper:UserVectorToCooccurrenceMapper

  1. package com.mapper;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import org.apache.mahout.math.VarLongWritable;
  7. import org.apache.mahout.math.Vector;
  8. import org.apache.mahout.math.VectorWritable;
  9. /**
  10. * mapper输入:<userID,VecotrWriable<index=itemID,valuce=pres>....>
  11. * mapper输出:<itemID,itemID>(共现物品id对)
  12. * @author 曾昭正
  13. */
  14. publicclassUserVectorToCooccurrenceMapperextendsMapper<VarLongWritable,VectorWritable,IntWritable,IntWritable>{
  15. @Override
  16. protectedvoid map(VarLongWritable userID,VectorWritable userVector,Context context)
  17. throwsIOException,InterruptedException{
  18. Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator();//过滤掉非空元素
  19. while(it.hasNext()){
  20. int index1 = it.next().index();
  21. Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
  22. while(it2.hasNext()){
  23. int index2 = it2.next().index();
  24. context.write(newIntWritable(index1),newIntWritable(index2));
  25. }
  26. }
  27. }
  28. }

UserVectorToCooccurrenceJob作业的reducer:UserVectorToCoocurrenceReducer

  1. package com.reducer;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
  6. import org.apache.mahout.math.RandomAccessSparseVector;
  7. import org.apache.mahout.math.Vector;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. /**
  11. * reducer输入:<itemID,Iterable<itemIDs>>
  12. * reducer输出:<mainItemID,Vector<coocItemID,coocTime(共现次数)>....>
  13. * @author 曾昭正
  14. */
  15. publicclassUserVectorToCoocurrenceReducerextendsReducer<IntWritable,IntWritable,IntWritable,VectorOrPrefWritable>{
  16. privatestaticfinalLogger logger =LoggerFactory.getLogger(UserVectorToCoocurrenceReducer.class);
  17. @Override
  18. protectedvoid reduce(IntWritable mainItemID,Iterable<IntWritable> coocItemIDs,Context context)
  19. throwsIOException,InterruptedException{
  20. Vector coocItemIDVectorRow =newRandomAccessSparseVector(Integer.MAX_VALUE,100);
  21. for(IntWritable coocItem : coocItemIDs){
  22. int itemCoocTime = coocItem.get();
  23. coocItemIDVectorRow.set(itemCoocTime,coocItemIDVectorRow.get(itemCoocTime)+1.0);//将共现次数累加
  24. }
  25. logger.info(mainItemID +" "+newVectorOrPrefWritable(coocItemIDVectorRow));
  26. context.write(mainItemID,newVectorOrPrefWritable(coocItemIDVectorRow));//记录mainItemID的完整共现关系
  27. }
  28. }

第三步:将SourceDataToItemPrefsJob作业的reduce输出结果进行分割

userVecotrSplitJob作业的mapper:UserVecotrSplitMapper

  1. package com.mapper;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
  7. import org.apache.mahout.math.VarLongWritable;
  8. import org.apache.mahout.math.Vector;
  9. import org.apache.mahout.math.Vector.Element;
  10. import org.apache.mahout.math.VectorWritable;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. * 将用户向量分割,以便和物品的共现向量进行合并
  15. * mapper输入:<userID,Vector<itemIDIndex,preferenceValuce>....>
  16. * reducer输出:<itemID,Vecotor<userID,preferenceValuce>....>
  17. * @author 曾昭正
  18. */
  19. publicclassUserVecotrSplitMapperextendsMapper<VarLongWritable,VectorWritable,IntWritable,VectorOrPrefWritable>{
  20. privatestaticfinalLogger logger =LoggerFactory.getLogger(UserVecotrSplitMapper.class);
  21. @Override
  22. protectedvoid map(VarLongWritable userIDWritable,VectorWritable value,Context context)
  23. throwsIOException,InterruptedException{
  24. IntWritable itemIDIndex =newIntWritable();
  25. long userID = userIDWritable.get();
  26. Vector userVector = value.get();
  27. Iterator<Element> it = userVector.nonZeroes().iterator();//只取非空用户向量
  28. while(it.hasNext()){
  29. Element e = it.next();
  30. int itemID = e.index();
  31. itemIDIndex.set(itemID);
  32. float preferenceValuce =(float) e.get();
  33. logger.info(itemIDIndex +" "+newVectorOrPrefWritable(userID,preferenceValuce));
  34. context.write(itemIDIndex,newVectorOrPrefWritable(userID,preferenceValuce));
  35. }
  36. }
  37. }

第四步:将userVecotrSplitJob和UserVectorToCooccurrenceJob作业的输出结果合并

combineUserVectorAndCoocMatrixJob作业的mapper:CombineUserVectorAndCoocMatrixMapper

  1. package com.mapper;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
  6. /**
  7. * 将共现矩阵和分割后的用户向量进行合并,以便计算部分的推荐向量
  8. * 这个mapper其实没有什么逻辑处理功能,只是将数据按照输入格式输出
  9. * 注意:这里的mapper输入为共现矩阵和分割后的用户向量计算过程中的共同输出的2个目录
  10. * mapper输入:<itemID,Vecotor<userID,preferenceValuce>> or <itemID,Vecotor<coocItemID,coocTimes>>
  11. * mapper输出:<itemID,Vecotor<userID,preferenceValuce>/Vecotor<coocItemID,coocTimes>>
  12. * @author 曾昭正
  13. */
  14. publicclassCombineUserVectorAndCoocMatrixMapperextendsMapper<IntWritable,VectorOrPrefWritable,IntWritable,VectorOrPrefWritable>{
  15. @Override
  16. protectedvoid map(IntWritable itemID,VectorOrPrefWritable value,Context context)
  17. throwsIOException,InterruptedException{
  18. context.write(itemID, value);
  19. }
  20. }

combineUserVectorAndCoocMatrixJob作业的CombineUserVectorAndCoocMatrixReducer

  1. package com.reducer;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.Iterator;
  5. import java.util.List;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
  9. import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
  10. import org.apache.mahout.math.Vector;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. * 将共现矩阵和分割后的用户向量进行合并,以便计算部分的推荐向量
  15. * @author 曾昭正
  16. */
  17. publicclassCombineUserVectorAndCoocMatrixReducerextendsReducer<IntWritable,VectorOrPrefWritable,IntWritable,VectorAndPrefsWritable>{
  18. privatestaticfinalLogger logger =LoggerFactory.getLogger(CombineUserVectorAndCoocMatrixReducer.class);
  19. @Override
  20. protectedvoid reduce(IntWritable itemID,Iterable<VectorOrPrefWritable> values,Context context)
  21. throwsIOException,InterruptedException{
  22. VectorAndPrefsWritable vectorAndPrefsWritable =newVectorAndPrefsWritable();
  23. List<Long> userIDs =newArrayList<Long>();
  24. List<Float> preferenceValues =newArrayList<Float>();
  25. Vector coocVector =null;
  26. Vector coocVectorTemp =null;
  27. Iterator<VectorOrPrefWritable> it = values.iterator();
  28. while(it.hasNext()){
  29. VectorOrPrefWritable e = it.next();
  30. coocVectorTemp = e.getVector();
  31. if(coocVectorTemp ==null){
  32. userIDs.add(e.getUserID());
  33. preferenceValues.add(e.getValue());
  34. }else{
  35. coocVector = coocVectorTemp;
  36. }
  37. }
  38. if(coocVector !=null){
  39. //这个需要注意,根据共现矩阵的计算reduce聚合之后,到了这个一个Reudce分组就有且只有一个vecotr(即共现矩阵的一列或者一行,这里行和列是一样的)了。
  40. vectorAndPrefsWritable.set(coocVector, userIDs, preferenceValues);
  41. logger.info(itemID+" "+vectorAndPrefsWritable);
  42. context.write(itemID, vectorAndPrefsWritable);
  43. }
  44. }
  45. }

第五步:将combineUserVectorAndCoocMatrixJob作业的输出结果生成推荐列表

caclPartialRecomUserVectorJob作业的mapper:CaclPartialRecomUserVectorMapper

  1. package com.mapper;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
  7. import org.apache.mahout.math.VarLongWritable;
  8. import org.apache.mahout.math.Vector;
  9. import org.apache.mahout.math.VectorWritable;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. /**
  13. * 计算部分用户推荐向量
  14. * @author 曾昭正
  15. */
  16. publicclassCaclPartialRecomUserVectorMapperextendsMapper<IntWritable,VectorAndPrefsWritable,VarLongWritable,VectorWritable>{
  17. privatestaticfinalLogger logger =LoggerFactory.getLogger(CaclPartialRecomUserVectorMapper.class);
  18. @Override
  19. protectedvoid map(IntWritable itemID,VectorAndPrefsWritable values,Context context)
  20. throwsIOException,InterruptedException{
  21. Vector coocVectorColumn = values.getVector();
  22. List<Long> userIDs = values.getUserIDs();
  23. List<Float> preferenceValues = values.getValues();
  24. for(int i =0; i< userIDs.size(); i++){
  25. long userID = userIDs.get(i);
  26. float preferenceValue = preferenceValues.get(i);
  27. logger.info("userID:"+ userID);
  28. logger.info("preferenceValue:"+preferenceValue);
  29. //将共现矩阵中userID对应的列相乘,算出部分用户对应的推荐列表分数
  30. Vector preferenceParScores = coocVectorColumn.times(preferenceValue);
  31. context.write(newVarLongWritable(userID),newVectorWritable(preferenceParScores));
  32. }
  33. }
  34. }

caclPartialRecomUserVectorJob作业的combiner:ParRecomUserVectorCombiner

  1. package com.reducer;
  2. import java.io.IOException;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import org.apache.mahout.math.VarLongWritable;
  5. import org.apache.mahout.math.Vector;
  6. import org.apache.mahout.math.VectorWritable;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. /**
  10. * 将计算部分用户推荐向量的结果进行合并,将userID对应的贡现向量的分值进行相加(注意:这个只是将一个map的输出进行合并,所以这个也是只部分结果)
  11. * @author 曾昭正
  12. */
  13. publicclassParRecomUserVectorCombinerextendsReducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable>{
  14. privatestaticfinalLogger logger =LoggerFactory.getLogger(ParRecomUserVectorCombiner.class);
  15. @Override
  16. protectedvoid reduce(VarLongWritable userID,Iterable<VectorWritable> coocVectorColunms,Context context)
  17. throwsIOException,InterruptedException{
  18. Vector vectorColunms =null;
  19. for(VectorWritable coocVectorColunm : coocVectorColunms){
  20. vectorColunms = vectorColunms ==null? coocVectorColunm.get(): vectorColunms.plus(coocVectorColunm.get());
  21. }
  22. logger.info(userID +" "+newVectorWritable(vectorColunms));
  23. context.write(userID,newVectorWritable(vectorColunms));
  24. }
  25. }

caclPartialRecomUserVectorJob作业的reducer:MergeAndGenerateRecommendReducer

  1. package com.reducer;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.PriorityQueue;
  8. import java.util.Queue;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
  11. import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
  12. import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
  13. import org.apache.mahout.cf.taste.recommender.RecommendedItem;
  14. import org.apache.mahout.math.VarLongWritable;
  15. import org.apache.mahout.math.Vector;
  16. import org.apache.mahout.math.Vector.Element;
  17. import org.apache.mahout.math.VectorWritable;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. /**
  21. * 合并所有已经评分的共现矩阵
  22. * @author 曾昭正
  23. */
  24. publicclassMergeAndGenerateRecommendReducerextendsReducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable>{
  25. privatestaticfinalLogger logger =LoggerFactory.getLogger(MergeAndGenerateRecommendReducer.class);
  26. privateint recommendationsPerUser;
  27. @Override
  28. protectedvoid setup(Context context)
  29. throwsIOException,InterruptedException{
  30. recommendationsPerUser = context.getConfiguration().getInt("recomandItems.recommendationsPerUser",5);
  31. }
  32. @Override
  33. protectedvoid reduce(VarLongWritable userID,Iterable<VectorWritable> cooVectorColumn,Context context)
  34. throwsIOException,InterruptedException{
  35. //分数求和合并
  36. Vector recommdVector =null;
  37. for(VectorWritable vector : cooVectorColumn){
  38. recommdVector = recommdVector ==null? vector.get(): recommdVector.plus(vector.get());
  39. }
  40. //对推荐向量进行排序,为每个UserID找出topM个推荐选项(默认找出5个),此队列按照item对应的分数进行排序
  41. //注意下:PriorityQueue队列的头一定是最小的元素,另外这个队列容量增加1是为了为添加更大的新元素时使用的临时空间
  42. Queue<RecommendedItem> topItems =newPriorityQueue<RecommendedItem>(recommendationsPerUser+1,ByValueRecommendedItemComparator.getInstance());
  43. Iterator<Element> it = recommdVector.nonZeroes().iterator();
  44. while(it.hasNext()){
  45. Element e = it.next();
  46. int itemID = e.index();
  47. float preValue =(float) e.get();
  48. //当队列容量小于推荐个数,往队列中填item和分数
  49. if(topItems.size()< recommendationsPerUser){
  50. topItems.add(newGenericRecommendedItem(itemID, preValue));
  51. }
  52. //当前item对应的分数比队列中的item的最小分数大,则将队列头原始(即最小元素)弹出,并且将当前item:分数加入队列
  53. elseif(preValue > topItems.peek().getValue()){
  54. topItems.add(newGenericRecommendedItem(itemID, preValue));
  55. //弹出头元素(最小元素)
  56. topItems.poll();
  57. }
  58. }
  59. //重新调整队列的元素的顺序
  60. List<RecommendedItem> recommdations =newArrayList<RecommendedItem>(topItems.size());
  61. recommdations.addAll(topItems);//将队列中所有元素添加即将排序的集合
  62. Collections.sort(recommdations,ByValueRecommendedItemComparator.getInstance());//排序
  63. //输出推荐向量信息
  64. logger.info(userID+" "+newRecommendedItemsWritable(recommdations));
  65. context.write(userID,newRecommendedItemsWritable(recommdations));
  66. }
  67. }

第六步:组装各个作业关系

PackageRecomendJob

  1. package com.mapreduceMain;
  2. import java.io.IOException;
  3. import java.net.URI;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.conf.Configured;
  6. import org.apache.hadoop.fs.FileSystem;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  14. import org.apache.hadoop.util.Tool;
  15. import org.apache.hadoop.util.ToolRunner;
  16. import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
  17. import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
  18. import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
  19. import org.apache.mahout.math.VarLongWritable;
  20. import org.apache.mahout.math.VectorWritable;
  21. import com.mapper.CaclPartialRecomUserVectorMapper;
  22. import com.mapper.CombineUserVectorAndCoocMatrixMapper;
  23. import com.mapper.UserVecotrSplitMapper;
  24. import com.mapper.UserVectorToCooccurrenceMapper;
  25. import com.mapper.SourceDataToItemPrefsMapper;
  26. import com.reducer.CombineUserVectorAndCoocMatrixReducer;
  27. import com.reducer.MergeAndGenerateRecommendReducer;
  28. import com.reducer.ParRecomUserVectorCombiner;
  29. import com.reducer.UserVectorToCoocurrenceReducer;
  30. import com.reducer.SourceDataToUserVectorReducer;
  31. import com.util.HadoopUtil;
  32. /**
  33. * 组装各个作业组件,完成推荐作业
  34. * @author 曾昭正
  35. */
  36. publicclassPackageRecomendJobextendsConfiguredimplementsTool{
  37. String[] dataSourceInputPath ={"/user/hadoop/z.zeng/distruteItemCF/dataSourceInput"};
  38. String[] uesrVectorOutput ={"/user/hadoop/z.zeng/distruteItemCF/uesrVectorOutput/"};
  39. String[] userVectorSpliltOutPut ={"/user/hadoop/z.zeng/distruteItemCF/userVectorSpliltOutPut"};
  40. String[] cooccurrenceMatrixOuptPath ={"/user/hadoop/z.zeng/distruteItemCF/CooccurrenceMatrixOuptPath"};
  41. String[] combineUserVectorAndCoocMatrixOutPutPath ={"/user/hadoop/z.zeng/distruteItemCF/combineUserVectorAndCoocMatrixOutPutPath"};
  42. String[] caclPartialRecomUserVectorOutPutPath ={"/user/hadoop/z.zeng/distruteItemCF/CaclPartialRecomUserVectorOutPutPath"};
  43. protectedvoid setup(Configuration configuration)
  44. throwsIOException,InterruptedException{
  45. FileSystem hdfs =FileSystem.get(URI.create("hdfs://cluster-master"), configuration);
  46. Path p1 =newPath(uesrVectorOutput[0]);
  47. Path p2 =newPath(userVectorSpliltOutPut[0]);
  48. Path p3 =newPath(cooccurrenceMatrixOuptPath[0]);
  49. Path p4 =newPath(combineUserVectorAndCoocMatrixOutPutPath[0]);
  50. Path p5 =newPath(caclPartialRecomUserVectorOutPutPath[0]);
  51. if(hdfs.exists(p1)){
  52. hdfs.delete(p1,true);
  53. }
  54. if(hdfs.exists(p2)){
  55. hdfs.delete(p2,true);
  56. }
  57. if(hdfs.exists(p3)){
  58. hdfs.delete(p3,true);
  59. }
  60. if(hdfs.exists(p4)){
  61. hdfs.delete(p4,true);
  62. }
  63. if(hdfs.exists(p5)){
  64. hdfs.delete(p5,true);
  65. }
  66. }
  67. @Override
  68. publicint run(String[] args)throwsException{
  69. Configuration conf=getConf();//获得配置文件对象
  70. setup(conf);
  71. // DistributedCache.addArchiveToClassPath(new Path("/user/hadoop/z.zeng/distruteItemCF/lib"), conf);
  72. //配置计算用户向量作业
  73. Job wikipediaToItemPrefsJob =HadoopUtil.prepareJob(
  74. "WikipediaToItemPrefsJob",
  75. dataSourceInputPath,
  76. uesrVectorOutput[0],
  77. TextInputFormat.class,
  78. SourceDataToItemPrefsMapper.class,
  79. VarLongWritable.class,
  80. VarLongWritable.class,
  81. SourceDataToUserVectorReducer.class,
  82. VarLongWritable.class,
  83. VectorWritable.class,
  84. SequenceFileOutputFormat.class,
  85. conf);
  86. //配置计算共现向量作业
  87. Job userVectorToCooccurrenceJob =HadoopUtil.prepareJob(
  88. "UserVectorToCooccurrenceJob",
  89. uesrVectorOutput,
  90. cooccurrenceMatrixOuptPath[0],
  91. SequenceFileInputFormat.class,
  92. UserVectorToCooccurrenceMapper.class,
  93. IntWritable.class,
  94. IntWritable.class,
  95. UserVectorToCoocurrenceReducer.class,
  96. IntWritable.class,
  97. VectorOrPrefWritable.class,
  98. SequenceFileOutputFormat.class,
  99. conf);
  100. //配置分割用户向量作业
  101. Job userVecotrSplitJob =HadoopUtil.prepareJob(
  102. "userVecotrSplitJob",
  103. uesrVectorOutput,
  104. userVectorSpliltOutPut[0],
  105. SequenceFileInputFormat.class,
  106. UserVecotrSplitMapper.class,
  107. IntWritable.class,
  108. VectorOrPrefWritable.class,
  109. SequenceFileOutputFormat.class,
  110. conf);
  111. //合并共现向量和分割之后的用户向量作业
  112. //这个主意要将分割用户向量和共现向量的输出结果一起作为输入
  113. String[] combineUserVectorAndCoocMatrixIutPutPath ={cooccurrenceMatrixOuptPath[0],userVectorSpliltOutPut[0]};
  114. Job combineUserVectorAndCoocMatrixJob =HadoopUtil.prepareJob(
  115. "combineUserVectorAndCoocMatrixJob",
  116. combineUserVectorAndCoocMatrixIutPutPath,
  117. combineUserVectorAndCoocMatrixOutPutPath[0],
  118. SequenceFileInputFormat.class,
  119. CombineUserVectorAndCoocMatrixMapper.class,
  120. IntWritable.class,
  121. VectorOrPrefWritable.class,
  122. CombineUserVectorAndCoocMatrixReducer.class,
  123. IntWritable.class,
  124. VectorAndPrefsWritable.class,
  125. SequenceFileOutputFormat.class,
  126. conf);
  127. //计算用户推荐向量
  128. Job caclPartialRecomUserVectorJob=HadoopUtil.prepareJob(
  129. "caclPartialRecomUserVectorJob",
  130. combineUserVectorAndCoocMatrixOutPutPath,
  131. caclPartialRecomUserVectorOutPutPath[0],
  132. SequenceFileInputFormat.class,
  133. CaclPartialRecomUserVectorMapper.class,
  134. VarLongWritable.class,
  135. VectorWritable.class,
  136. ParRecomUserVectorCombiner.class,//为map设置combiner减少网络IO
  137. MergeAndGenerateRecommendReducer.class,
  138. VarLongWritable.class,
  139. RecommendedItemsWritable.class,
  140. TextOutputFormat.class,
  141. conf);
  142. //串联各个job
  143. if(wikipediaToItemPrefsJob.waitForCompletion(true)){
  144. if(userVectorToCooccurrenceJob.waitForCompletion(true)){
  145. if(userVecotrSplitJob.waitForCompletion(true)){
  146. if(combineUserVectorAndCoocMatrixJob.waitForCompletion(true)){
  147. int rs = caclPartialRecomUserVectorJob.waitForCompletion(true)?1:0;
  148. return rs;
  149. }else{
  150. thrownewException("合并共现向量和分割之后的用户向量作业失败!!");
  151. }
  152. }else{
  153. thrownewException("分割用户向量作业失败!!");
  154. }
  155. }else{
  156. thrownewException("计算共现向量作业失败!!");
  157. }
  158. }else{
  159. thrownewException("计算用户向量作业失败!!");
  160. }
  161. }
  162. publicstaticvoid main(String[] args)throwsIOException,
  163. ClassNotFoundException,InterruptedException{
  164. try{
  165. int returnCode =ToolRunner.run(newPackageRecomendJob(),args);
  166. System.exit(returnCode);
  167. }catch(Exception e){
  168. }
  169. }
  170. }

五、总结

本blog主要说了下itemBase推荐算法的一些概念,以及如何多现有数据进行建模。其中对共现矩阵方式的推荐用MapReduce结合Mahout的内置数据类型进行了实现。写完这篇blog和对算法实现完毕后,发现Mapreduce编程虽然数据模型非常简单,只有2个过程:数据的分散与合并,但是在分散与合并的过程中可以使用自定义的各种数据组合类型使其能够完成很多复杂的功能。

参考文献:《Mahout in action》、《推荐引擎实践》

来自为知笔记(Wiz)

时间: 2024-08-05 06:28:54

(转) 基于MapReduce的ItemBase推荐算法的共现矩阵实现(一)的相关文章

Mahout in Action 学习---基于物品的分布式推荐算法(Wikipedia数据集)

文字总结自<Mahout in Action>中文版第六章的内容 1.1 数据集介绍 Wikipedia数据集:一篇文章到另外一篇文章的链接. 可以将文章看作是用户,将该文章指向的文章视为该源文章所喜欢的物品. 类型:单向布尔型偏好. 相似性评估算法:LogLikelihoodSimilarity 关于LogLikelihoodSimilarity具体算法思想见: 对数似然比相似度 - xidianycy - 博客频道 - CSDN.NET http://blog.csdn.net/u0143

基于用户的协同过滤推荐算法

什么是推荐算法 推荐算法最早在1992年就提出来了,但是火起来实际上是最近这些年的事情,因为互联网的爆发,有了更大的数据量可以供我们使用,推荐算法才有了很大的用武之地. 最开始,所以我们在网上找资料,都是进yahoo,然后分门别类的点进去,找到你想要的东西,这是一个人工过程,到后来,我们用google,直接搜索自 己需要的内容,这些都可以比较精准的找到你想要的东西,但是,如果我自己都不知道自己要找什么肿么办?最典型的例子就是,如果我打开豆瓣找电影,或者我去 买说,我实际上不知道我想要买什么或者看

基于特征的推荐算法【转】

http://in.sdo.com/?p=2779 推荐算法准确度度量公式: 其中,R(u)表示对用户推荐的N个物品,T(u)表示用户u在测试集上喜欢的物品集合. 集合相似度度量公式(N维向量的距离度量公式): Jaccard公式: 其中,N(u)表示用户u有过正反馈的物品集合. 余弦相似度公式: UserCF公式: 其中,S(u,k)表示和用户u兴趣最接近的K个用户集合:N(i)表示对物品i有过正反馈的用户集合:w(u,v)表示用户u和用户v的兴趣相似度:r(v,i)表示用户v对物品i的兴趣.

Mahout推荐算法API详解

前言 用Mahout来构建推荐系统,是一件既简单又困难的事情.简单是因为Mahout完整地封装了“协同过滤”算法,并实现了并行化,提供非常简单的API接口:困难是因为我们不了解算法细节,很难去根据业务的场景进行算法配置和调优. 本文将深入算法API去解释Mahout推荐算法底层的一些事. 1. Mahout推荐算法介绍 Mahoutt推荐算法,从数据处理能力上,可以划分为2类: 单机内存算法实现 基于Hadoop的分步式算法实现 1). 单机内存算法实现 单机内存算法实现:就是在单机下运行的算法

Mahout推荐算法API详解【一起学Mahout】

阅读导读: 1.mahout单机内存算法实现和分布式算法实现分别存在哪些问题? 2.算法评判标准有哪些? 3.什么会影响算法的评分? 1. Mahout推荐算法介绍 Mahout推荐算法,从数据处理能力上,可以划分为2类: 单机内存算法实现 基于Hadoop的分步式算法实现 1). 单机内存算法实现 单机内存算法实现:就是在单机下运行的算法,是由cf.taste项目实现的,像我们熟悉的UserCF,ItemCF都支持单机内存运行,并且参数可以灵活配置.单机算法的基本实例,请参考文章:用Maven

Mahout推荐算法API具体解释【一起学Mahout】

阅读导读: 1.mahout单机内存算法实现和分布式算法实现分别存在哪些问题? 2.算法评判标准有哪些? 3.什么会影响算法的评分? 1. Mahout推荐算法介绍 Mahout推荐算法,从数据处理能力上,能够划分为2类: 单机内存算法实现 基于Hadoop的分步式算法实现 1). 单机内存算法实现 单机内存算法实现:就是在单机下执行的算法,是由cf.taste项目实现的,像我们熟悉的UserCF,ItemCF都支持单机内存执行.而且參数能够灵活配置.单机算法的基本实例.请參考文章:用Maven

[转]Mahout推荐算法API详解

Mahout推荐算法API详解 Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等. 从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占

Mahout推荐算法之ItemBased

Mahout推荐之ItemBased 一.   算法原理 (一)    基本原理 如下图评分矩阵所示:行为user,列为item. 图(1) 该算法的原理: 1.  计算Item之间的相似度. 2.  对用户U做推荐 公式(一) Map tmp ; Map tmp1 ; for(item a  in userRatedItems){ rate  =userforItemRate(a) ListsimItem =getSimItem(a); For(Jin simItem){ Item b =j;

Mahout推荐算法基础

转载自(http://www.geek521.com/?p=1423) Mahout推荐算法分为以下几大类 GenericUserBasedRecommender 算法: 1.基于用户的相似度 2.相近的用户定义与数量 特点: 1.易于理解 2.用户数较少时计算速度快 GenericItemBasedRecommender 算法: 1.基于item的相似度 特点: 1.item较少时就算速度更快 2.当item的外部概念易于理解和获得是非常有用 SlopeOneRecommender(itemB