pagerank算法的MapReduce实现

pagerank是一种不容易被欺骗的计算Web网页重要性的工具,pagerank是一个函数,它对Web中(或者至少是抓取并发现其中连接关系的一部分web网页)的每个网页赋予一个实数值。他的意图在于,网页 的pagerank越高,那么它就越重要。并不存在一个固定的pagerank分配算法。

对于pagerank算法的推到我在这里不想做过多的解释,有兴趣的可以自己查看资料看看,这里我直接给出某个网页pagerank的求解公式:

P(n)=a/G+(1-a)*求和(P(m)/C(m))     (m属于L(n))

其中:G 为网页的数量,P(n)为页面n的pagerank值,C(m)为页面m含有的连接数量,a为随机跳转因子,其中求和符号不能打印,我直接使用文字给出,L(n)表示存在到页面n链接的页面的集合。

下面给出pagerank的MapReduce实现,其中输入文件必须要求的格式为:

输入文件 pagerank.txt:

页面id 初始pagerank值;{对于页面n,n所包含的链接所指向的页面id集合(即出链集合)};{对于页面n,包含页面n链接的页面id集合(即入链集合)};包含链接个数

注意:这中间一定是分号分隔

1  
0.2;{2,4};{5};2

2 0.2;{3,5};{1,5};2

3 0.2;{4};{2,5};1

4 0.2;{5};{1,3};1

5 0.2;{1,2,3};{2,4};3

分布式缓存文件 rankCache.txt

rank 页面id:页面pagerank值,页面id:页面pagerank值,页面id:页面pagerank值....

rank 1:0.2,2:0.2,3:0.2,4:0.2,5:0.2

介绍完了两个输入文件,下面是pagerank算法的MapReduce实现:当然输出路径是自己设置

