学习日志---基于hadoop实现PageRank

PageRank简单介绍:

其值是通过其他值得指向值所决定,具体例子如下:

对应于每个mapReduce的计算:

由mapper算出每个点所指节点的分值,由reduce整个key相同的,由公式算出。

三角号表示的是迭代两次之间计算的差值,若小于某个值则计算完成,求的每个点的pagerank值。

自我实现的代码:如下

输入的数据分为:

input1.txt

A,B,D
B,C
C,A,B
D,B,C

表示每行第一个点所指向的节点,在reducer的setup会用到,构建hashmap供使用。

input2.txt

A,0.25,B,D
B,0.25,C
C,0.25,A,B
D,0.25,B,C

中间多的数字,表示当前每个节点的pagerank值,其文件可无,因为可以由上面的文件计算生成,有四个节点,即1/4。

自我实现的代码:

package bbdt.steiss.pageRank;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank {

    public static class PageMapper extends Mapper<LongWritable, Text, Text, Text>{
        
        private Text averageValue = new Text();
        private Text node = new Text();
        
        @Override
        //把每行数据的对应节点的分pagerank找出,并输出,当前节点的值除以指向节点的总数
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            String string = value.toString();
            String [] ss = string.split(",");
            int length = ss.length;
            double pageValue = Double.parseDouble(ss[1]);
            double average = pageValue/(length-2);
            averageValue.set(String.valueOf(average));
            int i = 2;
            while(i<=length-1){
                node.set(ss[i]);
                context.write(node,averageValue);
                i++;
            }
            
        }
    } 
    
    public static class PageReducer extends Reducer<Text, Text, Text, Text>{
        
        private HashMap<String, String> content;
        private Text res = new Text();
        
        //reducer工作前,key相同的会分组分在一组,用迭代器操作,从总的图中找到所有该节点的分pagerank值
        //利用公式计算该pagerank值,输出。因为下一次要用,因此输出可以凑近一些,把结果都放在value里输出
        @Override
        protected void reduce(Text text, Iterable<Text> intIterable,
                Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            double v = 0.0;
            for (Text t : intIterable) {
                v = Double.parseDouble(t.toString());
                sum = sum + v;
            }
            double a = 0.85;
            double result = (1-a)/4 + a*sum;
            String sRes = String.valueOf(result);
            String back = content.get(text.toString());
            String front = text.toString();
            String comp = front + "," + sRes + back;
            res.set(comp);
            context.write(null,res);
            
        }
        
        @Override
        //reducer的初始化时,先把节点对应文件的数据,存在hashmap中,也就是content中,供每次reduce方法使用,相当于数据库的作用
        //方便查询
        protected void setup(Context context)
                throws IOException, InterruptedException {
            URI[] uri = context.getCacheArchives();
            content = new HashMap<String, String>();
            for(URI u : uri)
            {
                FileSystem fileSystem = FileSystem.get(u.create("hdfs://hadoop1:9000"), context.getConfiguration());
                FSDataInputStream in = null;
                in = fileSystem.open(new Path(u.getPath()));
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
                String line;
                while((line = bufferedReader.readLine())!=null)
                {
                    int index = line.indexOf(",");
                    String first = line.substring(0,index);
                    String last = line.substring(index,line.length());
                    content.put(first, last);
                }
                
            }
        }
    }
    
    public static void main(String[] args) throws Exception{
        
        //接受路径文件
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Path cachePath = new Path(args[2]);
        double result = 100;
        int flag = 0;
        //制定差值多大时进入循环
        while(result>0.1)
        {
            if(flag == 1)
            {
                //初次调用mapreduce不操作这个
                //这个是把mapreduce的输出文件复制到输入文件中,作为这次mapreduce的输入文件
                copyFile();
                flag = 0;
            }
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            
            job.setJarByClass(PageRank.class);
            job.setMapperClass(PageMapper.class);
            job.setReducerClass(PageReducer.class);
            
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            
            FileInputFormat.addInputPath(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.addCacheArchive(cachePath.toUri());
            outputPath.getFileSystem(configuration).delete(outputPath, true);
            job.waitForCompletion(true);
            
            String outpathString = outputPath.toString()+"/part-r-00000";
            //计算两个文件的各节点的pagerank值差
            result = fileDo(inputPath, new Path(outpathString));
            flag = 1;
        }
            System.exit(0);   
    }
    
    //计算两个文件的每个节点的pagerank差值,返回
    public static double fileDo(Path inputPath,Path outPath) throws Exception
    {
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
         FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in1 = null;
         FSDataInputStream in2 = null;
         in1 = fs.open(inputPath);
         in2 = fs.open(outPath);
         BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
         BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));
         String s1 = null;
         String s2 = null;
         ArrayList<Double> arrayList1 = new ArrayList<Double>();
         ArrayList<Double> arrayList2 = new ArrayList<Double>();
         while ((s1 = br1.readLine()) != null)
         {
             String[] ss = s1.split(",");
             arrayList1.add(Double.parseDouble(ss[1]));
         }
         br1.close();
         
         while ((s2 = br2.readLine()) != null)
         {
             String[] ss = s2.split(",");
             arrayList2.add(Double.parseDouble(ss[1]));
         }
         double res = 0;
         
         for(int i = 0;i<arrayList1.size();i++)
         {
             res = res + Math.abs(arrayList1.get(i)-arrayList2.get(i));
         }
         
         return res;   
    }
    
    //将输出文件复制到输入文件中
    public static void copyFile() throws Exception
    {
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
         FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in1 = null;
         in1 = fs.open(new Path("/output/part-r-00000"));
         BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
         //这里删除需要打开hdfs在/input目录下的权限操作,非常重要
         //“hdfs dfs -chmod 777 /input”打开权限,这样才可以删除其下面的文件
         fs.delete(new Path("/input/test2.txt"),true);
         //建立一个新文件,返回流
         FSDataOutputStream fsDataOutputStream = fs.create(new Path("/input/test2.txt"));
         BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream));
         String s1 = null;
         //写出并写入
         while ((s1 = br1.readLine()) != null)
         { 
             bw1.write(s1);
             bw1.write("\n");
         }
         bw1.close();
         fsDataOutputStream.close();
         br1.close();
         in1.close();
    }
}

