1 import java.io.IOException; 2 import org.apache.hadoop.conf.Configuration; 3 import org.apache.hadoop.io.*; 4 import org.apache.hadoop.mapreduce.Job; 5 import org.apache.hadoop.mapreduce.Mapper; 6 import org.apache.hadoop.mapreduce.Reducer; 7 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 8 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 10 import org.apache.hadoop.fs.Path; 11 public class matrix { 12 public static int rowM=0; 13 public static int columnM=0; 14 public static int columnN=0; 15 public static class MyMapper extends Mapper<Object, Text, Text, Text>{ 16 private Text map_key=new Text(); 17 private Text map_value=new Text(); 18 public void setup(Context context){ 19 Configuration conf=context.getConfiguration(); 20 columnN=Integer.parseInt(conf.get("columnN")); 21 rowM=Integer.parseInt(conf.get("rowM")); 22 } 23 public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ 24 FileSplit fileSplit=(FileSplit)context.getInputSplit(); 25 String filename=fileSplit.getPath().getName(); 26 System.out.println("map的数据分片长度是:"+fileSplit.getLength()); 27 System.out.println("数据分片的起始位置是:"+fileSplit.getStart()); 28 String[] tempLocation=fileSplit.getLocations(); 29 for (String string : tempLocation) { 30 System.out.println("数据分片所在的主机是:"+string); 31 } 32 if(filename.contains("M")){ 33 String[] tuple=value.toString().split(","); 34 int i=Integer.parseInt(tuple[0]); 35 String[] tupleS=tuple[1].split("\t"); 36 int j=Integer.parseInt(tupleS[0]); 37 int Mij=Integer.parseInt(tupleS[1]); 38 for (int k = 1; k <columnN+1 ; k++) { 39 map_key.set(i+","+k); 40 map_value.set("M"+","+j+","+Mij); 41 context.write(map_key, map_value); 42 } 43 } 44 else if(filename.contains("N")){ 45 String[] tuple=value.toString().split(","); 46 int j=Integer.parseInt(tuple[0]); 47 String[] tupleS=tuple[1].split("\t"); 48 int k=Integer.parseInt(tupleS[0]); 49 int 
Njk=Integer.parseInt(tupleS[1]); 50 for (int i = 1; i <rowM+1 ; i++) { 51 map_key.set(i+","+k); 52 map_value.set("N"+","+j+","+Njk); 53 context.write(map_key, map_value); 54 } 55 } 56 } 57 } 58 public static class MyReducer extends Reducer<Text, Text, Text, Text>{ 59 private int sum=0; 60 public void setup(Context context) throws IOException{ 61 Configuration conf=context.getConfiguration(); 62 columnM=Integer.parseInt(conf.get("columnM")); 63 } 64 public void reduce(Text key,Iterable<Text> value,Context context)throws IOException,InterruptedException{ 65 int[] M=new int[columnM+1]; 66 int[] N=new int[columnM+1]; 67 System.out.println(key.toString()+"对应的value列表所有值是:"); 68 for (Text val : value){ 69 System.out.println(val.toString()); 70 String[] tuple=val.toString().split(","); 71 if(tuple[0].equals("M")){ 72 M[Integer.parseInt(tuple[1])]=Integer.parseInt(tuple[2]); 73 }else { 74 N[Integer.parseInt(tuple[1])]=Integer.parseInt(tuple[2]); 75 } 76 } 77 for (int j=1;j<columnM+1;++j) { 78 sum+=M[j]*N[j]; 79 } 80 context.write(key, new Text(Integer.toString(sum))); 81 sum=0; 82 } 83 } 84 public static void main(String[] args)throws Exception { 85 if(args.length!=3){ 86 System.err.println("Usage: MatrixMultiply <inputPathM> <inputPathN> <outputPath>"); 87 System.exit(2); 88 } 89 else{ 90 System.out.println("M文件路径:"+args[0]); 91 String[] infoTupleM=args[0].split("_"); 92 rowM=Integer.parseInt(infoTupleM[1]); 93 columnM=Integer.parseInt(infoTupleM[2]); 94 String[] infoTupleN=args[1].split("_"); 95 columnN=Integer.parseInt(infoTupleN[2]); 96 } 97 Configuration conf=new Configuration(); 98 conf.set("columnM", Integer.toString(columnM)); 99 conf.set("rowM", Integer.toString(rowM)); 100 conf.set("columnN", Integer.toString(columnN)); 101 Job job=new Job(conf, "Matrix"); 102 job.setJarByClass(matrix.class); 103 job.setMapperClass(MyMapper.class); 104 job.setReducerClass(MyReducer.class); 105 job.setOutputKeyClass(Text.class); 106 job.setOutputValueClass(Text.class); 107 
FileInputFormat.setInputPaths(job, new Path(args[0]),new Path(args[1])); 108 FileOutputFormat.setOutputPath(job, new Path(args[2])); 109 System.exit(job.waitForCompletion(true)?0:1); 110 } 111 }
以上是计算矩阵乘积M*N的源码,程序共需要三个输入参数,分别是:M矩阵文件的路径、N矩阵文件的路径以及结果的输出路径。矩阵的行数和列数编码在文件名中,文件名的格式是"M_rows_columns"(N矩阵为"N_rows_columns"),实验中M的矩阵文件是M_300_500,N的矩阵文件是N_500_700。M和N文件内容的格式相同,每行都是"i,j\tMij"的形式,其中i表示元素所在的行号,j表示元素所在的列号(均从1开始),Mij表示该矩阵元素的值。如下图所示(N_500_700中形式也是如此):
矩阵元素都是0到99之间的随机整数,矩阵文件由以下shell脚本生成:
#!/bin/bash
# Generate a dense random integer matrix in the "i,j<TAB>value" line format
# (1-based indices) consumed by the MapReduce job.
#
# Usage: ./gen_matrix.sh <rows> <cols> [name]
#   Writes <name>_<rows>_<cols> in the current directory; name defaults to M,
#   so "./gen_matrix.sh 500 700 N" produces N_500_700.
#   Values are random integers in 0..99.
#   The output file is overwritten, so re-running never duplicates entries.
set -euo pipefail

