PageRank简单介绍:
每个节点的PageRank值由指向它的其他节点的PageRank值决定,具体例子如下:
对应于每个mapReduce的计算:
由mapper算出每个节点分给其所指向节点的分值,再由reducer汇总key相同的分值,并按公式算出该节点新的PageRank值。
三角号(Δ)表示相邻两次迭代计算结果之间的差值,若小于某个阈值则计算完成,求得每个点的pagerank值。
自我实现的代码:如下
输入的数据分为:
input1.txt
A,B,D
B,C
C,A,B
D,B,C
每行的第一个字段是节点名,其后各字段是该节点所指向的节点。该文件在reducer的setup中被读取,用来构建hashmap供reduce方法查询。
input2.txt
A,0.25,B,D
B,0.25,C
C,0.25,A,B
D,0.25,B,C
每行第二个字段(中间多出的数字)表示该节点当前的pagerank值。这个文件可以不事先准备,因为它可以由上面的文件生成:共有四个节点,所以每个节点的初始值为1/4,即0.25。
自我实现的代码:
package bbdt.steiss.pageRank; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class PageRank { public static class PageMapper extends Mapper<LongWritable, Text, Text, Text>{ private Text averageValue = new Text(); private Text node = new Text(); @Override //把每行数据的对应节点的分pagerank找出,并输出,当前节点的值除以指向节点的总数 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); String [] ss = string.split(","); int length = ss.length; double pageValue = Double.parseDouble(ss[1]); double average = pageValue/(length-2); averageValue.set(String.valueOf(average)); int i = 2; while(i<=length-1){ node.set(ss[i]); context.write(node,averageValue); i++; } } } public static class PageReducer extends Reducer<Text, Text, Text, Text>{ private HashMap<String, String> content; private Text res = new Text(); //reducer工作前,key相同的会分组分在一组,用迭代器操作,从总的图中找到所有该节点的分pagerank值 //利用公式计算该pagerank值,输出。因为下一次要用,因此输出可以凑近一些,把结果都放在value里输出 @Override protected void reduce(Text text, Iterable<Text> intIterable, Context context) throws IOException, InterruptedException { double sum = 0.0; double 
v = 0.0; for (Text t : intIterable) { v = Double.parseDouble(t.toString()); sum = sum + v; } double a = 0.85; double result = (1-a)/4 + a*sum; String sRes = String.valueOf(result); String back = content.get(text.toString()); String front = text.toString(); String comp = front + "," + sRes + back; res.set(comp); context.write(null,res); } @Override //reducer的初始化时,先把节点对应文件的数据,存在hashmap中,也就是content中,供每次reduce方法使用,相当于数据库的作用 //方便查询 protected void setup(Context context) throws IOException, InterruptedException { URI[] uri = context.getCacheArchives(); content = new HashMap<String, String>(); for(URI u : uri) { FileSystem fileSystem = FileSystem.get(u.create("hdfs://hadoop1:9000"), context.getConfiguration()); FSDataInputStream in = null; in = fileSystem.open(new Path(u.getPath())); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in)); String line; while((line = bufferedReader.readLine())!=null) { int index = line.indexOf(","); String first = line.substring(0,index); String last = line.substring(index,line.length()); content.put(first, last); } } } } public static void main(String[] args) throws Exception{ //接受路径文件 Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); Path cachePath = new Path(args[2]); double result = 100; int flag = 0; //制定差值多大时进入循环 while(result>0.1) { if(flag == 1) { //初次调用mapreduce不操作这个 //这个是把mapreduce的输出文件复制到输入文件中,作为这次mapreduce的输入文件 copyFile(); flag = 0; } Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(PageRank.class); job.setMapperClass(PageMapper.class); job.setReducerClass(PageReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); 
job.addCacheArchive(cachePath.toUri()); outputPath.getFileSystem(configuration).delete(outputPath, true); job.waitForCompletion(true); String outpathString = outputPath.toString()+"/part-r-00000"; //计算两个文件的各节点的pagerank值差 result = fileDo(inputPath, new Path(outpathString)); flag = 1; } System.exit(0); } //计算两个文件的每个节点的pagerank差值,返回 public static double fileDo(Path inputPath,Path outPath) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); FileSystem fs = FileSystem.get(conf); FSDataInputStream in1 = null; FSDataInputStream in2 = null; in1 = fs.open(inputPath); in2 = fs.open(outPath); BufferedReader br1 = new BufferedReader(new InputStreamReader(in1)); BufferedReader br2 = new BufferedReader(new InputStreamReader(in2)); String s1 = null; String s2 = null; ArrayList<Double> arrayList1 = new ArrayList<Double>(); ArrayList<Double> arrayList2 = new ArrayList<Double>(); while ((s1 = br1.readLine()) != null) { String[] ss = s1.split(","); arrayList1.add(Double.parseDouble(ss[1])); } br1.close(); while ((s2 = br2.readLine()) != null) { String[] ss = s2.split(","); arrayList2.add(Double.parseDouble(ss[1])); } double res = 0; for(int i = 0;i<arrayList1.size();i++) { res = res + Math.abs(arrayList1.get(i)-arrayList2.get(i)); } return res; } //将输出文件复制到输入文件中 public static void copyFile() throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); FileSystem fs = FileSystem.get(conf); FSDataInputStream in1 = null; in1 = fs.open(new Path("/output/part-r-00000")); BufferedReader br1 = new BufferedReader(new InputStreamReader(in1)); //这里删除需要打开hdfs在/input目录下的权限操作,非常重要 //“hdfs dfs -chmod 777 /input”打开权限,这样才可以删除其下面的文件 fs.delete(new Path("/input/test2.txt"),true); //建立一个新文件,返回流 FSDataOutputStream fsDataOutputStream = fs.create(new Path("/input/test2.txt")); BufferedWriter bw1 = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream)); String s1 = null; //写出并写入 while ((s1 = 
br1.readLine()) != null) { bw1.write(s1); bw1.write("\n"); } bw1.close(); fsDataOutputStream.close(); br1.close(); in1.close(); } }
注意:
在本地操作hdfs时,进行文件的删除和添加,需要打开hdfs的文件操作权限,
这里删除需要打开hdfs在/input目录下的权限操作,非常重要 “hdfs dfs -chmod 777 /input”打开权限,这样才可以删除其下面的文件
打开/input路径的操作权限
时间: 2024-12-21 00:17:40