《hadoop进阶》PeopleRank从社交关系中挖掘价值用户

转载请注明出处: 转载自  Thinkgamer的CSDN博客: blog.csdn.net/gamer_gyt

代码下载地址:点击查看

1:PageRank 与 PeopleRank

2:需求分析:挖掘CSDN博客的价值用户

3:算法模型:PeopleRank算法

4:架构设计:从数据准备到PR算法的MR化

5:程序开发:hadoop实现PeopleRank算法

一:PageRank与PeopleRank

PageRank算法是Google从垃圾堆里捡黄金的重量级算法,它让谷歌的搜索引擎一度成为No.1,当然谷歌所公开的PR算法毕竟是过去式了,既然它能公开,那么肯定不是它最新的算法演化版本,但是不管怎样,我们依旧从中学习到很多创新和独特的思想。

PR算法主要用于网页评分计算,它利用互联网的网页之间的连接关系,给网页进行打分,最终PR值越高的网页价值也就越高。

自2012以来,中国开始进入社交网络的时代,开心网,人人网,新浪微博,腾讯微博,微信等社交网络应用,开始进入大家的生活。最早是由“抢车位”,“偷菜”等社交游戏带动的社交网络的兴起,如今人们会更多的利用社交网络,获取信息和分享信息。我们的互联网,正在从以网页信息为核心的网络,向着以人为核心的网络转变着。

于是有人就提出了,把PageRank模型应用于社交网络,定义以人为核心的个体价值。这样PageRank模型就有了新的应用领域,同时也有了一个新的名字PeopleRank。

二 . 需求分析:挖掘CSDN博客的价值用户

如上图所示,CSDN博客的每个用户都有关注人数和粉丝人数,这在一定程度上和网页之间的连接关系是十分相似的,我个人比较菜,粉丝数太少,当然我希望看过我博客的人,如果你感觉不错的话是否可以关注以下呢,闲话少说,这种相互关注的关系在一定程度上体现了用户的价值,粉丝数目越多的人,在一定程度上,其本身所具有的重要性。

顺便给大家看一个CSDN排名47的牛人

这刚好符合PR算法,我们是否可以考虑使用PeopleRank算法,利用用户之间的关注关系,来计算不同用户的PR值,从而提取出“价值”更高的用户呢?答案是肯定的。

三 . 算法模型:PeopleRank算法

