使用分布式缓存求多矩阵乘积

使用分布式缓存有两点需要注意,这是今天折腾了一天的体会。

1)利用DistributedCache类添加缓存文件的语句要紧紧跟在Configuration实例之后

1         Configuration conf=new Configuration();
2         DistributedCache.addCacheFile(new URI(cachePath),conf);//添加分布式缓存
3         FileSystem fs=FileSystem.get(URI.create(cachePath),conf);
4         System.out.println(fs.getUri().toString());
5         fs.delete(new Path(outUri),true);
6         conf.set("rightMatrixNum","5");
7         conf.set("u","5");
8         Job job=new Job(conf,"MultiMatrix");
9         //DistributedCache.addCacheFile(new URI(cachePath),conf);//添加分布式缓存    

原先添加在第9行,运行一直报“空引用”的错,将 DistributedCache.addCacheFile(new URI(cachePath),conf);添加到第2行,紧跟在conf之后,就OK了(不过要满足以下第2点)

2)第2点就是使用分布式缓存的自定义mapper/reducer类必须定义为内部类。

3)当满足以上两点之后,程序中就可以正常的使用分布式缓存了,不过运行又会遇到一个问题,“FileNotFoundException....”后面就是缓存文件的路径,程序找不到缓存的文件,MR程序中读取文件时,默认FileSystem是hdfs,也就是从集群上读取,但是这些缓存文件恰恰是放在数据节点本地文件系统中的,所以程序中当然会“找不到文件”,解决方法很简单,在使用DistributedCache.getLocalCacheFiles()得到路径的技术上,在其前面追加字符串”file://“就可以了,如下所示(第6行)(http://hugh-wangp.iteye.com/blog/1468989)

 1 public void setup(Context context) throws IOException {
 2             Configuration conf=context.getConfiguration();
 3             System.out.println("map setup() start!");
 4             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
 5             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
 6            String localCacheFile="file://"+cacheFiles[0].toString();     
 7             System.out.println("local path is:"+cacheFiles[0].toString());
 8             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
 9             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
10             SequenceFile.Reader reader=null;
11             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
12             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
13             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
14             int valueLength=Array.getLength(value.toArray());
15             while (reader.next(key,value)){
16                 obValue=value.toArray();
17                 leftMatrix[key.get()]=new double[valueLength];
18                 for (int i=0;i<valueLength;++i){
19                     leftMatrix[key.get()][i]=Double.parseDouble(Array.get(obValue, i).toString());
20                 }
21
22             }
23         }