注意:

在本地操作hdfs时,进行文件的删除和添加,需要打开hdfs的文件操作权限,

这里删除需要打开hdfs在/input目录下的权限操作,非常重要
“hdfs dfs -chmod 777 /input”打开权限,这样才可以删除其下面的文件

打开/input路径的操作权限

时间: 2024-12-21 00:17:40

学习日志---基于hadoop实现PageRank的相关文章

Hadoop学习日志- install hadoop

资料来源 : http://www.tutorialspoint.com/hadoop/hadoop_enviornment_setup.htm Hadoop 安装 创建新用户 $ su password: # useradd hadoop -g root # passwd hadoop New passwd: Retype new passwd 修改/etc/sudoers 赋予sudo 权限 设置ssh SSH Setup and Key Generation SSH setup is re

基于Hadoop的数据分析综合管理平台之Hadoop、HBase完全分布式集群搭建

能够将热爱的技术应用于实际生活生产中,是做技术人员向往和乐之不疲的事. 现将前期手里面的一个项目做一个大致的总结,与大家一起分享.交流.进步.项目现在正在线上运行,项目名--基于Hadoop的数据分析综合管理平台. 项目流程整体比较清晰,爬取数据(txt文本)-->数据清洗-->文本模型训练-->文本分类-->热点话题发现-->报表"实时"展示,使用到的技术也是当今互联网公司常用的技术:Hadoop.Mahout.HBase.Spring Data Had

打造基于hadoop的网站日志分析系统(5)之spark在日志分析系统里的简单应用

1.下载spark和运行 wget http://apache.fayea.com/apache-mirror/spark/spark-1.0.0/spark-1.0.0-bin-hadoop2.tgz 我这里下载的是1.0.0版,由于我们只是测试spark的用法所以不需要配置spark集群,只需把下好的文件解压,进入bin/文件夹. spark支持scala,java和python. scala和java输入命令:./spark-shell python 输入命令 ./pyspark 进入控制

基于Flask框架搭建视频网站的学习日志(二)

基于Flask框架搭建视频网站的学习日志(二)2020/02/02 一.初始化 所有的Flask程序都必须创建一个程序实例,程序实例是Flask类的对象 from flask import Flask app = Flask(__name__) Flask 类的构造函数Flask()只有一个必须指定的参数,即程序主模块或包的名字.在大多数程序中,python的__name__变量就是所需的值.(Flask这个参数决定程序的根目录,以便稍后能够找到相对与程序根目录的资源文件位置)--<Flask

基于Hadoop离线大数据分析平台项目实战

基于Hadoop离线大数据分析平台项目实战  课程学习入口:http://www.xuetuwuyou.com/course/184 课程出自学途无忧网:http://www.xuetuwuyou.com 课程简介: 某购物电商网站数据分析平台,分为收集数据.数据分析和数据展示三大层面.其中数据分析主要依据大数据Hadoop生态系统常用组件进行处理,此项目真实的展现了大数据在企业中实际应用. 课程内容 (1)文件收集框架 Flume ①Flume 设计架构.原理(三大组件) ②Flume 初步使

Hadoop学习第一次:hadoop概念

1.大数据学习方向:一是系统建设技术,二,海量数据应用. 先说系统建设,现在主流的技术是HADOOP,主要基于mapreduce的分布式框架.目前可以先学习这个.但是我的观点,在分布式系统出来之前,主要是集中式架构,如DB2,oracle.为什么现在用分布式架构,那是因为现在集中式架构受限于IO性能,出来速度慢,如果又一种硬件技术,可以很快地处理海量数据,性能上能满足需求,那么集中式架构优于分布式架构,因为集中式架构稳定,运维压力小.现在的集中式架构要么性能达不到要求,要么就是过于昂贵.我期待一

Hadoop新手学习指导之hadoop核心知识学习

上篇(Hadoop新手学习指导之入门需知)我们介绍了新手学习hadoop的入门注意事项.这篇来谈谈hadoop核心知识学习. hadoop核心知识学习: hadoop分为hadoop1.X和hadoop2.X,并且还有hadoop生态系统.这里只能慢慢介绍了.一口也吃不成胖子. 那么下面我们以hadoop2.x为例进行详细介绍: Hadoop的核心是mapreduce和hdfs. Mapreduce:mapreduce是很多人都需要迈过去的槛,它比较难以理解,我们有时候即使写出了mapreduc

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