那么什么是PageRank算法?当然本篇博客并不是来谈PR算法的,而是将如何利用hadoop实现pr算法从而挖掘有价值的用户,所以以下只是简单的对pr算法的描述,更多还请自己搜索查看(以下部分摘自:http://blog.jobbole.com/71431/)

互联网中的网页可以看出是一个有向图,其中网页是结点,如果网页A有链接到网页B,则存在一条有向边A->B,下面是一个简单的示例:

这个例子中只有四个网页,如果当前在A网页,那么悠闲的上网者将会各以1/3的概率跳转到B、C、D,这里的3表示A有3条出链,如果一个网页有k条出链,那么跳转任意一个出链上的概率是1/k,同理D到B、C的概率各为1/2,而B到C的概率为0。一般用转移矩阵表示上网者的跳转概率,如果用n表示网页的数目,则转移矩阵M是一个n*n的方阵;如果网页j有k个出链,那么对每一个出链指向的网页i,有M[i][j]=1/k,而其他网页的M[i][j]=0;上面示例图对应的转移矩阵如下:

初试时,假设上网者在每一个网页的概率都是相等的,即1/n,于是初试的概率分布就是一个所有值都为1/n的n维列向量V0,用V0去右乘转移矩阵M,就得到了第一步之后上网者的概率分布向量MV0,(nXn)*(nX1)依然得到一个nX1的矩阵。下面是V1的计算过程:

注意矩阵M中M[i][j]不为0表示用一个链接从j指向i,M的第一行乘以V0,表示累加所有网页到网页A的概率即得到9/24。得到了V1后,再用V1去右乘M得到V2,一直下去,最终V会收敛,即Vn=MV(n-1),上面的图示例,不断的迭代,最终V=[3/9,2/9,2/9,2/9]’:

四 .架构设计:从数据准备到PR算法的MR化

这里我采用的是用户和用户之间的关注关系,例如 用户A 关注 用户B

1:数据采集

使用Python爬虫采集CSDN博客的用户和用户的关注关系,这里我使用的采集程序架构图如下:

因为我这个PR计算是我做的另外一个项目(博客统计分析系统:github地址 在线演示地址:点击查看
该地址会在一定的时间内有效)的其中的一部分,所以数据也是从其中摘取的,原本的采集程序是为了采集所有CSDN博客用户udell信息和博客内容的,当然由于各种关系,最终采集的用户数量为七万左右,最终采集到的数据格式如下:

用户信息数据:

博客信息数据:

2:数据整理

我从中随机抽取了100个用户,同时利用一定的技术手段,给这个100个用户之间赋予一定的关注关系,整理后的数据如下,主要包括两部分,第一部分是用户之间的关注关系(用户id,关注的用户id),第二是给每个用户赋予一定的初始值(用户id,初始用户pr值全部为1)

(1)                        
                  (2)   

3:PR算法的MR化设计

我么以下面这个图来说一下

ID=1的页面链向2,3,4页面,所以一个用户从ID=1的页面跳转到2,3,4的概率各为1/3

ID=2的页面链向3,4页面,所以一个用户从ID=2的页面跳转到3,4的概率各为1/2

ID=3的页面链向4页面,所以一个用户从ID=3的页面跳转到4的概率各为1

ID=4的页面链向2页面,所以一个用户从ID=4的页面跳转到2的概率各为1

(1):构造邻接矩阵

(2):构造邻接矩阵

(3):转换为概率矩阵(转移矩阵)

(4):阻尼系数概率矩阵

(5):进行迭代计算

至于迭代的次数有自己设定,并不是越多越好,根据六度分割理论来讲,一般迭代6次

五 .
程序开发:hadoop实现PeopleRank算法

程序架构如下:

个人代码目录:

下面我们具体说一说每一个文件是干什么的

day7_author100_mess.csv:源文件,由dataEtl.java处理成我们所需要的数据格式

people.csv,peoplerank.txt :day7_author100_mess.csv处理后得到的文件

prjob.java:程序调度的主函数

prMatrix.java:数据转换为矩阵形式

prJisuan.java: 计算每个用户的PR值

prNormal.java:PR值的标准化

prSort.java:对转化后的PR值进行排序

最终的输出文件目录

下面只对部分代码进行展示,更多请前往github下载:点击查看

dataEtl.java

package pagerankjisuan;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class dataEtl {

	public static void main() throws IOException {

		File f1 = new File("MyItems/pagerankjisuan/people.csv");
		if(f1.isFile()){
			f1.delete();
		}
		File f = new File("MyItems/pagerankjisuan/peoplerank.txt");
		if(f.isFile()){
			f.delete();
		}
		//打开文件
		File file = new File("MyItems/pagerankjisuan/day7_author100_mess.csv");
		//定义一个文件指针
		BufferedReader reader = new BufferedReader(new FileReader(file));
		try {
			String line=null;
			//判断读取的一行是否为空
			while( (line=reader.readLine()) != null)
			{
					String[] userMess = line.split( "," );
					//第一字段为id,第是个字段为粉丝列表
					String userid = userMess[0];
					if(userMess.length!=0){
							if(userMess.length==11)
							{
									int i=0;
									String[] focusName = userMess[10].split("\\|"); //  | 为转义符
									for (i=1;i < focusName.length; i++)
										{
											write(userid,focusName[i]);
//											System.out.println(userid+ "           " + focusName[i]);
										}
							}
							else
							{
									int j =0;
									String[] focusName = userMess[9].split("\\|"); //  | 为转义符
									for (j=1;j < focusName.length; j++)
									{
										write(userid,focusName[j]);
//										System.out.println(userid+ "           " + focusName[j]);
									}
							}
					}
				}
			}
			catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			finally
			{
					reader.close();

					//etl peoplerank.txt
					for(int i=1;i<=100;i++){
						FileWriter writer = new FileWriter("MyItems/pagerankjisuan/peoplerank.txt",true);
						writer.write(i + "\t" + 1 + "\n");
						writer.close();
					}
			}
			System.out.println("OK..................");
	}

	private static void write(String userid, String nameid) {
		// TODO Auto-generated method stub
		//定义写文件,按行写入
		try {
			if(!nameid.contains("\n")){
				FileWriter writer = new FileWriter("MyItems/pagerankjisuan/people.csv",true);
				writer.write(userid + "," + nameid + "\n");
				writer.close();
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

prjob.java

package pagerankjisuan;

import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;

/*
 * 调度函数
 */
public class prjob {

	public static final String  HDFS = "hdfs://127.0.0.1:9000";

	public static void main(String[] args) {
		Map  <String, String> path= new HashMap<String, String>();	

		path.put("page" ,"/home/thinkgamer/MyCode/hadoop/MyItems/pagerankjisuan/people.csv");
		path.put("pr" ,"/home/thinkgamer/MyCode/hadoop/MyItems/pagerankjisuan/peoplerank.txt");

		path.put("input", HDFS + "/mr/blog_analysic_system/people");          // HDFS的目录
        path.put("input_pr", HDFS + "/mr/blog_analysic_system/pr");       // pr存储目录
        path.put("tmp1", HDFS + "/mr/blog_analysic_system/tmp1");             // 临时目录,存放邻接矩阵
        path.put("tmp2", HDFS + "/mr/blog_analysic_system/tmp2");           // 临时目录,计算到得PR,覆盖input_pr

        path.put("result", HDFS + "/mr/blog_analysic_system/result");                   // 计算结果的PR

        path.put("sort", HDFS +  "/mr/blog_analysic_system/sort");  //最终排序输出的结果

        try {
        	   dataEtl.main();
            prMatrix.main(path);
            int iter = 3;           // 迭代次数
            for (int i = 0; i < iter; i++) {
                prJisuan.main(path);
            }
           prNormal.main(path);
           prSort.main(path);

        	} catch (Exception e) {
        		e.printStackTrace();
        	}
        	System.exit(0);
    	}

	 	public static String scaleFloat(float f) {// 保留6位小数
	        DecimalFormat df = new DecimalFormat("##0.000000");
	        return df.format(f);
	    }
}

prSort.java

package pagerankjisuan;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class prSort {
	/**
     * @param args
     * @throws IOException
     * @throws IllegalArgumentException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static class myComparator extends Comparator {
        @SuppressWarnings("rawtypes")
        public int compare( WritableComparable a,WritableComparable b){
            return -super.compare(a, b);
        }
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static class sortMap extends Mapper<Object,Text,FloatWritable,IntWritable>{
        public void map(Object key,Text value,Context context) throws NumberFormatException, IOException, InterruptedException{
            String[] split = value.toString().split("\t");
            context.write(new FloatWritable(Float.parseFloat(split[1])),new IntWritable(Integer.parseInt(split[0])) );
        }
    }
    public static class Reduce extends Reducer<FloatWritable,IntWritable,IntWritable,FloatWritable>{
        public void reduce(FloatWritable key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
            for (IntWritable text : values) {
                context.write( text,key);
            }
        }
    }

	public static void main(Map<String, String> path) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub  

		String input = path.get("result");
		String output = path.get("sort");
		hdfsGYT hdfs = new hdfsGYT();
		hdfs.rmr(output);

        Job job = new Job();
        job.setJarByClass(prSort.class);
        // 1
        FileInputFormat.setInputPaths(job, new Path(input) );
        // 2
        job.setMapperClass(sortMap.class);
        job.setMapOutputKeyClass(FloatWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3
        // 4  自定义排序
        job.setSortComparatorClass( myComparator.class);
        // 5
        job.setNumReduceTasks(1);
        // 6
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        // 7
        FileOutputFormat.setOutputPath(job, new Path(output));
        // 8
        System.exit(job.waitForCompletion(true)? 0 :1 );
    }
}

最终排序输出的结果为:

时间: 2024-11-10 15:52:32

《hadoop进阶》PeopleRank从社交关系中挖掘价值用户的相关文章

社交关系与大数据的结合

大家还没搞清楚PC的时候,移动互联网来了,还没搞清楚移动互联网的时候,大数据来了. "有个不太靠谱的命题:如何让赵本山和迈克尔乔丹搭上关系?其实很简单,通过分析两个人的社交圈子,兴趣爱好等,最终可以找出一条线路能让他们两个人认识,这就是隐藏其中的大数据魅力之一点点-- 随着互联网的冲击,UGC(用户产生内容)不断发展,社交网络已经不断普及并深入人心,用户可以随时随地在网络上分享内容,由此产生了海量的用户数据.这些数据并不是我们想象中的那样冷冰冰.枯燥的数据,而是更加活生生.有趣的数据:这些数据不

Hadoop的配置过程(虚拟机中的伪分布模式)

1引言 hadoop如今已经成为大数据处理中不可缺少的关键技术,在如今大数据爆炸的时代,hadoop给我们处理海量数据提供了强有力的技术支撑.因此,了解hadoop的原理与应用方法是必要的技术知识. hadoop的基础原理可参考如下的三篇论文: The Google File System, 2003 MapReduce: Simplified Data Processing on Large Clusters, 2004 Bigtable: A Distributed Storage Syst

姚劲波:58看中腾讯庞大用户群及其社交关系

58同城CEO姚劲波 核心观点: 1.腾讯和58同城双方风格接近,互相认可,10天内火速达成交易.腾讯出资7.36亿美元认购58同城19.9%的摊薄后股权,获得一个董事会席位. 2.58同城主要看中腾讯能带来58所欠缺的资源:一是腾讯有庞大用户群,腾讯在PC和移动都有很大的流量,更重要的是58同城的用户之间缺乏社交关系,和腾讯合作能引进腾讯的社交关系. 3. 引入腾讯投资后,腾讯成为58同城的第二大股东,并不意味着58同城"站队".腾讯不会干预58的战略决策,58同城仍然会坚持独立发展

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码.这里不多赘述,直接送上代码. MRUnit 框架 MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用.MRUnit针对不同测试对象使用不同的Driver: MapDriver:针对单独的Map测试  ReduceDriver:针对单独的Reduce测试    MapReduceDri

移动互联网的垂直化与面向组织的社交关系

从基于移动互联网的社交发现的角度上看: 我们使用QQ,QQ是自由的社交关系,可以从陌生人开始,你可以和陌生人搭讪,从而成为网友,可以认为是陌生人之间的社交,当然陌陌也是如此了. 我们使用微信,微信是相对自由的社交关系,从熟人开始,相识进而成为网友,可以认为是熟人之间的社交. What is the next?what is others? 是面向组织的社交关系,是相对封闭的社交关系. 通过研究面向组织的社交关系,提出了基于超对称关系的社交网络模型,从二元模型到多元模型,将现实中的组织关系映射成网

PTA数据结构与算法题目集(中文) 7-36 社交网络图中结点的“重要性”计算 (30 分)

PTA数据结构与算法题目集(中文)  7-36 社交网络图中结点的“重要性”计算 (30 分) 7-36 社交网络图中结点的“重要性”计算 (30 分) 在社交网络中,个人或单位(结点)之间通过某些关系(边)联系起来.他们受到这些关系的影响,这种影响可以理解为网络中相互连接的结点之间蔓延的一种相互作用,可以增强也可以减弱.而结点根据其所处的位置不同,其在网络中体现的重要性也不尽相同. “紧密度中心性”是用来衡量一个结点到达其它结点的“快慢”的指标,即一个有较高中心性的结点比有较低中心性的结点能够

Makefile中头文件在依赖关系中作用

摘于:http://bbs.csdn.net/topics/120024677 (1)在makefile的依赖关系中用不用体现.h头文件?(2)如果在依赖关系中要体现.h头文件,应该体现到什么层次?==============================(1)在makefile的依赖关系中用不用体现.h头文件?============================== 下面是我的一些认识: 头文件中定义的是接口(函数接口,文件外全局变量和宏定义),它的作用是向调用文件封装函数的实现过程.在

Entity Framework 6 Recipes 2nd Edition(10-9)译 -&gt; 在多对多关系中为插入和删除使用存储过程

10-9. 在多对多关系中为插入和删除使用存储过程 问题 想要在一个无载荷的多对多关系中使用存储过程(存储过程只影响关系的连接表) 解决方案 假设有一个多对多关系的作者( Author)表和书籍( Book)表. 用连接表AuthorBook来做多对多关系,如 Figure 10-11.所示: Figure 10-11. A payload-free, many-to-many relationship between an Author and a Book 当把表生成模型,那么模型就如Fig

理清Linux中的各种用户ID关系

在Eclipse中,出现"Access Restriction: The Type BASE64Encoder Is Not Accessible Due To Restriction"错误. 解决方法: 点击Window-->Preferences-->Java-->Compiler-->Errors/Warns,设置Deprecated And Restricted API 参数值. 问题解决. 理清Linux中的各种用户ID关系,布布扣,bubuko.co