以下就是完整的代码,虽然现在还有点问题,不过分布式缓存是实现了的。

  1 /**
  2  * Created with IntelliJ IDEA.
  3  * User: hadoop
  4  * Date: 16-3-6
  5  * Time: 下午12:47
  6  * To change this template use File | Settings | File Templates.
  7  */
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import java.io.IOException;
 11 import java.lang.reflect.Array;
 12 import java.net.URI;
 13
 14 import org.apache.hadoop.fs.Path;
 15 import org.apache.hadoop.io.*;
 16 import org.apache.hadoop.mapreduce.InputSplit;
 17 import org.apache.hadoop.mapreduce.Job;
 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 20 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 21 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 23 import org.apache.hadoop.mapreduce.Reducer;
 24 import org.apache.hadoop.mapreduce.Mapper;
 25 import org.apache.hadoop.filecache.DistributedCache;
 26 import org.apache.hadoop.util.ReflectionUtils;
 27
 28 public class MutiDoubleInputMatrixProduct {
 29     public static void main(String[]args) throws IOException, ClassNotFoundException, InterruptedException {
 30         String uri="/testData/input";
 31         String outUri="/sOutput";
 32         String cachePath="/testData/F100";
 33         Configuration conf=new Configuration();
 34         DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存
 35         FileSystem fs=FileSystem.get(URI.create(uri),conf);
 36         fs.delete(new Path(outUri),true);
 37         conf.set("rightMatrixNum","5");
 38         conf.set("u","5");
 39         Job job=new Job(conf,"MultiMatrix");
 40         //DistributedCache.addCacheFile(URI.create(cachePath),conf);//添加分布式缓存
 41         //DistributedCache.addLocalFiles(conf,cachePath);
 42
 43         job.setJarByClass(MutiDoubleInputMatrixProduct.class);
 44         job.setInputFormatClass(SequenceFileInputFormat.class);
 45         job.setOutputFormatClass(SequenceFileOutputFormat.class);
 46         job.setMapperClass(MyMapper.class);
 47         job.setReducerClass(MyReducer.class);
 48         job.setMapOutputKeyClass(IntWritable.class);
 49         job.setMapOutputValueClass(DoubleArrayWritable.class);
 50         job.setOutputKeyClass(IntWritable.class);
 51         job.setOutputValueClass(DoubleArrayWritable.class);
 52         FileInputFormat.setInputPaths(job, new Path(uri));
 53         FileOutputFormat.setOutputPath(job,new Path(outUri));
 54         System.exit(job.waitForCompletion(true)?0:1);
 55     }
 56   public static  class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
 57         public DoubleArrayWritable map_value=new DoubleArrayWritable();
 58         public static double[][] leftMatrix=null;
 59         public Object obValue=null;
 60         public DoubleWritable[] arraySum=null;
 61         public double sum=0;
 62
 63         public void setup(Context context) throws IOException {
 64             Configuration conf=context.getConfiguration();
 65             System.out.println("map setup() start!");
 66             //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
 67             Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
 68            String localCacheFile="file://"+cacheFiles[0].toString();
 69             System.out.println("local path is:"+cacheFiles[0].toString());
 70             // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
 71             FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
 72             SequenceFile.Reader reader=null;
 73             reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
 74             IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
 75             DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
 76             int valueLength=Array.getLength(value.toArray());
 77             while (reader.next(key,value)){
 78                 obValue=value.toArray();
 79                 leftMatrix[key.get()]=new double[valueLength];
 80                 for (int i=0;i<valueLength;++i){
 81                     leftMatrix[key.get()][i]=Double.parseDouble(Array.get(obValue, i).toString());
 82                 }
 83
 84             }
 85         }
 86         public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
 87             obValue=value.toArray();
 88             InputSplit inputSplit=context.getInputSplit();
 89             String fileName=((FileSplit)inputSplit).getPath().getName();
 90             if (fileName.startsWith("FB")) {
 91                 context.write(key,value);
 92             }
 93             else{
 94                 for (int i=0;i<leftMatrix.length;++i){
 95                     sum=0;
 96                     for (int j=0;j<leftMatrix[0].length;++j){
 97                         sum+= leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*Double.parseDouble(context.getConfiguration().get("u"));
 98                     }
 99                     arraySum[i]=new DoubleWritable(sum);
100                 }
101                 map_value.set(arraySum);
102                 context.write(key,map_value);
103             }
104         }
105     }
106   public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
107         public DoubleWritable[] sum=null;
108         public Object obValue=null;
109         public DoubleArrayWritable valueArrayWritable=null;
110
111         public void setup(Context context){
112             int rightMatrixNum= Integer.parseInt(context.getConfiguration().get("rightMatrixNum"));
113             sum=new DoubleWritable[rightMatrixNum];
114             for (int i=0;i<rightMatrixNum;++i){
115                 sum[i]=new DoubleWritable(0.0);
116             }
117         }
118
119         public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
120             for(DoubleArrayWritable doubleValue:value){
121                 obValue=doubleValue.toArray();
122                 for (int i=0;i<Array.getLength(obValue);++i){
123                     sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
124                 }
125             }
126             valueArrayWritable.set(sum);
127             for (int i=0;i<sum.length;++i){
128                 sum[i].set(0.0);
129             }
130             context.write(key,valueArrayWritable);
131         }
132     }
133 }
134 class DoubleArrayWritable extends ArrayWritable {
135     public DoubleArrayWritable(){
136         super(DoubleWritable.class);
137     }
138     /*
139     public String toString(){
140         StringBuilder sb=new StringBuilder();
141         for (Writable val:get()){
142             DoubleWritable doubleWritable=(DoubleWritable)val;
143             sb.append(doubleWritable.get());
144             sb.append(",");
145         }
146         sb.deleteCharAt(sb.length()-1);
147         return sb.toString();
148     }
149     */
150 }

另外,分布式缓存也可以在本地使用IDEA调试,路径必须是本地路径就可以了(33~37),就是我创建的工程中的路径。

时间: 2024-10-13 05:55:32

使用分布式缓存求多矩阵乘积的相关文章

分布式缓存法计算矩阵乘法

1)做矩阵F是.txt格式,右矩阵B是SequenceFile,代码如下: 1 package matrix; 2 3 import java.io.BufferedReader; 4 import java.io.FileReader; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.f

分布式缓存- memcached

