Hadoop DistributedCache分布式缓存的使用

做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签。因为标签库不是很大,没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件。

main方法中的配置:

//分布式缓存要存储的文件路径
String cachePath[] = {
                "hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv",
                "hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"
        };
//向分布式缓存中添加文件
        job.addCacheFile(new Path(cachePath[0]).toUri());
        job.addCacheFile(new Path(cachePath[1]).toUri());

参考上面代码即可向分布式缓存中添加文件。

在Mapper和Reducer方法中读取分布式缓存文件:

/*
 * 重写Mapper的setup方法,获取分布式缓存中的文件
 */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                   throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        super.setup(context);
        URI[] cacheFile = context.getCacheFiles();
        Path tagSetPath = new Path(cacheFile[0]);
        Path tagedUrlPath = new Path(cacheFile[1]);
        文件操作(如把内容读到set或map中);
    }

@Override
public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            在map()中使用读取出的数据;
      }

同样,如果在Reducer中也要读取分布式缓存文件,示例如下:

/*
 * 重写Reducer的setup方法,获取分布式缓存中的文件
 */
    @Override
    protected void setup(Context context)
                   throws IOException, InterruptedException {
        super.setup(context);
        mos = new MultipleOutputs<Text, Text>(context);

        URI[] cacheFile = context.getCacheFiles();
        Path tagSetPath = new Path(cacheFile[0]);
        Path tagSetPath = new Path(cacheFile[1]);
        文件读取操作;
    }

 @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
              throws IOException, InterruptedException {
      while(values.iterator().hasNext()){
          使用读取出的数据;
      }
       context.write(key, new Text(sb.toString()));
      }
时间: 2024-10-13 20:53:24

Hadoop DistributedCache分布式缓存的使用的相关文章

9.3.1 map端连接- DistributedCache分布式缓存小数据集

1.1.1         map端连接- DistributedCache分布式缓存小数据集 当一个数据集非常小时,可以将小数据集发送到每个节点,节点缓存到内存中,这个数据集称为边数据.用map函数将小数据集中的数据按键聚合到大的数据集中,输出连接数据集,进行连接操作. (1)   分布式缓存指定缓存文件 执行命令行时,采用hadoop  jar hadoop-example.jar MapSideJoinMain  -files input/cityfile/tb_dim_city.dat

分布式缓存法计算矩阵乘法

1)做矩阵F是.txt格式,右矩阵B是SequenceFile,代码如下: 1 package matrix; 2 3 import java.io.BufferedReader; 4 import java.io.FileReader; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.f

深入浅出Hive企业级架构优化、Hive Sql优化、压缩和分布式缓存(企业Hadoop应用核心产品)

一.本课程是怎么样的一门课程(全面介绍)    1.1.课程的背景       作为企业Hadoop应用的核心产品,Hive承载着FaceBook.淘宝等大佬 95%以上的离线统计,很多企业里的离线统计甚至全由Hive完成,如我所在的电商.       Hive在企业云计算平台发挥的作用和影响愈来愈大,如何优化提速已经显得至关重要.       Hive作业的规模决定着优化层级,一个Hive作业的优化和一万的Hive作业的优化截然不同.       拥有1万多个Hive作业的大电商如何进行Hiv

大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

   前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存). 一 概述 定义 MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE).这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间. 适用范围:数据量大,但是数据种类小可以放入内存. 基

使用分布式缓存求多矩阵乘积

使用分布式缓存有两点需要注意,这是今天折腾了一天的体会. 1)利用DistributedCache类添加缓存文件的语句要紧紧跟在Configuration实例之后 1 Configuration conf=new Configuration(); 2 DistributedCache.addCacheFile(new URI(cachePath),conf);//添加分布式缓存 3 FileSystem fs=FileSystem.get(URI.create(cachePath),conf);

MapReduce分布式缓存程序,无法在Windows下的Eclipse中执行问题解决

在写mapreduce程序中经常要用到hadoop自动的分布式缓存DistributedCache(新版本已经换新的API),但是在windows下Eclipse中执行,会出现类似如下错误: 2016-03-03 10:53:21,424 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform..

Hive架构层面优化之六分布式缓存

案例: Hadoop jar引用:hadoop jar -libjars aa.jar bb.jar …. jar包会被上传到hdfs,然后分发到每个datanode 假设有20个jar文件,每天jar文件被上传上万次,分发达上万次(百G级),造成很严重的IO开销. 如何使这些jar包在HDFS上进行缓存,同一个jar只需上传和分发一次,后续所有的job可以节省此jar的上传和分发的开销,从而减少不必要的上传和分发呢? 解决方案:使用分布式缓存 MapReduce如何使用分布式缓存 Hadoop

使用map端连接结合分布式缓存机制实现Join算法

前面我们介绍了MapReduce中的Join算法,我们提到了可以通过map端连接或reduce端连接实现join算法,在文章中,我们只给出了reduce端连接的例子,下面我们说说使用map端连接结合分布式缓存机制实现Join算法 1.介绍 我们使用频道类型数据集和机顶盒用户数据集,进行连接,统计出每天.每个频道.每分钟的收视人数 2.数据集 频道类型数据集就是channelType.csv文件,如下示例 机顶盒用户数据集来源于“08.统计电视机顶盒中无效用户数据,并以压缩格式输出有效数据”这个实

hadoop1.0 TaskTracker因为分布式缓存导致内存泄露的一次问题排查

上周五同事到公司说凌晨的时候有值班同事打电话给他,有部分job卡住了,运行了很长时间都没运行完成,由于是凌晨,他没来得及详细的查看日志,简单的把有问题的tasktracker重启了一下,只有一个节点的TaskTracker进程停掉,让我查一下具体是什么问题.以下是排查过程: 1.登陆到停掉TT进程的处理机 (1).查看磁盘空间 磁盘没有出现空间不足的情况. (2).top查看负载和内存使用情况: 根据上图看出内存和负载都不算高,也不存在僵尸进程. 2.查看进程日志 1.log4j日志: 2014