使用Hadoop分布式缓存(DistributedCache)时有以下几点需要注意,这是今天折腾了一天的体会。
1)用DistributedCache.addCacheFile()添加缓存文件的语句必须放在创建Job(new Job(conf, ...))之前,最稳妥的做法是紧跟在Configuration实例创建之后:
Configuration conf=new Configuration();// driver-setup excerpt: what matters here is the ORDER of the statements
DistributedCache.addCacheFile(new URI(cachePath),conf);// add to the distributed cache -- must come before new Job(conf, ...) on line 8, because the Job keeps its own copy of conf
FileSystem fs=FileSystem.get(URI.create(cachePath),conf);
System.out.println(fs.getUri().toString());// debug: print which file system cachePath resolves to
fs.delete(new Path(outUri),true);// recursively remove any previous output at outUri
conf.set("rightMatrixNum","5");// still before the Job is created, so the tasks can read it
conf.set("u","5");
Job job=new Job(conf,"MultiMatrix");
//DistributedCache.addCacheFile(new URI(cachePath),conf);// add to the distributed cache -- original (wrong) position: the Job above has already copied conf, so the tasks never see this file
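原先这句写在第9行(即创建Job之后),运行时一直报“空引用”(NullPointerException)的错;把DistributedCache.addCacheFile(new URI(cachePath),conf);移到第2行、紧跟在conf之后,就OK了(前提是同时满足下面第2点)。原因是new Job(conf, ...)会复制一份conf,此后再对conf做的任何修改(包括添加缓存文件)对这个Job都不可见;如果一定要在创建Job之后添加缓存文件,应当改为对job.getConfiguration()调用addCacheFile。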
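2)使用分布式缓存的自定义Mapper/Reducer类要定义为静态内部类(public static class,见下面完整代码中的MyMapper和MyReducer)。注意一定要加static:Hadoop通过反射调用无参构造函数来创建这些类,而非静态内部类的构造函数隐含一个外部类实例参数,无法这样实例化。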
3)满足以上两点之后,程序中就可以正常使用分布式缓存了,不过运行时又会遇到一个问题:报“FileNotFoundException....”,后面跟着的就是缓存文件的路径,即程序找不到缓存文件。原因是MR程序中读取文件时,默认的FileSystem是HDFS,也就是从集群上读取,而这些缓存文件恰恰是放在运行任务的节点(数据节点)的本地文件系统中的,所以程序当然会“找不到文件”。解决方法很简单:在用DistributedCache.getLocalCacheFiles()得到路径的基础上,在其前面加上字符串“file://”就可以了,如下所示(第6行)(参考 http://hugh-wangp.iteye.com/blog/1468989):
1 public void setup(Context context) throws IOException { 2 Configuration conf=context.getConfiguration(); 3 System.out.println("map setup() start!"); 4 //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration()); 5 Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf); 6 String localCacheFile="file://"+cacheFiles[0].toString(); 7 System.out.println("local path is:"+cacheFiles[0].toString()); 8 // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration()); 9 FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf); 10 SequenceFile.Reader reader=null; 11 reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf); 12 IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf); 13 DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf); 14 int valueLength=Array.getLength(value.toArray()); 15 while (reader.next(key,value)){ 16 obValue=value.toArray(); 17 leftMatrix[key.get()]=new double[valueLength]; 18 for (int i=0;i<valueLength;++i){ 19 leftMatrix[key.get()][i]=Double.parseDouble(Array.get(obValue, i).toString()); 20 } 21 22 } 23 }
以下就是完整的代码,虽然现在还有点问题,不过分布式缓存是实现了的。
1 /** 2 * Created with IntelliJ IDEA. 3 * User: hadoop 4 * Date: 16-3-6 5 * Time: 下午12:47 6 * To change this template use File | Settings | File Templates. 7 */ 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import java.io.IOException; 11 import java.lang.reflect.Array; 12 import java.net.URI; 13 14 import org.apache.hadoop.fs.Path; 15 import org.apache.hadoop.io.*; 16 import org.apache.hadoop.mapreduce.InputSplit; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 19 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 20 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 21 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 23 import org.apache.hadoop.mapreduce.Reducer; 24 import org.apache.hadoop.mapreduce.Mapper; 25 import org.apache.hadoop.filecache.DistributedCache; 26 import org.apache.hadoop.util.ReflectionUtils; 27 28 public class MutiDoubleInputMatrixProduct { 29 public static void main(String[]args) throws IOException, ClassNotFoundException, InterruptedException { 30 String uri="/testData/input"; 31 String outUri="/sOutput"; 32 String cachePath="/testData/F100"; 33 Configuration conf=new Configuration(); 34 DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存 35 FileSystem fs=FileSystem.get(URI.create(uri),conf); 36 fs.delete(new Path(outUri),true); 37 conf.set("rightMatrixNum","5"); 38 conf.set("u","5"); 39 Job job=new Job(conf,"MultiMatrix"); 40 //DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存 41 //DistributedCache.addLocalFiles(conf,cachePath); 42 43 job.setJarByClass(MutiDoubleInputMatrixProduct.class); 44 job.setInputFormatClass(SequenceFileInputFormat.class); 45 job.setOutputFormatClass(SequenceFileOutputFormat.class); 46 job.setMapperClass(MyMapper.class); 47 job.setReducerClass(MyReducer.class); 48 
job.setMapOutputKeyClass(IntWritable.class); 49 job.setMapOutputValueClass(DoubleArrayWritable.class); 50 job.setOutputKeyClass(IntWritable.class); 51 job.setOutputValueClass(DoubleArrayWritable.class); 52 FileInputFormat.setInputPaths(job, new Path(uri)); 53 FileOutputFormat.setOutputPath(job,new Path(outUri)); 54 System.exit(job.waitForCompletion(true)?0:1); 55 } 56 public static class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{ 57 public DoubleArrayWritable map_value=new DoubleArrayWritable(); 58 public static double[][] leftMatrix=null; 59 public Object obValue=null; 60 public DoubleWritable[] arraySum=null; 61 public double sum=0; 62 63 public void setup(Context context) throws IOException { 64 Configuration conf=context.getConfiguration(); 65 System.out.println("map setup() start!"); 66 //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration()); 67 Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf); 68 String localCacheFile="file://"+cacheFiles[0].toString(); 69 System.out.println("local path is:"+cacheFiles[0].toString()); 70 // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration()); 71 FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf); 72 SequenceFile.Reader reader=null; 73 reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf); 74 IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf); 75 DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf); 76 int valueLength=Array.getLength(value.toArray()); 77 while (reader.next(key,value)){ 78 obValue=value.toArray(); 79 leftMatrix[key.get()]=new double[valueLength]; 80 for (int i=0;i<valueLength;++i){ 81 leftMatrix[key.get()][i]=Double.parseDouble(Array.get(obValue, i).toString()); 82 } 83 84 } 85 } 86 public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException { 87 
obValue=value.toArray(); 88 InputSplit inputSplit=context.getInputSplit(); 89 String fileName=((FileSplit)inputSplit).getPath().getName(); 90 if (fileName.startsWith("FB")) { 91 context.write(key,value); 92 } 93 else{ 94 for (int i=0;i<leftMatrix.length;++i){ 95 sum=0; 96 for (int j=0;j<leftMatrix[0].length;++j){ 97 sum+= leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*Double.parseDouble(context.getConfiguration().get("u")); 98 } 99 arraySum[i]=new DoubleWritable(sum); 100 } 101 map_value.set(arraySum); 102 context.write(key,map_value); 103 } 104 } 105 } 106 public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{ 107 public DoubleWritable[] sum=null; 108 public Object obValue=null; 109 public DoubleArrayWritable valueArrayWritable=null; 110 111 public void setup(Context context){ 112 int rightMatrixNum= Integer.parseInt(context.getConfiguration().get("rightMatrixNum")); 113 sum=new DoubleWritable[rightMatrixNum]; 114 for (int i=0;i<rightMatrixNum;++i){ 115 sum[i]=new DoubleWritable(0.0); 116 } 117 } 118 119 public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException { 120 for(DoubleArrayWritable doubleValue:value){ 121 obValue=doubleValue.toArray(); 122 for (int i=0;i<Array.getLength(obValue);++i){ 123 sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get()); 124 } 125 } 126 valueArrayWritable.set(sum); 127 for (int i=0;i<sum.length;++i){ 128 sum[i].set(0.0); 129 } 130 context.write(key,valueArrayWritable); 131 } 132 } 133 } 134 class DoubleArrayWritable extends ArrayWritable { 135 public DoubleArrayWritable(){ 136 super(DoubleWritable.class); 137 } 138 /* 139 public String toString(){ 140 StringBuilder sb=new StringBuilder(); 141 for (Writable val:get()){ 142 DoubleWritable doubleWritable=(DoubleWritable)val; 143 sb.append(doubleWritable.get()); 144 sb.append(","); 145 } 146 
sb.deleteCharAt(sb.length()-1); 147 return sb.toString(); 148 } 149 */ 150 }
另外,分布式缓存也可以在本地用IDEA调试,只要把main()中uri、outUri、cachePath这几个变量的路径改成本地路径(即我所创建工程中的路径)就可以了。
时间: 2024-12-15 06:59:07