分布式缓存出于如下考虑,首先是缓存本身的水平线性扩展问题,其次是缓存大并发下的本身的性能问题,再次避免缓存的单点故障问题(多副本和副本一致性).分布式缓存的核心技术包括首先是内存本身的管理问题,包括了内存的分配,管理和回收机制.其次是分布式管理和分布式算法,其次是缓存键值管理和路由. 原文:http://wenku.baidu.com/view/8686d46c7e21af45b307a8c3.html 什么是Memcached 许多Web 应用程序都将数据保存到RDBMS中,应用服务器从中读取

跟我一起数据挖掘(17)——分布式缓存

分布式缓存架构 先看架构: 图一 用户通过访问http服务器,然后访问应用服务器资源,应用服务器调用后端的数据库,在第一次访问的时候,直接访问数据库,然后将要缓存的内容放入memcached集群,集群规模根据缓存文件的大小而定.在第二次访问的时候就直接进入缓存读取,不需要进行数据库的操作.这个适合数据变化不频繁的场景,比如:互联网站显示的榜单.阅读排行等. 博客园的48小时阅读排行就类似于这一种: 当然,缓存的架构使用方式不止这一种方式,在数据挖掘系统中,对于不频繁更新的数据或者离线的数据都可以

2014 HDU多校弟五场J题 【矩阵乘积】

题意很简单,就是两个大矩阵相乘,然后求乘积. 用 Strassen算法 的话,当N的规模达到100左右就会StackOverFlow了 况且输入的数据范围可达到800,如果变量还不用全局变量的话连内存开辟都开不出来 1 #pragma comment(linker, "/STACK:16777216") 2 #include <iostream> 3 #include <stdio.h> 4 #define ll long long 5 using namesp

分布式缓存的一致性hash算法

基本场景 比如你有 N 个 cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N 个 cache 上呢,你很可能会采用类似下面的通用方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache : 常规取余的hash算法 hash(key) % N 对于N台缓存服务器构成的集群缓存,依次编号为0 - (N-1)先对要存储的key进行hash取值,然后用hash值对N取余,得到一个在缓存服务器编号区间的一个数字,则将当前key存到这台服务器

Memcached 分布式缓存实现原理

摘要 在高并发环境下,大量的读.写请求涌向数据库,此时磁盘IO将成为瓶颈,从而导致过高的响应延迟,因此缓存应运而生.无论是单机缓存还是分布式缓存都有其适应场景和优缺点,当今存在的缓存产品也是数不胜数,最常见的有redis和memcached等,既然是分布式,那么他们是怎么实现分布式的呢?本文主要介绍分布式缓存服务mencached的分布式实现原理. 缓存本质 计算机体系缓存 什么是缓存,我们先看看计算机体系结构中的存储体系,根据冯·诺依曼计算机体系结构模型,计算机分为五大部分:运算器.控制器.存

MemCache分布式缓存的一个bug

Memcached分布式缓存策略不是由服务器端至支持的,多台服务器之间并不知道彼此的存在.分布式的实现是由客户端代码(Memcached.ClientLibrary)通过缓存key-server映射来实现的,基本原理就是对缓存key求hash值,用hash值对服务器数量进行模运算,该key值被分配到模运算结果为索引的那台server上. Memcached.ClientLibrary对缓存key计算hashcode的核心算法如下: 1 /// <summary> 2 /// Returns a

如何用分布式缓存服务实现Redis内存优化

Redis是一种支持Key-Value等多种数据结构的存储系统,其数据特性是"ALL IN MEMORY",因此优化内存十分重要.在对Redis进行内存优化时,先要掌握Redis内存存储的特性比如字符串,压缩编码,整数集合等,再根据数据规模和所用命令需求去调整,从而达到空间和效率的最佳平衡. 但随着数据大幅增长,开发人员需要面对重新优化内存所带来开发和数据迁移的双重成本也越来越高.Redis所有的数据都在内存中,那么,我们是否可以通过简便高效的方式去实现Redis内存优化呢? 答案当然

第八章 企业项目开发--分布式缓存memcached

注意:本节代码基于<第七章 企业项目开发--本地缓存guava cache> 1.本地缓存的问题 本地缓存速度一开始高于分布式缓存,但是随着其缓存数量的增加,所占内存越来越大,系统运行内存越来越小,最后系统会被拖慢(这一点与第二点联系起来) 本地缓存存于本机,其缓存数量与大小受本机内存大小限制 本地缓存存于本机,其他机器的访问不到这样的缓存 解决方案:分布式缓存 Jboss cache:缓存还存于本机,但是会同步更新到其他机器(解决了第三个问题,解决不了第一和第二个问题),如果缓存机器数量很多