Mapreduce -- PageRank

PageRank 简单理解为网页排名,但是网页是根据什么排名的,接下来就简单介绍一下。

举例:

假设网页 A 的内容中有网页 B,C 和 D 的链接,并且 A 的 PageRank的值为0.25。

那接下里我们就可以计算在网页 A 中的其他网页的PageRank的值了。我们拿网页 B 来进行说明,

在网页 A 中的网页 B 的 PageRank 为 0.25 * (1/n) 其中n为网页 A 中网页链接数,结果则为 0.25*(1/3)。

可以简单理解为A的PageRank被B,C 和 D 平分了,B分到了0.25的三分之一。

然后将所有网页中的关于网页B的pagerank值求出来,就是网页B真实的pagerank了。

但是上面的例子没有考虑到如下的特殊情况:

1 网页A中只有指向自己的网页链接。

2 网页A中没有任何链接。

如果出现以上情况,会导致pagerank结果不准确。

所以出现了下面的公式:

result = sum * n + (1-n)/N

sum 为上面计算出来的,如网页B在所有网页中的pagerank值的总和。

n 可以理解为停留在当前网页继续进行网页跳转浏览的概率

1-n 可以理解为不访问当前网页的任何链接,从浏览器的地址栏输入,转去其他网页的概率。

N 为网页的数量

下面介绍通过MapReduce实现PageRank

简单的流程分析:

Map

取一行数据进行说明

A    B    C    D

网页A中有网页B,C,D的链接;

刚开始给网页A一个默认的pagerank值,然后根据这个值计算其他网页链接在网页A中的PageRank。处理后的数据如下:

A    0.25    B    C    D
B    0.25*(1/3)
C    0.25*(1/3)
D    0.25*(1/3)

Reduce

然后通过 Reduce 计算出各个网页在其他网页中的 PageRank 的总和 sum,然后代入公式计算实际的PageRank,并更新 A 0.25 B C D 中的数据,这里将0.25更新为计算出来真实的 PageRank。

重复计算各个网页的 PageRank 的值,直到 PageRank 的结果收敛,值趋于稳定。

A    0.25    B    C    D  =>A    ?    B    C    D

测试数据:

A    B       C       D
B    A       D
C    C
D    B       C

测试代码:

RunJob.java

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.Text;
  5 import org.apache.hadoop.mapreduce.Job;
  6 import org.apache.hadoop.mapreduce.Mapper;
  7 import org.apache.hadoop.mapreduce.Reducer;
  8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11
 12 import java.io.IOException;
 13
 14 /**
 15  * Created by Edward on 2016/7/13.
 16  */
 17 public class RunJob {
 18
 19     public static enum ValueEnum{
 20         CLOSURE_VALUE;
 21     }
 22
 23     public static void main(String[] args)
 24     {
 25
 26         //access hdfs‘s user
 27         System.setProperty("HADOOP_USER_NAME","root");
 28
 29         Configuration conf = new Configuration();
 30         conf.set("fs.defaultFS", "hdfs://node1:8020");
 31
 32         try {
 33             int i = 0;
 34             while(true) {
 35                 i++;
 36                 conf.setInt("count", i);
 37
 38                 FileSystem fs = FileSystem.get(conf);
 39                 Job job = Job.getInstance(conf);
 40                 job.setJarByClass(RunJob.class);
 41                 job.setMapperClass(MyMapper.class);
 42                 job.setReducerClass(MyReducer.class);
 43
 44                 //需要指定 map out 的 key 和 value
 45                 job.setOutputKeyClass(Text.class);
 46                 job.setOutputValueClass(Text.class);
 47
 48                 //设置输入类的
 49                 job.setInputFormatClass(KeyValueTextInputFormat.class);
 50                 if(i==1)
 51                     FileInputFormat.addInputPath(job, new Path("/test/pagerank/input"));
 52                 else
 53                     FileInputFormat.addInputPath(job, new Path("/test/pagerank/output/pr"+(i-1)));
 54
 55                 Path path = new Path("/test/pagerank/output/pr"+i);
 56                 if (fs.exists(path))//如果目录存在,则删除目录
 57                 {
 58                     fs.delete(path, true);
 59                 }
 60                 FileOutputFormat.setOutputPath(job, path);
 61
 62                 boolean b = job.waitForCompletion(true);
 63                 if (b) {
 64                     long closure =job.getCounters().findCounter(ValueEnum.CLOSURE_VALUE).getValue();
 65                     double avg= closure/4000.0;//计算收敛的平均值,浮动小于0.001则认为收敛
 66                     System.out.println("执行第"+i+"次, closure="+closure+",avg="+avg);
 67                     if(avg < 0.001){
 68                         System.out.println("总共执行了"+i+"次,之后收敛");
 69                         break;
 70                     }
 71                 }
 72             }
 73
 74         } catch (Exception e) {
 75             e.printStackTrace();
 76         }
 77     }
 78
 79
 80     public static class MyMapper extends Mapper<Text, Text, Text, Text> {
 81         @Override
 82         protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
 83             AdjacentNodes adjacentNodes  = new AdjacentNodes();
 84             int count = context.getConfiguration().getInt("count", 1);
 85
 86             if(count == 1)
 87             {
 88                 AdjacentNodes firstAdj = new AdjacentNodes();
 89                 firstAdj.setValue(1.0);
 90                 firstAdj.setNodes(value.toString().split("\t"));
 91                 adjacentNodes = firstAdj;
 92             }
 93             else
 94             {
 95                 //格式化 value: 1.0 B C D
 96                 adjacentNodes.formatInfo(value.toString());
 97             }
 98             //A 1.0 B C D
 99             context.write(key, new Text(adjacentNodes.toString()));
100
101             double pagerank = adjacentNodes.getValue()/adjacentNodes.getNum();
102             for(int i=0; i<adjacentNodes.getNum(); i++)
103             {
104                 String node = adjacentNodes.getNodes()[i];
105                 //B 0.333
106                 context.write(new Text(node), new Text(pagerank+""));
107             }
108         }
109     }
110
111     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
112         @Override
113         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
114
115             AdjacentNodes adjacentNodes = new AdjacentNodes();
116
117             Double sum = 0.0;
118             for(Text adj : values)
119             {
120                 String str = adj.toString();
121                 if(str.split("\t").length>1)
122                 {
123                     adjacentNodes.formatInfo(str);
124                 }
125                 else{
126                     sum += Double.parseDouble(str);   //对节点的 pagerank 求和,
127                 }
128             }
129
130             //计算pagerank
131             double n = 0.80;
132             double pagerank = sum * n + (1-n)/4.0;// 计算pagerank 公式 sum * n + (1-n)/N
133
134             //计算收敛的差值
135             int closure =(int)(Math.abs(pagerank - adjacentNodes.getValue()) * 1000);
136             //通过context.getCounter(ENUM)方法,每次执行reduce将closure的值进行累加,结果传递给主函数,
137
138             context.getCounter(ValueEnum.CLOSURE_VALUE).increment(closure);
139
140             adjacentNodes.setValue(pagerank);
141             context.write(key, new Text(adjacentNodes.toString()));
142         }
143     }
144 }

AdjacentNodes.java
/**
 * Created by Edward on 2016/7/14.
 */
public class AdjacentNodes {

    double value = 0.0;
    int num = 0;
    String[] nodes = null;

    public void formatInfo(String str)
    {
        String[] val = str.split("\t");
        this.setValue(Double.parseDouble(val[0]));
        this.setNum(val.length-1);

        if(this.num != 0)
            this.nodes = new String[this.num];

        for(int i=1; i<val.length; i++)
        {
            this.nodes[i-1] = val[i];
        }
    }

    public boolean isEmpty()
    {
        if(this.num == 0)
            return true;
        else
            return false;
    }