package soft.project;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRank {

	private final static String localInputPath = "/home/hadoop/test/mapReduce/pagerank.txt";
// private final static String hdfsInputPath = "hdfs:/192.168.0.1:9000/user/hadoop/pagerank";
	private final static String localOutputPath = "/home/hadoop/test/mapReduce/pagerank";
	private final static String hdfsOutputPath = "hdfs:/192.168.0.1:9000/user/hadoop/pagerank";
	private final static String rankCachePath="/home/hadoop/test/mapReduce/rankCache.txt";
	private static List<RankResult> pagerankList=new Vector<RankResult>();
	private final static double random = 0.85;    //随机跳转因子
	private final static double stopFactor=0.001;   //上一次和这次每个网页pagerank差值的绝对值之和小于stopFactor则停止迭代
	private final static long G = 5;        //初始网页数量

	private static class RankResult{
		private String order="";
		private double rank=0;

		@SuppressWarnings("unused")
		public RankResult() {}
		public RankResult(String order,double rank){
			this.order=order;
			this.rank=rank;
		}
	}

	private static class PRMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		private String keyinfo = "";
		private String valueinfo = "";

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] split = value.toString().split(";");
			String outLink[] = split[1].split("[{}]")[1].split(",");
			double pagerank = Double.parseDouble(split[0].split("\\s")[1]);
			double c = Double.parseDouble(split[3]);
			double k = pagerank / c;
			/*System.out.println("page:" + split[0].split("\\s")[0] + "pagerank:"
					+ pagerank + "  c:" + c);*/
			for (String page : outLink) {
				context.write(new Text(page), new Text(String.valueOf(k)));
			//	System.out.println("page:" + page + "  ragerank:" + k);
			}
			writeNode(value, context);
		}

		private void writeNode(Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String split[] = value.toString().split("\\s");
			valueinfo = split[1].split(";", 2)[1];
			keyinfo = split[0];
			context.write(new Text(keyinfo), new Text(valueinfo));
			/*System.out.println("keyinfo:" + keyinfo + "  valueinfo:"
					+ valueinfo);*/
		}
	}

	private static class PRCombiner extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> value,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String v = "";
			double pagerank = 0;
			for (Text text : value) {
				String valueString = text.toString();
				if (valueString.contains("{")) {
					v = valueString;
				} else {
					pagerank += Double.parseDouble(valueString);
				}
			}
			if (v.equals("")) {
				context.write(key, new Text(String.valueOf(pagerank)));
			} else {
				String s = pagerank + ";" + v;
				context.write(key, new Text(s));
			}

		}

	}

	private static class PRReducer extends Reducer<Text, Text, Text, Text> {

		private List<Double> rankList=new Vector<Double>((int)G);          //是否每个job都是重新创建一个rankList和rankMap???
		private Hashtable<Integer, Double> rankMap=new Hashtable<Integer, Double>();

		@Override
		protected void setup(Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			Configuration conf=context.getConfiguration();
			int order=Integer.parseInt(conf.get("order"));
			System.out.println(".................... invoke setup().................");
			Path cachePath[]=DistributedCache.getLocalCacheFiles(conf);
			if(cachePath==null || cachePath.length>0){
				for(Path p:cachePath){
					System.out.println("reduce cache:"+p.toString());
				}
				System.out.println("cachePath length:"+cachePath.length);
				getRankList(cachePath[order-1].toString(), context);
			}else {
				System.out.println("cachePath ==null || cachePath's lenth is 0");
			}
		}

		@Override
		protected void reduce(Text key, Iterable<Text> value,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			double pagerank = 0;
			String node = "";
			for (Text v : value) {
				String pString = v.toString();
				System.out.println("reduce key="+key.toString()+"  reduce value=" + pString);
				String split[] = pString.split(";");
				if (split.length == 1) { // pString is the same as 0.2+

					pagerank += Double.parseDouble(pString);
				} else if (!split[0].contains("{")) { // pString is the same as 0.2;{2,4};{1,3};2
					pagerank += Double.parseDouble(split[0]);
					node = pString.split(";", 2)[1];
				} else if (split[0].contains("{")) { // pString is the same as	 {2,4};{1,3};2
					node = pString;
				}
			}
			pagerank = random / G + (1 - random) * pagerank;
			node = pagerank + ";" + node;
			System.out.println("reduce  key=" + key.toString() + "  node_value=" + node);
			rankMap.put(Integer.parseInt(key.toString()), pagerank);         //将每一个节点的pagerank值加入rankMap
			if (!node.equals(""))
				context.write(key, new Text(node));
		}

		@Override
		protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			System.out.println(".................invoke cleanup().......................");
			System.out.println("rankList.size="+rankList.size()+"  rankMap.size="+rankMap.size());
			Configuration configuration=context.getConfiguration();
			String order=configuration.get("order");
			System.out.println("order:"+order+"  invoke cleanup().............");
			if(rankList.size()==G && rankMap.size()==G){
				double gammar=0;
				int length=rankList.size();
				int orderNum=Integer.parseInt(order);
				if(orderNum>1){
					for(int i=1;i<=length;i++){
						gammar+=Math.abs(rankMap.get(i)-rankList.get(i-1));
					}
				String s="第"+orderNum+"次和第"+(orderNum-1)+"次迭代差值:";
				pagerankList.add(new RankResult(s,gammar));
				}
				flushCacheFile(rankMap);
			}
			else{
				System.out.println("rankList.size()!=G || rankMap.size()!=G "
						+ "rankList.size():"+rankList.size()+"  rankMap.size():"+rankMap.size());
			}
		}

		private void flushCacheFile(Hashtable<Integer, Double> rankMap){
			File file =new File(rankCachePath);
			StringBuffer stringBuffer=new StringBuffer();
			int length=rankMap.size();
			if(length==G){
				BufferedWriter writer=null;
				stringBuffer.append("rank").append("\t");
				for(int i=1;i<=G;i++){
					stringBuffer.append(i+":"+rankMap.get(i)+",");
				}
				String string=stringBuffer.toString().substring(0,stringBuffer.toString().length()-2);
				System.out.println("Stringbuffer:"+string);
				try {
					writer=new BufferedWriter(new FileWriter(file, false));
					writer.write(string);
					writer.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}else{
				System.out.println("reduce rankMap 长度不够G,不执行flushCacheFile");
			}
		}

		private void  getRankList(String  path,Reducer<Text, Text, Text, Text>.Context context) {
			FileReader reader = null;
			try {
				reader = new FileReader(new File(path));
			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}
			BufferedReader in=new BufferedReader(reader);
			StringBuffer stringBuffer=new StringBuffer();
			String string="";
			try {
				while((string=in.readLine())!=null){
					stringBuffer.append(string);
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
			String value=stringBuffer.toString().split("\t")[1];
			System.out.println("reduce  rankList value:"+value);
			String split[]=value.split(",");
			for(String pagerank:split)
				rankList.add(Double.parseDouble(pagerank.split(":")[1]));
		}

	}

	private static boolean deleteOutput(boolean isLocalFile, Configuration conf)
			throws IOException {
		if (isLocalFile) {
			File file = new File(localOutputPath);
			return deleteFile(file);
		} else if (!isLocalFile) {
			FileSystem hdfs = FileSystem.get(conf);
			boolean isDelete = hdfs.delete(new Path(hdfsOutputPath), true);
			return isDelete;
		} else
			return false;
	}

	private static boolean deleteFile(File file) {
		if (file.isFile()) {
			return file.delete();
		} else if (file.isDirectory()) {
			String filePath = file.getAbsolutePath();
			String[] list = file.list();
			for (String subFile : list) {
				String path = filePath + "/" + subFile;
				File sonFile = new File(path);
				deleteFile(sonFile);
			}
			file.delete();
		}
		return file.exists() ? false : true;
	}

	public static Job getJob(Configuration conf,String input,String output) throws IOException {
		//Configuration conf=new Configuration();
		/*if (deleteOutput(true,conf)) {
			System.out.println("delete output success");
		} else {
			System.out.println("delete fail,exit program");
			System.exit(1);
		}*/

		Job job = new Job(conf, "pagerank");
		job.setJarByClass(PageRank.class);

		DistributedCache.addCacheFile(new Path(rankCachePath).toUri(), conf);

		job.setMapperClass(PRMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setCombinerClass(PRCombiner.class);

		job.setReducerClass(PRReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));

		return job;
	}

	public static void run(int number) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration configuration = new Configuration();          //有问题?每个job公用一个配置??
		deleteOutput(true, configuration);
		int i=1;
		String input="";
		String output="";
		while(i<=number){
			System.out.println("i="+i+"  pagerankList.length:"+pagerankList.size());
			if(i>=3 && pagerankList.get(i-3).rank<=stopFactor){
				System.out.println("********pagerankList.get("+(i-3)+").rank="+pagerankList.get(i-3).rank+"<="+stopFactor+" "
						+ "满足迭代终止条件,结束迭代**************************");
				break;
			}
			if(i==1){
				input=localInputPath;
				output=localOutputPath+"/trash";
				System.out.println("*******************第0次MapReduce***************************************");
				configuration.set("order",String.valueOf(0));
				Job job=getJob(configuration,input, output);
				job.waitForCompletion(true);
			}else {
				input=output;
			}
			output=localOutputPath+"/"+i;
			System.out.println("*******************第"+i+"次MapReduce***************************************");
			configuration.set("order",String.valueOf(i));       //位置很重要,切记一定要放在这里!!!
			Job job=getJob(configuration,input, output);
			job.waitForCompletion(true);
			i++;
		}
	}

	public static void printGap(){
		int num=pagerankList.size();
			Iterator<RankResult> iterator=pagerankList.iterator();
			int i=1;
			while(iterator.hasNext()){
				RankResult rankResult=iterator.next();
				System.out.print(rankResult.order+rankResult.rank+"    ");
				if(i%3==0)
					System.out.println();
				i++;
			}
	}
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		  int n=10;
		  long start=System.currentTimeMillis();
		  PageRank.run(n);
		  PageRank.printGap();
		  long  end=System.currentTimeMillis();
		  System.out.println("\n迭代"+n+"次一共花费:"+(end-start)/60000+"分"+((end-start)%60000)/1000+"秒"+(end-start)%1000+"毫秒");
	}
}
时间: 2024-08-04 22:21:31

