9.3.1 map端连接- DistributedCache分布式缓存小数据集

1.1.1         map端连接- DistributedCache分布式缓存小数据集

当一个数据集非常小时,可以将小数据集发送到每个节点,节点缓存到内存中,这个数据集称为边数据。用map函数将小数据集中的数据按键聚合到大的数据集中,输出连接数据集,进行连接操作。

(1)   分布式缓存指定缓存文件

执行命令行时,采用hadoop  jar hadoop-example.jar MapSideJoinMain  -files input/cityfile/tb_dim_city.dat input/data/all output

-files input/cityfile/tb_dim_city.dat指定需要缓存的文件,会被复制到各个节任务点。

(2)指定缓存文件的三种类型

Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中,任务运行前,节点管理器从分布式文件系统中复制文件到本地。

1) -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。

2) -archives 选项向自己的任务中复制存档(压缩)文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。

3) -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。

(3)缓存文件删除机制

节点管理器为缓存中的文件各维护一个计数器,任务运行时,文件计数器加1,任务完成后,计数器减1,计数器为0时才能删除文件,当节点缓存容量大于一定值(yarn.nodemanger.localizer.cache.target-size-mb设置,默认10GB),才会删除最近最少使用的文件。

(4)Job的分布式缓存API

除了可以用命令行参数指定缓存文件外,还以通过Job的API指定缓存文件;即通过job对象调用下面的函数设置缓存文件。

//以下两组方法将文件或存档添加到分布式缓存

public void addCacheFile(URI uri);

public void addCacheArchive(URI uri);

//以下两组方法将一次性向分布式缓存中添加一组文件或存档

public void setCacheFiles(URI[] files);

public void setCacheArchives(URI[] archives);

//以下两组方法将文件或存档添加到 MapReduce 任务的类路径

public void addFileToClassPath(Path file);

public void addArchiveToClassPath(Path archive);

public void createSymlink();

(6)DistributedCache缓存小数据集实现hadoop map端连接实例

下面的实例是将城市名称的数据集和用户信息的数据集进行连接,城市名称的数据集很小,而用户信息的数据集很大,所以可以采用缓存文件的方式,将城市信息数据集发送到任务,map任务通过setup方法从缓存中读取小数据集文件tb_dim_city.dat,在内存中形成map映射,map函数处理用户信息数据,根据用户信息中的城市id去map映射中找到城市名称,然后合并输出。

package Temperature;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 *
 * 用途说明:  
 * Map side join中的left outer join  
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),  
 * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":  
 * id     name  orderid  city_code  is_show  
 * 0       其他        9999     9999         0  
 * 1       长春        1        901          1  
 * 2       吉林        2        902          1  
 * 3       四平        3        903          1  
 * 4       松原        4        904          1  
 * 5       通化        5        905          1  
 * 6       辽源        6        906          1  
 * 7       白城        7        907          1  
 * 8       白山        8        908          1  
 * 9       延吉        9        909          1  
 * -------------------------风骚的分割线-------------------------------  
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  
 * tb_user_profiles.dat文件内容,分隔符为"|":  
 * userID   network     flow    cityID  
 * 1           2G       123      1  
 * 2           3G       333      2  
 * 3           3G       555      1  
 * 4           2G       777      3  
 * 5           3G       666      4  
 * -------------------------风骚的分割线-------------------------------  
 *  结果:  
 *  1   长春  1   901 1   1   2G  123  
 *  1   长春  1   901 1   3   3G  555  
 *  2   吉林  2   902 1   2   3G  333  
 *  3   四平  3   903 1   4   2G  777  
 *  4   松原  4   904 1   5   3G  666  
 */
public class MapSideJoinMain extends Configured implements Tool{
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
    public static class LeftOutJoinMapper extends Mapper {

        private HashMap city_info = new HashMap<String,String>();
        private Text outPutKey = new Text();
        private Text outPutValue = new Text();
        private String mapInputStr = null;
        private String mapInputSpit[] = null;
        private String city_secondPart = null;
        /**
         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache  
         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。  
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            BufferedReader br = null;
            //获得当前作业的DistributedCache相关文件  
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String cityInfo = null;
            for(Path p : distributePaths){
                if(p.toString().endsWith("tb_dim_city.dat")){
                    //读缓存文件,并放到mem中  
                    br = new BufferedReader(new FileReader(p.toString()));
                    while(null!=(cityInfo=br.readLine())){
                        String[] cityPart = cityInfo.split("\\|",5);
                        if(cityPart.length ==5){
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
                        }
                    }
                }
            }
        }

        /**
         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的  
         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了  
         */
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //排掉空行  
            if(value == null || value.toString().equals("")){
                return;
            }
            mapInputStr = value.toString();
            mapInputSpit = mapInputStr.split("\\|",4);
            //过滤非法记录  
            if(mapInputSpit.length != 4){
                return;
            }
            //判断链接字段是否在map中存在  
            city_secondPart = (String) city_info.get((Object) mapInputSpit[3]);
            if(city_secondPart != null){
                this.outPutKey.set(mapInputSpit[3]);
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
                context.write(outPutKey, outPutValue);
            }
        }
    }
   
    public int run(String[] args) throws Exception {
        Configuration conf=getConf(); //获得配置文件对象  
        DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件  
        Job job=new Job(conf,"MapJoinMR");
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径  
        FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

        job.setJarByClass(MapSideJoinMain.class);
        job.setMapperClass(LeftOutJoinMapper.class);

        job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式  
        job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

        //设置map的输出key和value类型  
        job.setMapOutputKeyClass(Text.class);

        //设置reduce的输出key和value类型  
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);
            System.exit(returnCode);
        } catch (Exception e) {
            // TODO Auto-generated catch block  
            logger.error(e.getMessage());
        }
    }
}