if [ "$#" -lt 2 ] || [ "$#" -gt 3 ]; then
  echo "Usage: $0 <rows> <cols> [name]" >&2
  exit 2
fi

rows=$1
cols=$2
name=${3:-M}

if ! [[ $rows =~ ^[1-9][0-9]*$ && $cols =~ ^[1-9][0-9]*$ ]]; then
  echo "rows and cols must be positive integers" >&2
  exit 2
fi

out="${name}_${rows}_${cols}"

# Redirect the whole loop once instead of reopening the file for every line.
for ((i = 1; i <= rows; i++)); do
  for ((j = 1; j <= cols; j++)); do
    printf '%d,%d\t%d\n' "$i" "$j" "$((RANDOM % 100))"
  done
done > "$out"
2)map的输出形式:
假设M是一个I×J的矩阵,N是一个J×K的矩阵(M的列数必须等于N的行数),则乘积M*N是一个I×K的矩阵。下文中小写的i、j、k表示行号或列号,取值范围为1≤i≤I、1≤j≤J、1≤k≤K。
对于M矩阵:对其中的每个元素Mij,map会对k=1,2,…,K(K为N的列数)各输出一个键值对(<i,k>,<"M",j,Mij>),其中<i,k>是key,<"M",j,Mij>是value。
"M"是一个标记,表示这个键值对来自M矩阵
Mij是M矩阵中的一个元素
i和j分别是这个元素在M矩阵中的行号和列号
k是结果矩阵(也即N矩阵)中的列号,取值为1到N的列数;也就是说,M中的每个元素都会被发送到结果矩阵第i行的每一列
时间: 2024-10-10 05:42:31