    public void setNodes(String[] nodes) {
        this.nodes = nodes;
        this.num = nodes.length;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public double getValue() {
        return value;
    }

    public int getNum() {
        return num;
    }

    public String[] getNodes() {
        return nodes;
    }

    @Override
    public String toString() {

        StringBuffer stringBuffer = new StringBuffer(this.value+"");

        for(int i=0; i<this.num; i++)
        {
            stringBuffer.append("\t"+this.nodes[i]);
        }

        return stringBuffer.toString();
    }
}

结果数据:

A       0.10135294176208584     B       C       D
B       0.12838069628609527     A       D
C       0.6560527651143326      C
D       0.12838069628609527     B       C

总共执行了24次,之后收敛;

PageRank值越高,越值得推荐。

时间: 2025-01-03 16:16:06

Mapreduce -- PageRank的相关文章

pagerank算法的MapReduce实现

pagerank是一种不容易被欺骗的计算Web网页重要性的工具,pagerank是一个函数,它对Web中(或者至少是抓取并发现其中连接关系的一部分web网页)的每个网页赋予一个实数值.他的意图在于,网页 的pagerank越高,那么它就越重要.并不存在一个固定的pagerank分配算法. 对于pagerank算法的推到我在这里不想做过多的解释,有兴趣的可以自己查看资料看看,这里我直接给出某个网页pagerank的求解公式: P(n)=a/G+(1-a)*求和(P(m)/C(m))     (m属

mapReduce编程之google pageRank

1 pagerank算法介绍 1.1 pagerank的假设 数量假设:每个网页都会给它的链接网页投票,假设这个网页有n个链接,则该网页给每个链接平分投1/n票. 质量假设:一个网页的pagerank值越大,则它的投票越重要.表现为将它的pagerank值作为它投票的加权值. 1.2 矩阵表示形式 ......... 最终PR值会收敛为稳定值. 1.3 deadends和spider traps deadends:一个网页没有链接,则最终PR值会收敛为全为0: spider traps:一个网页

海量数据挖掘——第1讲.MapReduce and PageRank

本栏目(数据挖掘)下海量数据挖掘专题是个人对Coursera公开课海量数据挖掘(2015)的学习心得与笔记.所有内容均来自Coursera公开课Mining Massive Datasets中Jure Leskovec, Anand Rajaraman以及Jeff Ullman老师的讲解.(https://class.coursera.org/mmds-002/lecture) 第1讲-------MapReduce and PageRank 一.Distributed File System

MapReduce原理——PageRank算法Java版

Page Rank就是MapReduce的来源,下文是一个简单的计算PageRank的示例. import java.text.DecimalFormat; /**  * Created by jinsong.sun on 2014/7/15.  */ public class PageRankCaculator {     public static void main(String[] args) {         double[][] g = calcG(genS(), 0.85);  

【大创_社区划分】——PageRank算法MapReduce实现

PageRank算法的分析和Python实现参考:http://blog.csdn.net/gamer_gyt/article/details/47443877 举例来讲: 假设每个网页都有一个自己的默认PR值,相当于人为添加给它是一种属性,用来标识网页的等级或者重要性,从而依据此标识达到排名目的.假设有ID号是1的一个网页,PR值是10,假如它产生了到ID=3,ID=6,ID=8 ,ID=9这4个网页的链接.那么可以理解为ID=1的网页向ID=3,6,8,9的4个网页各贡献了2.5的PR值.如

PageRank算法简介及Map-Reduce实现

PageRank对网页排名的算法,曾是Google发家致富的法宝.以前虽然有实验过,但理解还是不透彻,这几天又看了一下,这里总结一下PageRank算法的基本原理. 一.什么是pagerank PageRank的Page可是认为是网页,表示网页排名,也可以认为是Larry Page(google 产品经理),因为他是这个算法的发明者之一,还是google CEO(^_^).PageRank算法计算每一个网页的PageRank值,然后根据这个值的大小对网页的重要性进行排序.它的思想是模拟一个悠闲的

MapReduce 之PageRank 算法概述、设计思路和源码分析

早就对PageRank 算法感兴趣,但一直都是轮廓性的概念,没有具体深入学习.最近要学习和总结MapReduce 的实例,就又把PageRank 算法重新学习了一遍,并基于MapReduce 进行了实现. 1. PageRank是什么 PageRank,网页排名,右脚网页级别.是以Google 公司创始人Larry Page 之姓来命名.PageRank 计算每一个网页的PageRank值,并根据PageRank值的大小对网页的重要性进行排序.PageRank的基本思想是:对于一个网页A来说,链

mapreduce 实现pagerank

输入格式: A 1 B,C,D B 1 C,Dmap: B A 1/3 C A 1/3 D A 1/3 A |B,C,D C B 1/2 D B 1/2 B |C,Dreduce: B (1-0.85)+0.85*1/3 C,D C (1-0.85)+0.85*5/6 D (1-0.85)+0.85*5/6 A (1-0.85)+0.85*1 B,C,D import java.io.IOException; import org.apache.hadoop.conf.Configuration

MapReduce实现PageRank算法(邻接矩阵法)

前言 之前写过稀疏图的实现方法,这次写用矩阵存储数据的算法实现,只要会矩阵相乘的话,实现这个就很简单了.如果有不懂的可以先看一下下面两篇随笔. MapReduce实现PageRank算法(稀疏图法) Python+MapReduce实现矩阵相乘 算法实现 我们需要输入两个矩阵A和B,我一开始想的是两个矩阵分别存在两个文件里然后分别读取,但是我发现好像不行,无法区分打上A.B的标签. 所以我一开始就把A.B矩阵合起来存在一个文件里,一次读取. mapper.py 1 #!/usr/bin/env