Mahout Recommendation 15: Running MapReduce on Hadoop

For details, see Chapter 6 of Mahout in Action.

Code:

package mahout.wiki;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.map.OpenIntLongHashMap;
import org.apache.mahout.math.VectorWritable;

public class WikiTest {

	// Mapper that parses the Wikipedia links file
	public static class WikipediaToItemPrefsMapper
		extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable>{
		private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			Matcher m = NUMBERS.matcher(line);
			m.find();

			VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
			VarLongWritable itemID = new VarLongWritable();

			while(m.find()){
				itemID.set(Long.parseLong(m.group()));
				context.write(userID, itemID);
			}
		}
	}
	// Reducer that builds a preference Vector from a user's item IDs
	public static class WikipediaToUserVectorReducer extends
		Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable>{
		@Override
		protected void reduce(VarLongWritable userId,
				Iterable<VarLongWritable> itemPrefs,Context context)
				throws IOException, InterruptedException {
			Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE,100);
			for (VarLongWritable itemPref : itemPrefs) {
				userVector.set((int)itemPref.get(), 1.0f);
			}
			context.write(userId, new VectorWritable(userVector));
		}
	}
	// Mapper that emits item co-occurrence pairs
	public static class UserVectorToCooccurrenceMapper extends
		Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable>{
		@Override
		protected void map(VarLongWritable userId, VectorWritable userVector,Context context)
				throws IOException, InterruptedException {
			Iterator<Element> it = userVector.get().nonZeroes().iterator();
			while(it.hasNext()){
				int index1 = it.next().index();
				Iterator<Element> it2 = userVector.get().nonZeroes().iterator();
				while (it2.hasNext()){
					int index2 = it2.next().index();
					context.write(new IntWritable(index1), new IntWritable(index2));
				}
			}
		}
	}
	// Reducer that builds the rows of the co-occurrence matrix
	public static class UserVectorToCooccurrenceReducer extends
		Reducer<IntWritable, IntWritable, IntWritable, VectorWritable>{
		@Override
		protected void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemIndex2s,Context context)
				throws IOException, InterruptedException {
			Vector cooccurenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE,100);
			for (IntWritable intWritable : itemIndex2s) {
				int itemIndex2 = intWritable.get();
				cooccurenceRow.set(itemIndex2, cooccurenceRow.get(itemIndex2) + 1.0);
			}
			context.write(itemIndex1, new VectorWritable(cooccurenceRow));
		}
	}
	// Wraps each co-occurrence matrix column for the later join with user preferences
	public static class CooccurrenceColumnWrapperMapper extends
		Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable>{
		@Override
		protected void map(IntWritable key, VectorWritable value,Context context)
				throws IOException, InterruptedException {
			// wrap the co-occurrence column so it can be joined with user preference values
			context.write(key, new VectorOrPrefWritable(value.get()));
		}
	}
	// Splits each user vector into per-item preference records
	public static class UserVectorSplitterMapper extends
		Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable>{
		@Override
		protected void map(VarLongWritable key, VectorWritable value,Context context)
				throws IOException, InterruptedException {
			long userId = key.get();
			Vector userVector = value.get();
			Iterator<Element> it = userVector.nonZeroes().iterator();
			IntWritable itemIndexWritable = new IntWritable();

			while(it.hasNext()){
				Vector.Element e = it.next();
				int itemIndex = e.index();
				float pref = (float) e.get();
				itemIndexWritable.set(itemIndex);
				context.write(itemIndexWritable, new VectorOrPrefWritable(userId, pref));
			}
		}
	}
	// Computes partial recommendation vectors (co-occurrence column times preference value)
	public static class PartialMultiplyMapper extends
		Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable>{
		@Override
		protected void map(IntWritable key, VectorAndPrefsWritable value,Context context)
				throws IOException, InterruptedException {
			Vector cooccurenceColumn = value.getVector();
			List<Long> userIDs = value.getUserIDs();
			List<Float> prefValues = value.getValues();

			for (int i = 0; i < userIDs.size(); i++) {
				long userId = userIDs.get(i);
				float prefValue = prefValues.get(i);
				Vector partialProduct = cooccurenceColumn.times(prefValue);
				context.write(new VarLongWritable(userId), new VectorWritable(partialProduct));
			}
		}
	}
	// Combiner that sums the partial products for each user
	public static class AggregateCombiner extends
		Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable>{
		@Override
		protected void reduce(VarLongWritable key,
				Iterable<VectorWritable> values,Context context)
				throws IOException, InterruptedException {
			Vector partial = null;
			for (VectorWritable vectorWritable : values) {
				partial = partial == null ? vectorWritable.get() : partial.plus(vectorWritable.get());
			}
			context.write(key, new VectorWritable(partial));
		}
	}
	// Turns each recommendation vector into a ranked list of recommended items
	public static class AggregateAndRecommendReducer extends
		Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable>{
		private OpenIntLongHashMap indexItemIDMap;
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			// in the book this map is loaded with the vector-index-to-itemID mapping; here it is left empty
			indexItemIDMap = new OpenIntLongHashMap(1000);
		}
		@Override
		protected void reduce(VarLongWritable key,
				Iterable<VectorWritable> values,Context context)
				throws IOException, InterruptedException {
			Vector recommendationVector = null;
			for (VectorWritable vectorWritable : values) {
				recommendationVector = recommendationVector == null ? vectorWritable.get() : recommendationVector.plus(vectorWritable.get());
			}
			// recommendationsPerUser is not defined in the book's listing; it is the number of items to recommend per user
			int recommendationsPerUser = 2;
			Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser+1,
					Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));

			Iterator<Element> recommendationVectorIterator = recommendationVector.nonZeroes().iterator();
			while(recommendationVectorIterator.hasNext()){
				Vector.Element element = recommendationVectorIterator.next();
				int index = element.index();
				float value = (float) element.get();
				if ( topItems.size() < recommendationsPerUser){
					// indexItemIDMap should map vector indexes back to item IDs, but it is never
					// populated here, so a random ID is used as a placeholder
					//topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
					topItems.add(new GenericRecommendedItem(new Random().nextLong(), value));
				}else if (value > topItems.peek().getValue()){
					//topItems.add(new GenericRecommendedItem(indexItemIDMap.get(index), value));
					topItems.add(new GenericRecommendedItem(new Random().nextLong(), value));
					topItems.poll();
				}
			}

			List<RecommendedItem> recommendations = new ArrayList<RecommendedItem>(topItems.size());
			recommendations.addAll(topItems);
			Collections.sort(recommendations,ByValueRecommendedItemComparator.getInstance());
			context.write(key, new RecommendedItemsWritable(recommendations));
		}
	}
	public static void main(String[] args) throws Exception {
	    Configuration conf = new Configuration();

	    RecommenderJob job = new RecommenderJob();
	    job.setConf(conf);
	    // This is where the individual mappers and reducers would be wired together.

	    // The mappers and reducers above form one pipeline, but I have not found a way to chain them
	    // through RecommenderJob programmatically. Since each map/reduce pair is also self-contained,
	    // every phase can be tested as an ordinary standalone Hadoop job (a minimal driver for the
	    // first phase is sketched after this class). Note that Mahout in Action targets version 0.5,
	    // while this code uses 0.9, where some of the classes have changed.
	  }
}
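
A single map/reduce phase from the class above can be run on its own for testing. Below is a minimal driver sketch for the first phase (Wikipedia links file to per-user preference vectors), assuming Hadoop 2.x and Mahout 0.9 on the classpath; the driver class name, job name, and input/output paths are illustrative placeholders, not part of the book's code.

package mahout.wiki;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class WikiToUserVectorDriver {

	public static void main(String[] args) throws Exception {
		// args[0] = path of the Wikipedia links file, args[1] = output directory (placeholders)
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "wikipedia-to-user-vectors");
		job.setJarByClass(WikiTest.class);

		job.setMapperClass(WikiTest.WikipediaToItemPrefsMapper.class);
		job.setReducerClass(WikiTest.WikipediaToUserVectorReducer.class);

		// the mapper emits (userID, itemID); the reducer emits (userID, preference vector)
		job.setMapOutputKeyClass(VarLongWritable.class);
		job.setMapOutputValueClass(VarLongWritable.class);
		job.setOutputKeyClass(VarLongWritable.class);
		job.setOutputValueClass(VectorWritable.class);

		// write the user vectors as a SequenceFile so the next phase can read them directly
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Packaged into a jar, it would be run like any other Hadoop job, e.g. hadoop jar wiki.jar mahout.wiki.WikiToUserVectorDriver <links file> <output dir>; each of the other phases can be wired up the same way with its own mapper/reducer pair and output types.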

The RecommenderJob workflow diagram: the frustrating part is that I could not find how to configure it programmatically; all of the available information is in command-line form. A sketch of that approach follows.
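
Since RecommenderJob extends Mahout's AbstractJob, which implements Hadoop's Tool interface, one workaround is to hand it the same options you would pass on the command line (via the mahout recommenditembased driver) as a String[] through ToolRunner. The sketch below assumes Mahout 0.9; the paths and option values are placeholders, and the input must already be in the userID,itemID[,pref] CSV format that RecommenderJob expects rather than the raw Wikipedia links format.

package mahout.wiki;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

public class RecommenderJobDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// Same option names as the command-line form; all paths and values below are examples.
		String[] jobArgs = {
				"--input", "/wiki/prefs.csv",                        // userID,itemID[,pref] records
				"--output", "/wiki/recommendations",                 // one line of recommended items per user
				"--tempDir", "/wiki/temp",                           // intermediate job output
				"--similarityClassname", "SIMILARITY_COOCCURRENCE",  // plain co-occurrence counts, as in this chapter
				"--booleanData", "true",                             // link data has no explicit preference values
				"--numRecommendations", "10"
		};

		// ToolRunner parses the generic Hadoop options and passes the rest to RecommenderJob,
		// which is essentially what RecommenderJob's own main() does.
		System.exit(ToolRunner.run(conf, new RecommenderJob(), jobArgs));
	}
}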