实例参考文献:

https://www.cnblogs.com/cssdongl/p/6018806.html

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

原文地址:https://www.cnblogs.com/bclshuai/p/12319471.html

时间: 2024-11-03 05:38:08

9.3.1 map端连接- DistributedCache分布式缓存小数据集的相关文章

使用map端连接结合分布式缓存机制实现Join算法

前面我们介绍了MapReduce中的Join算法,我们提到了可以通过map端连接或reduce端连接实现join算法,在文章中,我们只给出了reduce端连接的例子,下面我们说说使用map端连接结合分布式缓存机制实现Join算法 1.介绍 我们使用频道类型数据集和机顶盒用户数据集,进行连接,统计出每天.每个频道.每分钟的收视人数 2.数据集 频道类型数据集就是channelType.csv文件,如下示例 机顶盒用户数据集来源于“08.统计电视机顶盒中无效用户数据,并以压缩格式输出有效数据”这个实

Hadoop DistributedCache分布式缓存的使用

做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签.因为标签库不是很大,没必要用HBase.我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件. main方法中的配置: //分布式缓存要存储的文件路径 String cachePath[] = { "hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv&qu

分布式缓存法计算矩阵乘法

1)做矩阵F是.txt格式,右矩阵B是SequenceFile,代码如下: 1 package matrix; 2 3 import java.io.BufferedReader; 4 import java.io.FileReader; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.f

大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

   前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存). 一 概述 定义 MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE).这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间. 适用范围:数据量大,但是数据种类小可以放入内存. 基

使用分布式缓存求多矩阵乘积

使用分布式缓存有两点需要注意,这是今天折腾了一天的体会. 1)利用DistributedCache类添加缓存文件的语句要紧紧跟在Configuration实例之后 1 Configuration conf=new Configuration(); 2 DistributedCache.addCacheFile(new URI(cachePath),conf);//添加分布式缓存 3 FileSystem fs=FileSystem.get(URI.create(cachePath),conf);

MapReduce分布式缓存程序,无法在Windows下的Eclipse中执行问题解决

在写mapreduce程序中经常要用到hadoop自动的分布式缓存DistributedCache(新版本已经换新的API),但是在windows下Eclipse中执行,会出现类似如下错误: 2016-03-03 10:53:21,424 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform..

Hive架构层面优化之六分布式缓存

案例: Hadoop jar引用:hadoop jar -libjars aa.jar bb.jar …. jar包会被上传到hdfs,然后分发到每个datanode 假设有20个jar文件,每天jar文件被上传上万次,分发达上万次(百G级),造成很严重的IO开销. 如何使这些jar包在HDFS上进行缓存,同一个jar只需上传和分发一次,后续所有的job可以节省此jar的上传和分发的开销,从而减少不必要的上传和分发呢? 解决方案:使用分布式缓存 MapReduce如何使用分布式缓存 Hadoop

第八章 企业项目开发--分布式缓存memcached

注意:本节代码基于<第七章 企业项目开发--本地缓存guava cache> 1.本地缓存的问题 本地缓存速度一开始高于分布式缓存,但是随着其缓存数量的增加,所占内存越来越大,系统运行内存越来越小,最后系统会被拖慢(这一点与第二点联系起来) 本地缓存存于本机,其缓存数量与大小受本机内存大小限制 本地缓存存于本机,其他机器的访问不到这样的缓存 解决方案:分布式缓存 Jboss cache:缓存还存于本机,但是会同步更新到其他机器(解决了第三个问题,解决不了第一和第二个问题),如果缓存机器数量很多

分布式缓存技术redis学习系列(一)——redis简介以及linux上的安装

redis简介 redis是NoSQL(No Only SQL,非关系型数据库)的一种,NoSQL是以Key-Value的形式存储数据.当前主流的分布式缓存技术有redis,memcached,ssdb,mongodb等.既可以把redis理解为理解为缓存技术,因为它的数据都是缓存在内从中的:也可以理解为数据库,因为redis可以周期性的将数据写入磁盘或者把操作追加到记录文件中.而我个人更倾向理解为缓存技术,因为当今互联网应用业务复杂.高并发.大数据的特性,正是各种缓存技术引入最终目的. 关于r