For details, see Chapter 6 of Mahout in Action.
Code:
package mahout.wiki;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;

public class WikiTest {

    // Mapper that parses the Wikipedia links file:
    // the first number on each line is the user ID, the following numbers are item IDs
    public static class WikipediaToItemPrefsMapper
            extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

        private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Matcher m = NUMBERS.matcher(line);
            m.find();
            VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
            VarLongWritable itemID = new VarLongWritable();
            while (m.find()) {
                itemID.set(Long.parseLong(m.group()));
                context.write(userID, itemID);
            }
        }
    }

    // Reducer that builds a preference Vector from each user's item preferences
    public static class WikipediaToUserVectorReducer
            extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

        @Override
        protected void reduce(VarLongWritable userId, Iterable<VarLongWritable> itemPrefs, Context context)
                throws IOException, InterruptedException {
            Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
            for (VarLongWritable itemPref : itemPrefs) {
                userVector.set((int) itemPref.get(), 1.0f);
            }
            context.write(userId, new VectorWritable(userVector));
        }
    }

    // Mapper that emits every pair of items appearing in the same user vector (co-occurrence)
    public static class UserVectorToCooccurrenceMapper
            extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

        @Override
        protected void map(VarLongWritable userId, VectorWritable userVector, Context context)
                throws IOException, InterruptedException {
            Iterator<Element> it = userVector.get().nonZeroes().iterator();
            while (it.hasNext()) {
                int index1 = it.next().index();
                Iterator<Element> it2 = userVector.get().nonZeroes().iterator();
                while (it2.hasNext()) {
                    int index2 = it2.next().index();
                    context.write(new IntWritable(index1), new IntWritable(index2));
                }
            }
        }
    }

    // Reducer that counts co-occurrences and builds one row of the co-occurrence matrix
    public static class UserVectorToCooccurrenceReducer
            extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

        @Override
        protected void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemIndex2s, Context context)
                throws IOException, InterruptedException {
            Vector cooccurenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
            for (IntWritable intWritable : itemIndex2s) {
                int itemIndex2 = intWritable.get();
                cooccurenceRow.set(itemIndex2, cooccurenceRow.get(itemIndex2) + 1.0);
            }
            context.write(itemIndex1, new VectorWritable(cooccurenceRow));
        }
    }

    // Mapper that wraps a co-occurrence column in a VectorOrPrefWritable
    public static class CooccurenceColumnWrapperMapper
            extends Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

        @Override
        protected void map(IntWritable key, VectorWritable value, Context context)
                throws IOException, InterruptedException {
            // pass the co-occurrence column along (writing an empty VectorOrPrefWritable would lose it)
            context.write(key, new VectorOrPrefWritable(value.get()));
        }
    }

    // Mapper that splits a user vector into per-item (userID, preference) pairs
    public static class UserVetorSplitterMapper
            extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

        @Override
        protected void map(VarLongWritable key, VectorWritable value, Context context)
                throws IOException, InterruptedException {
            long userId = key.get();
            Vector userVector = value.get();
            Iterator<Element> it = userVector.nonZeroes().iterator();
            IntWritable itemIndexWritable = new IntWritable();
            while (it.hasNext()) {
                Vector.Element e = it.next();
                int itemIndex = e.index();
                float pref = (float) e.get();
                itemIndexWritable.set(itemIndex);
                context.write(itemIndexWritable, new VectorOrPrefWritable(userId, pref));
            }
        }
    }

    // Mapper that computes partial recommendation vectors (column times preference value)
    public static class PartialMultiplyMapper
            extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {

        @Override
        protected void map(IntWritable key, VectorAndPrefsWritable value, Context context)
                throws IOException, InterruptedException {
            Vector cooccurenceColumn = value.getVector();
            List<Long> userIDs = value.getUserIDs();
            List<Float> prefValues = value.getValues();
            for (int i = 0; i < userIDs.size(); i++) {
                long userId = userIDs.get(i);
                float prefValue = prefValues.get(i);
                Vector partialProduct = cooccurenceColumn.times(prefValue);
                context.write(new VarLongWritable(userId), new VectorWritable(partialProduct));
            }
        }
    }

    // Combiner that sums the partial products per user
    public static class AggregateCombiner
            extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {

        @Override
        protected void reduce(VarLongWritable key, Iterable<VectorWritable> values, Context context)
                throws IOException, InterruptedException {
            Vector partial = null;
            for (VectorWritable vectorWritable : values) {
                partial = partial == null ? vectorWritable.get() : partial.plus(vectorWritable.get());
            }
            context.write(key, new VectorWritable(partial));
        }
    }

    // Reducer that turns the summed recommendation vector into recommendation results
    public static class AggregateAndRecommendReducer
            extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {

        private OpenIntLongHashMap indexItemIDMap;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // in the book this map is loaded from side data mapping item indexes back to item IDs;
            // here it is only created empty
            indexItemIDMap = new OpenIntLongHashMap(1000);
        }

        @Override
        protected void reduce(VarLongWritable key, Iterable<VectorWritable> values, Context context)
                throws IOException, InterruptedException {
            Vector recommendationVector = null;
            for (VectorWritable vectorWritable : values) {
                recommendationVector = recommendationVector == null
                        ? vectorWritable.get()
                        : recommendationVector.plus(vectorWritable.get());
            }
            // recommendationsPerUser is not defined in the book's listing; it should mean how many
            // items to recommend per user, so a fixed value is used here
            int recommendationsPerUser = 2;
            Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(
                    recommendationsPerUser + 1,
                    Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
            Iterator<Element> recommendationVectorIterator = recommendationVector.nonZeroes().iterator();
            while (recommendationVectorIterator.hasNext()) {
                Vector.Element element = recommendationVectorIterator.next();
                int index = element.index();
                float value = (float) element.get();
                if (topItems.size() < recommendationsPerUser) {
                    // indexItemIDMap is not populated in this listing; it should map item indexes
                    // back to item IDs, so a random value is used here as a stand-in
                    // topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
                    topItems.add(new GenericRecommendedItem(new Random().nextLong(), value));
                } else if (value > topItems.peek().getValue()) {
                    // topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
                    topItems.add(new GenericRecommendedItem(new Random().nextLong(), value));
                    topItems.poll();
                }
            }
            List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
            recommendations.addAll(topItems);
            Collections.sort(recommendations, ByValueRecommendedItemComparator.getInstance());
            context.write(key, new RecommendedItemsWritable(recommendations));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        RecommenderJob job = new RecommenderJob();
        job.setConf(conf);
        // Assemble the mappers and reducers here.
        // The mappers and reducers above feed into one another, but I have not found a way to
        // chain them together through RecommenderJob.
        // Since each map+reduce pair is also self-contained, it can be tested as a separate job,
        // just like an ordinary Hadoop job, so that is not covered here.
        // Note: Mahout in Action uses version 0.5, while I am using 0.9, and some classes have
        // changed between the two.
    }
}
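As the comments in main() note, each map+reduce pair above can be tested as a separate, ordinary Hadoop job. Below is a minimal sketch of such a driver for the first phase only (WikipediaToItemPrefsMapper plus WikipediaToUserVectorReducer). The class name and the input/output paths are placeholders I made up for illustration; the output is written as a SequenceFile so that a later phase could read the user vectors back in.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

// Hypothetical standalone driver for phase 1 of the pipeline; paths are placeholders.
public class WikipediaToUserVectorDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wikipedia-to-user-vectors");
        job.setJarByClass(WikiTest.class);

        job.setMapperClass(WikiTest.WikipediaToItemPrefsMapper.class);
        job.setReducerClass(WikiTest.WikipediaToUserVectorReducer.class);

        // map output: userID -> itemID
        job.setMapOutputKeyClass(VarLongWritable.class);
        job.setMapOutputValueClass(VarLongWritable.class);
        // reduce output: userID -> user preference vector
        job.setOutputKeyClass(VarLongWritable.class);
        job.setOutputValueClass(VectorWritable.class);

        // SequenceFile output so the next phase (UserVectorToCooccurrenceMapper) can read it
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path("/input/links.txt"));       // placeholder path
        FileOutputFormat.setOutputPath(job, new Path("/output/userVectors"));  // placeholder path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}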
RecommenderJob flow chart: the trouble is that I could not find how to configure it; the information available is all in command-line form.
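Since RecommenderJob implements Hadoop's Tool interface, the same command-line style options can also be passed from Java through ToolRunner. The following is only a sketch under assumptions of my own: the input/output/temp paths are placeholders, and SIMILARITY_COOCCURRENCE and the recommendation count are example choices, not settings taken from the article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

public class RunRecommenderJob {
    public static void main(String[] args) throws Exception {
        // RecommenderJob parses cmd-style options, so they are passed as a String[]
        String[] jobArgs = {
            "--input", "/input/prefs.csv",            // placeholder: userID,itemID[,pref] lines
            "--output", "/output/recommendations",    // placeholder output directory
            "--similarityClassname", "SIMILARITY_COOCCURRENCE",  // example similarity measure
            "--numRecommendations", "2",              // example value
            "--tempDir", "/tmp/recommenderJob"        // placeholder temp directory
        };
        int exitCode = ToolRunner.run(new Configuration(), new RecommenderJob(), jobArgs);
        System.exit(exitCode);
    }
}

The equivalent cluster invocation would pass the same options to the RecommenderJob class packaged in the Mahout job jar, e.g. something along the lines of: hadoop jar mahout-core-0.9-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input ... --output ... --similarityClassname SIMILARITY_COOCCURRENCE (jar name assumed from the 0.9 distribution).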
Mahout recommendation, part 15: running MapReduce on Hadoop