pagerank算法的MapReduce实现的相关文章

MapReduce原理——PageRank算法Java版

Page Rank就是MapReduce的来源,下文是一个简单的计算PageRank的示例. import java.text.DecimalFormat; /**  * Created by jinsong.sun on 2014/7/15.  */ public class PageRankCaculator {     public static void main(String[] args) {         double[][] g = calcG(genS(), 0.85);  

【大创_社区划分】——PageRank算法MapReduce实现

PageRank算法的分析和Python实现参考:http://blog.csdn.net/gamer_gyt/article/details/47443877 举例来讲: 假设每个网页都有一个自己的默认PR值,相当于人为添加给它是一种属性,用来标识网页的等级或者重要性,从而依据此标识达到排名目的.假设有ID号是1的一个网页,PR值是10,假如它产生了到ID=3,ID=6,ID=8 ,ID=9这4个网页的链接.那么可以理解为ID=1的网页向ID=3,6,8,9的4个网页各贡献了2.5的PR值.如

PageRank算法简介及Map-Reduce实现

PageRank对网页排名的算法,曾是Google发家致富的法宝.以前虽然有实验过,但理解还是不透彻,这几天又看了一下,这里总结一下PageRank算法的基本原理. 一.什么是pagerank PageRank的Page可是认为是网页,表示网页排名,也可以认为是Larry Page(google 产品经理),因为他是这个算法的发明者之一,还是google CEO(^_^).PageRank算法计算每一个网页的PageRank值,然后根据这个值的大小对网页的重要性进行排序.它的思想是模拟一个悠闲的

MapReduce 之PageRank 算法概述、设计思路和源码分析

早就对PageRank 算法感兴趣,但一直都是轮廓性的概念,没有具体深入学习.最近要学习和总结MapReduce 的实例,就又把PageRank 算法重新学习了一遍,并基于MapReduce 进行了实现. 1. PageRank是什么 PageRank,网页排名,右脚网页级别.是以Google 公司创始人Larry Page 之姓来命名.PageRank 计算每一个网页的PageRank值,并根据PageRank值的大小对网页的重要性进行排序.PageRank的基本思想是:对于一个网页A来说,链

MapReduce实现PageRank算法(邻接矩阵法)

前言 之前写过稀疏图的实现方法,这次写用矩阵存储数据的算法实现,只要会矩阵相乘的话,实现这个就很简单了.如果有不懂的可以先看一下下面两篇随笔. MapReduce实现PageRank算法(稀疏图法) Python+MapReduce实现矩阵相乘 算法实现 我们需要输入两个矩阵A和B,我一开始想的是两个矩阵分别存在两个文件里然后分别读取,但是我发现好像不行,无法区分打上A.B的标签. 所以我一开始就把A.B矩阵合起来存在一个文件里,一次读取. mapper.py 1 #!/usr/bin/env

【转】深入浅出PageRank算法

原文链接 http://segmentfault.com/a/1190000000711128 PageRank算法 PageRank算法是谷歌曾经独步天下的“倚天剑”,该算法由Larry Page和Sergey Brin在斯坦福大学读研时发明的, 论文点击下载: The PageRank Citation Ranking: Bringing Order to the Web. 本文首先通过一些参考文献引出问题,然后给出了PageRank的几种实现算法, 最后将其推广至在MapReduce框架下

Hadoop应用开发实战(flume应用开发、搜索引擎算法、Pipes、集群、PageRank算法)

Hadoop是2013年最热门的技术之一,通过北风网robby老师<深入浅出Hadoop实战开发>.<Hadoop应用开发实战>两套课程的学习,普通Java开发人员可以在最快的时间内提升工资超过15000.成为一位完全精通Hadoop应用开发的高端人才. Hadoop是什么,为什么要学习Hadoop? Hadoop是一个分布式系统基础架构,由Apache基金会开发.用户可以在不了解分布式底层细节的情况下,开发分布式程序.充分利用集群的威力高速运算和存储.Hadoop实现了一个分布式

PageRank 算法简介

有两篇文章一篇讲解(下面copy)< PageRank算法简介及Map-Reduce实现>来源:http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html 另一篇<PageRank简介-串讲Q&A.docx> http://docs.babel.baidu.com/doc/ee14bd65-ba71-4ebb-945b-cf279717233b PageRank对网页排名的算法,曾是Google发家致富的

PageRank算法和谷歌搜索讲解

吴裕雄 PageRank算法实际上就是Google使用它来计算每个网页价值的算法. Google每次的搜索结果都有成百上千万甚至上亿个相关的查询网页链接.如果将所有的查询结果不加区分,就立即显示给客户看的话,那么用户很有可能看到的就是一些没有多大用的东西,那么Google也就肯定会遭到淘汰的. 那么如何向用户显示对他们有用的网页链接呢?Google想出了一个办法--就是给那成百上千万个网页计算出一个值.这个值呢就叫做PageRank(页面价值得分).通过计算这个值呢,可以区分出每个网页价值有多高