MapReduce实现矩阵的乘法
在学习pageRank算法时看到这么一个小小的编程应用。并且一直自诩只要有原理就能写出代码(只是时间问题),矩阵乘法的原理很简单,基本上理工科生(只要学过线性代数或者相关课程)都知道。但是从来没有想过通过并行计算的方式来完成矩阵乘法。
这里矩阵的知识就不啰嗦了,矩阵的乘积记作为P=M*N。则P中的元素
简单粗暴的讲,就是左矩阵M的行依次与右矩阵的列元素对应相乘,然后再相加。
可能说到mapreduce算法可能都会想到map用来整理数据(这里指的就是矩阵中的元素),然后送至reduce中计算。说起来是挺简单的,但是从无到有的过程还是不容易的。关键是key值的设计,可以说一个key的设计是编程成功的一半。
下面详细的道来设计过程。
相信大家都关注到了<k,v>对里面的M: 和N:字样。这个得郑重声明一下,其实不用也可以,但是value值这样设置后很容易维护和调试代码(具体的可以参看以下陆嘉恒的《hadoop实战》关于单表关联的章节),因为能清晰看出元素的出处。必须要说明的是M:和N:后面的数字特别重要,M:y的y表示元素对应的列号;N:x的x表示元素对应的行号。这样说明之后是不是思路就清晰了,把这样的<k,v>上传给reduce,reduce收到的是〈k, list(v)〉,源代码的System.out.print("tuple=="
+ MN[0] + ":" + MN[1] +"\t");就能看得明白map的内容。List(v)就是如下的样子。
到了,这里map过程就结束了,下面就是reduce的过程。刚才说了那个x/y很重要,现在就体现出来了,起到标志位的作用。在一个list里面标志位相同的元素相乘就是,再相加就是对应结果。是不是小有成就感。
下面来简单的谈谈MR程序的实现。MR框架还是比较简单,相信初上手的时候都会拿wordcount程序入门,下面就是就是从hadoop自带的wordcount里面抽取出来的MR框架。
publicclass wordCount { publicstaticclass mulMapperextends publicvoid map(Object key, Text value, Context context) throws IOException, InterruptedException { … } } publicstaticclass mulReducerextends publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { … } } publicstaticvoid main(String[] args)throws Configuration conf = String[] otherArgs = .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } //对重复出现的目录处理 FileSystem fs = FileSystem.get(URI.create(otherArgs[1]), conf); Path inpath = new Path(otherArgs[0]); Path outpath = new Path(otherArgs[1]); Job job = new Job(conf,"word count"); job.setJarByClass(Multi.class); job.setMapperClass(mulMapper.class); // job.setCombinerClass(mulReducer.class); job.setReducerClass(mulReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, inpath); FileOutputFormat.setOutputPath(job, outpath); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
具体的编程就不讲了,可以根据自己的理解去实现。如果感觉有困难,参考文献后面给出来完整的MR程序代码。
祝你好运!!
参考文献
[1]. 黄宜华, 苗凯翔. 深入理解大数据---大数据处理与编程实践[M].北京:机械工业出版社. 2014
[2]. 陆嘉恒. Hadoop实战[M]. 北京:机械工业出版社. 2012
以下是完整的MR代码。
package org.apache.test; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; publicclass Multi { publicstaticclass mulMapperextends private Textmap_key = private Textmap_value = privateintline1 = 0; // m*n publicvoid map(Object key, Text value, Context context) throws IOException, InterruptedException { int rowM = 2;// int colM = 2;// introwN = 2; int colN = 3;// // System.out.println("再执行一次"); FileSplit filesplit = (FileSplit) context.getInputSplit(); String fileName = filesplit.getPath().getName(); // System.out.println(fileName); if (fileName.contains("n")) { String[] line = value.toString().split(" "); for (int j = 0; j < rowM; j++) { for (int int val = Integer.parseInt(line[i]); map_key.set(j +"," + i); map_value.set("N" +":" System.out.println("N:" context.write(map_key,map_value); } } } String[] line = value.toString().split(" "); for (int i = 0; i < colN; i++) { for (int int val = Integer.parseInt(line[j]); map_key.set(line1 +"," map_value.set("M" +":" System.out.println("M:" context.write(map_key,map_value); } } } line1++; } } publicstaticclass mulReducerextends publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { finalint[] M =newint[2]; finalint[] N =newint[2]; int sum = 0; System.out.println("按key值小到大输出:"); for (Text val : values) { String[] MN = val.toString().split(":"); String[] except_value = MN[1].toString().split(","); System.out.print("tuple==" + MN[0] +":" + MN[1] +"\t"); // if (MN[0].equals("M")) M[Integer.parseInt(except_value[0])] = Integer .parseInt(except_value[1]); else N[Integer.parseInt(except_value[0])] = Integer .parseInt(except_value[1]); } for (int i = 0; i sum += M[i] * N[i]; } System.out.print("sum=" + sum); System.out.println(); // context.write(key, new Text(Integer.toString(sum))); context.write(new Text(),new } } publicstaticvoid main(String[] args)throws Configuration conf =new Configuration(); String[] otherArgs =new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: matrix <in> <out>"); System.exit(2); } //对重复出现的目录处理 FileSystem fs = FileSystem.get(URI.create(otherArgs[1]), conf); Path inpath =new Path(otherArgs[0]); Path outpath =new Path(otherArgs[1]); if (fs.exists(outpath)) { System.out.println(outpath); System.out.println("输出目录已经存在,现在删除...重建目录"); fs.delete(outpath,true); } Job job = job.setJarByClass(Multi.class); job.setMapperClass(mulMapper.class); // job.setCombinerClass(mulReducer.class); job.setReducerClass(mulReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, inpath); FileOutputFormat.setOutputPath(job, outpath); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |