Hadoop的Reduce Join+BloomFilter实现表链接

适用于场景

连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有MAP端过滤掉不需要JOIN的数据,这样传到REDUCE的数据量减少,减少了网络传及磁盘IO。

缺点

Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间。并且,最终的JOIN在REDUCE端还要进行比对,所以对最终结果无影响。

下面我们先来简单了解下什么是布隆过滤器?

Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。如文章标题所述,本文只是做简单介绍,属于科普文章。

应用场景
在正式介绍Bloom Filter算法之前,先来看看什么时候需要用到Bloom Filter算法。
1. HTTP缓存服务器、Web爬虫等
主要工作是判断一条URL是否在现有的URL集合之中(可以认为这里的数据量级上亿)。
对于HTTP缓存服务器,当本地局域网中的PC发起一条HTTP请求时,缓存服务器会先查看一下这个URL是否已经存在于缓存之中,如果存在的话就没有必要去原始的服务器拉取数据了(为了简单起见,我们假设数据没有发生变化),这样既能节省流量,还能加快访问速度,以提高用户体验。
对于Web爬虫,要判断当前正在处理的网页是否已经处理过了,同样需要当前URL是否存在于已经处理过的URL列表之中。

2. 垃圾邮件过滤
假设邮件服务器通过发送方的邮件域或者IP地址对垃圾邮件进行过滤,那么就需要判断当前的邮件域或者IP地址是否处于黑名单之中。如果邮件服务器的通信邮件数量非常大(也可以认为数据量级上亿),那么也可以使用Bloom Filter算法。

1、BloomFilter能解决什么问题?
以少量的内存空间判断一个元素是否属于这个集合, 代价是有一定的错误率

2、工作原理
1. 初始化一个数组, 所有位标为0, A={x1, x2, x3,…,xm} (x1, x2, x3,…,xm 初始为0)
2. 将已知集合S中的每一个数组, 按以下方式映射到A中
2.0 选取n个互相独立的hash函数 h1, h2, … hk
2.1 将元素通过以上hash函数得到一组索引值 h1(xi), h2(xi),…,hk(xi)
2.2 将集合A中的上述索引值标记为1(如果不同元素有重复, 则重复覆盖为1, 这是一个觅等操作)
3. 对于一个元素x, 将其根据2.0中选取的hash函数, 进行hash, 得到一组索引值 h1(x), h2(x), …,hk(x)
如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S

3、几个前提
1. hash函数的计算不能性能太差, 否则得不偿失
2. 任意两个hash函数之间必须是独立的.
即任意两个hash函数不存在单一相关性, 否则hash到其中一个索引上的元素也必定会hash到另一个相关的索引上, 这样多个hash没有意义

4、错误率
工作原理的第3步, 的出来的结论, 一个是绝对靠谱的, 一个是不能100%靠谱的。在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。关于具体的错误率,这和最优的哈希函数个数以及位数组的大小有关,而这是可以估算求得一个最优解的:
哈希函数个数k、位数组大小m及字符串数量n之间存在相互关系。相关文献证明了对于给定的m、n,当 k = ln(2)* m/n 时出错的概率是最小的。 具体的请看:http://blog.csdn.net/jiaomeng/article/details/1495500

5、基本特征
从以上对基本原理和数学基础的分析,我们可以得到Bloom filter的如下基本特征,用于指导实际应用。
(1)存在一定错误率,发生在正向判断上(存在性),反向判断不会发生错误(不存在性);
(2)错误率是可控制的,通过改变位数组大小、hash函数个数或更低碰撞率的hash函数来调节;
(3)保持较低的错误率,位数组空位至少保持在一半以上;
(4)给定m和n,可以确定最优hash个数,即k = ln2 * (m/n),此时错误率最小;
(5)给定允许的错误率E,可以确定合适的位数组大小,即m >= log2(e) * (n * log2(1/E)),继而确定hash函数个数k;
(6)正向错误率无法完全消除,即使不对位数组大小和hash函数个数进行限制,即无法实现零错误率;
(7)空间效率高,仅保存“存在状态”,但无法存储完整信息,需要其他数据结构辅助存储;
(8)不支持元素删除操作,因为不能保证删除的安全性。

6、应用场景举例:
(1)拼写检查、数据库系统、文件系统
(2)假设要你写一个网络蜘蛛(web crawler)。由于网络间的链接错综复杂,蜘蛛在网络间爬行很可能会形成“环”。为了避免形成“环”,就需要知道蜘蛛已经访问过那些URL。给一个URL,怎样知道蜘蛛是否已经访问过呢?
(3)网络应用
  P2P网络中查找资源操作,可以对每条网络通路保存Bloom Filter,当命中时,则选择该通路访问。
  广播消息时,可以检测某个IP是否已发包。
  检测广播消息包的环路,将Bloom Filter保存在包里,每个节点将自己添加入Bloom Filter。
  信息队列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾邮件地址过滤
  像网易,QQ这样的公众电子邮件(email)提供商,总是需要过滤来自发送垃圾邮件的人(spamer)的垃圾邮件。一个办法就是记录下那些发垃圾邮件的email 地址。由于那些发送者不停地在注册新的地址,全世界少说也有几十亿个发垃圾邮件的地址,将他们都存起来则需要大量的网络服务器。如果用哈希表,每存储一亿个 email 地址,就需要1.6GB 的内存(用哈希表实现的具体办法是将每一个email 地址对应成一个八字节的信息指纹,然后将这些信息指纹存入哈希表,由于哈希表的存储效率一般只有50%,因此一个email 地址需要占用十六个字节。一亿个地址大约要1.6GB, 即十六亿字节的内存)。因此存贮几十亿个邮件地址可能需要上百GB 的内存。而Bloom Filter只需要哈希表1/8 到1/4 的大小就能解决同样的问题。Bloom Filter决不会漏掉任何一个在黑名单中的可疑地址。而至于误判问题,常见的补救办法是在建立一个小的白名单,存储那些可能别误判的邮件地址。
(5)Bloomfilter在HBase中的作用
HBase利用Bloomfilter来提高随机读(Get)的性能,对于顺序读(Scan)而言,设置Bloomfilter是没有作用的(0.92以后,如果设置了bloomfilter为ROWCOL,对于指定了qualifier的Scan有一定的优化,但不是那种直接过滤文件,排除在查找范围的形式)
Bloomfilter在HBase中的开销?
Bloomfilter是一个列族(cf)级别的配置属性,如果你在表中设置了Bloomfilter,那么HBase会在生成StoreFile时包含一份bloomfilter结构的数据,称其为MetaBlock;MetaBlock与DataBlock(真实的KeyValue数据)一起由LRUBlockCache维护。所以,开启bloomfilter会有一定的存储及内存cache开销。
Bloomfilter如何提高随机读(Get)的性能?
对于某个region的随机读,HBase会遍历读memstore及storefile(按照一定的顺序),将结果合并返回给客户端。如果你设置了bloomfilter,那么在遍历读storefile时,就可以利用bloomfilter,忽略某些storefile。
注意:hbase的bloom filter是惰性加载的,在写压力比较大的情况下,会有不停的compact并产生storefile,那么新的storefile是不会马上将bloom filter加载到内存的,等到读请求来的时候才加载。
这样问题就来了,第一,如果storefile设置的比较大,max size为2G,这会导致bloom filter也比较大;第二,系统的读写压力都比较大。这样或许会经常出现单个 GET请求花费3-5秒的超时现象。

 使用布隆过滤器,之前,需要先对需要处理的key,生成处理的二进制文件,代码如下:

package com.reducebloomfilterjoin;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

/**
 * 布隆过滤器
 * 生成二进制文件数据
 *
 * **/
public class TrainingBloomData {

    public static int getOptimalBloomFilterSize(int numRecords,

    float falsePosRate) {
        int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
                .pow(Math.log(2), 2));

        return size;

    }

    public static int getOptimalK(float numMembers, float vectorSize) {

                return (int) Math.round(vectorSize / numMembers * Math.log(2));

            }

    public static void main(String[] args) throws IOException {

        Path inputFile = new Path("hdfs://192.168.75.130:9000/root/bloom/kk.txt");

        int numMembers = Integer.parseInt("10");

        float falsePosRate = Float.parseFloat("0.01");

        Path bfFile = new Path("hdfs://192.168.75.130:9000/root/bloom/bloom.bin");

         FileSystem fss=FileSystem.get(new Configuration());
         if(fss.exists(bfFile)){
             fss.delete(bfFile, true);
             System.out.println("存在此路径,已经删除!");
         }

        // Calculate our vector size and optimal K value based on approximations

        int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);

        int nbHash = getOptimalK(numMembers, vectorSize);

        // create new Bloom filter

        BloomFilter filter = new BloomFilter(vectorSize, nbHash,

                Hash.MURMUR_HASH);

        // Open file for read

        System.out.println("Training Bloom filter of size " + vectorSize

                + " with " + nbHash + " hash functions, " + numMembers

                + " approximate number of records, and " + falsePosRate

                + " false positive rate");

        String line = null;

        int numRecords = 0;

        FileSystem fs = FileSystem.get(new Configuration());

        for (FileStatus status : fs.listStatus(inputFile)) {

            BufferedReader rdr;

            // if file is gzipped, wrap it in a GZIPInputStream

            if (status.getPath().getName().endsWith(".gz")) {

                rdr = new BufferedReader(new InputStreamReader(

                        new GZIPInputStream(fs.open(status.getPath()))));

            } else {

                rdr = new BufferedReader(new InputStreamReader(fs.open(status

                        .getPath())));

            }

            System.out.println("Reading " + status.getPath());

            while ((line = rdr.readLine()) != null) {

                filter.add(new Key(line.getBytes()));

                ++numRecords;

            }

            rdr.close();

        }

        System.out.println("Trained Bloom filter with " + numRecords

                + " entries.");

        System.out.println("Serializing Bloom filter to HDFS at " + bfFile);

        FSDataOutputStream strm = fs.create(bfFile);

        filter.write(strm);

        strm.flush();

        strm.close();

        System.out.println("Done training Bloom filter.");

    }

}

测试数据如下:
小表模拟数据
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862

大表模拟数据
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10

package com.reducebloomfilterjoin;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

/***
 *
 * Hadoop1.2的版本
 *
 *
 * ReduceJoin+布隆过滤器   实现Reduce Join
 *
 * @author qindongliang
 *
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 *
 *
 *
 * **/
public class ReduceBoolmFilter {

    /**
     *
     *
     * 自定义一个输出实体
     *
     * **/
    private static class CombineEntity implements WritableComparable<CombineEntity>{

        private Text joinKey;//连接key
        private Text flag;//文件来源标志
        private Text secondPart;//除了键外的其他部分的数据

        public CombineEntity() {
            // TODO Auto-generated constructor stub
            this.joinKey=new Text();
            this.flag=new Text();
            this.secondPart=new Text();
        }

        public Text getJoinKey() {
            return joinKey;
        }

        public void setJoinKey(Text joinKey) {
            this.joinKey = joinKey;
        }

        public Text getFlag() {
            return flag;
        }

        public void setFlag(Text flag) {
            this.flag = flag;
        }

        public Text getSecondPart() {
            return secondPart;
        }

        public void setSecondPart(Text secondPart) {
            this.secondPart = secondPart;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.joinKey.readFields(in);
            this.flag.readFields(in);
            this.secondPart.readFields(in);

        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.joinKey.write(out);
            this.flag.write(out);
            this.secondPart.write(out);

        }

        @Override
        public int compareTo(CombineEntity o) {
            // TODO Auto-generated method stub
            return this.joinKey.compareTo(o.joinKey);
        }

    }

    private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{

        private CombineEntity combine=new CombineEntity();
        private Text flag=new Text();
        private  Text joinKey=new Text();
        private Text secondPart=new Text();
        /**
         *使用布隆过滤器存储key
         * 代替原来的HashSet存储
         *
         * */
        //private HashSet<String> joinKeySet=new HashSet<String>();

        BloomFilter filter=new BloomFilter();
        @Override
        protected void setup(Context context)throws IOException, InterruptedException {

            //读取文件流
            BufferedReader br=null;
            String temp;
            // 获取DistributedCached里面 的共享文件
            Path paths[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
                  for (Path path : paths) {

                                          if (path.toString().contains("bloom.bin")) {

                                              DataInputStream strm = new DataInputStream(

                                                      new FileInputStream(path.toString()));

                                              // Read into our Bloom filter.

                                              filter.readFields(strm);

                                              strm.close();

                                          }

                                      }

        }

        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {

               //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

            if(pathName.endsWith("a.txt")){

                String  valueItems[]=value.toString().split(",");

                /**
                 * 在这里过滤必须要的连接字符
                 *
                 * */
                //if(joinKeySet.contains(valueItems[0])){
                if(filter.membershipTest(new Key(valueItems[0].getBytes()))){
                    //设置标志位
                    flag.set("0");
                    //设置链接键
                    joinKey.set(valueItems[0]);
                    //设置第二部分
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]);

                    //封装实体
                    combine.setFlag(flag);//标志位
                    combine.setJoinKey(joinKey);//链接键
                    combine.setSecondPart(secondPart);//其他部分

                     //写出
                    context.write(combine.getJoinKey(), combine);
                }else{
                    System.out.println("a.txt里");
                    System.out.println("在小表中无此记录,执行过滤掉!");
                    for(String v:valueItems){
                        System.out.print(v+"   ");
                    }

                    return ;

                }

            }else if(pathName.endsWith("b.txt")){
                String  valueItems[]=value.toString().split(",");

                /**
                 *
                 * 判断是否在集合中
                 *
                 * */
                if(filter.membershipTest(new Key(valueItems[0].getBytes()))){
                //if(joinKeySet.contains(valueItems[0])){
                    //if(joinKeySet.contains(valueItems[0])){
                    //设置标志位
                    flag.set("1");   

                    //设置链接键
                    joinKey.set(valueItems[0]);

                    //设置第二部分注意不同的文件的列数不一样
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);

                    //封装实体
                    combine.setFlag(flag);//标志位
                    combine.setJoinKey(joinKey);//链接键
                    combine.setSecondPart(secondPart);//其他部分

                     //写出
                    context.write(combine.getJoinKey(), combine);

                }else{
                    //执行过滤 ......
                    System.out.println("b.txt里");
                    System.out.println("在小表中无此记录,执行过滤掉!");
                    for(String v:valueItems){
                        System.out.print(v+"   ");
                    }

                    return ;

                }

            }

        }

    }

    private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{

        //存储一个分组中左表信息
        private List<Text> leftTable=new ArrayList<Text>();
        //存储一个分组中右表信息
        private List<Text> rightTable=new ArrayList<Text>();

        private Text secondPart=null;

        private Text output=new Text();

        //一个分组调用一次
        @Override
        protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
                throws IOException, InterruptedException {
             leftTable.clear();//清空分组数据
             rightTable.clear();//清空分组数据

             /**
              * 将不同文件的数据,分别放在不同的集合
              * 中,注意数据量过大时,会出现
              * OOM的异常
              *
              * **/

             for(CombineEntity ce:values){

                 this.secondPart=new Text(ce.getSecondPart().toString());

                 //左表

                 if(ce.getFlag().toString().trim().equals("0")){
                     leftTable.add(secondPart);

                 }else if(ce.getFlag().toString().trim().equals("1")){

                     rightTable.add(secondPart);

                 }

             }

             //=====================
             for(Text left:leftTable){

                 for(Text right:rightTable){

                     output.set(left+"\t"+right);//连接左右数据
                     context.write(key, output);//输出
                 }

             }

        }

    }

    public static void main(String[] args)throws Exception {

         //Job job=new Job(conf,"myjoin");
         JobConf conf=new JobConf(ReduceBoolmFilter.class);
           conf.set("mapred.job.tracker","192.168.75.130:9001");
            conf.setJar("tt.jar");

            //小表共享
            String bpath="hdfs://192.168.75.130:9000/root/bloom/bloom.bin";
            //添加到共享cache里
        DistributedCache.addCacheFile(new URI(bpath), conf);

          Job job=new Job(conf, "aaaaa");
         job.setJarByClass(ReduceBoolmFilter.class);
         System.out.println("模式:  "+conf.get("mapred.job.tracker"));;

         //设置Map和Reduce自定义类
         job.setMapperClass(JMapper.class);
         job.setReducerClass(JReduce.class);

         //设置Map端输出
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(CombineEntity.class);

         //设置Reduce端的输出
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);

         job.setInputFormatClass(TextInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);

         FileSystem fs=FileSystem.get(conf);

         Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew5");

         if(fs.exists(op)){
             fs.delete(op, true);
             System.out.println("存在此输出路径,已删除!!!");
         }

      FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
      FileOutputFormat.setOutputPath(job, op);

      System.exit(job.waitForCompletion(true)?0:1);

    }

}

运行情况

模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404300232_0003
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404300232_0003
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12165
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9890
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=237
INFO - Counters.log(589) |     HDFS_BYTES_READ=455
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169734
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=227
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=243
INFO - Counters.log(589) |     Map input records=10
INFO - Counters.log(589) |     Reduce shuffle bytes=243
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=215
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1810
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=444403712
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184323072
INFO - Counters.log(589) |     Map output records=8

运行结果

1    三劫散仙    13575468248    B    89    2013-02-05
2    凤舞九天    18965235874    C    69    2013-03-09
3    忙忙碌碌    15986854789    A    99    2013-03-05
3    忙忙碌碌    15986854789    D    56    2013-06-07

Hadoop的Reduce Join+BloomFilter实现表链接

时间: 2024-10-01 04:57:25

Hadoop的Reduce Join+BloomFilter实现表链接的相关文章

Hadoop之Reduce侧的联结

理解其就像关系型数据库中的链接查询一样,数据很多的时候,几个数据文件的数据能够彼此有联系,可以使用Reduce联结.举个很简单的例子来说,一个只存放了顾客信息Customer.txt文件,和一个顾客相关联的Order.txt文件,要进行两个文件的信息组合,原理图如下: 这里涉及的几个专业术语:Group key ,datasourde,Tag.前者的话通俗点来说的话就相当于关系型数据库中的主键和外键,通过其id进行的联结依据.datasource,顾名思义,就是数据的来源,那么这里指的就是Cus

map join 与 reduce join

要解决什么问题? 解决的都是同一个问题,即将两张“表‘进行join操作. reduce join是在map阶段完成数据的标记,在reduce阶段完成数据的合并 map join是直接在map阶段完成数据的合并,没有reduce阶段 比如有如下问题: 这是订单表. 这是商品表. 现在需要将商品表中的商品名称填充到订单表中.得到如下的联合表: 也就是对商品表和订单表根据pid进行join操作,同时剔除联合表中的pid属性. Reduce Join map: 将输入数据统一封装为一个Bean,此Bea

hive join 优化 --小表join大表

1.小.大表 join 在小表和大表进行join时,将小表放在前边,效率会高,hive会将小表进行缓存. 2.mapjoin 使用mapjoin将小表放入内存,在map端和大表逐一匹配,从而省去reduce. 例子: select /*+MAPJOIN(b)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1 在0.7版本后,也可以用配置来自动优化 set hive.auto.convert.join=true;

SQL之表链接

表链接:join on 默认前面有修饰符inner join 内连接 a表和b表所有的根据关系可能对 应的的链接方式显示出来 没有关系不显示 当两个表中间存在某个关系的时候需要把它整 合成一个表 显示出来在select和from中间把想要显示的直接写上然 后from某一个表 拼接join另外一个表 on后边 写这两个表之间的关系先去car表里面找第一条数据 然后拿着第一条 数据信息根据on的关系去brand表里面找那条数 据对接起来 例子:把 code name brand_name显示出来se

sql表链接

表链接:join on 默认前面有修饰符inner join 内连接 a表和b表所有的根据关系可能对 应的的链接方式显示出来 没有关系不显示 当两个表中间存在某个关系的时候需要把它整 合成一个表 显示出来在select和from中间把想要显示的直接写上然 后from某一个表 拼接join另外一个表 on后边 写这两个表之间的关系先去car表里面找第一条数据 然后拿着第一条 数据信息根据on的关系去brand表里面找那条数 据对接起来 例子:把 code name brand_name显示出来se

一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)

Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解Mapper和Reducer接口,应用通常须要提供map和reduce方法以实现他们. 接着我们须要对JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等进行讨

基于hadoop (map/reduce)的大规模分布式并行计算生产排程系统的设计

map/reduce是大数据处理的技术,它的思路是把大规模数据分成一个个小数据块,每个数据块由一个map任务来处理,处理的中间结果汇总到reduce,最后生成最终的处理结果,这个处理和汇总的过程是可以反复多次的,也就是map-reduce-map-reduce 基于map/reduce的思路,我们可以设计基于hadoop(map/reduce)的大规模分布式并行计算生产排程系统,怎么样把大数据处理变成大规模分布式并行计算呢,就是我们不切分数据,每个map任务的数据都是相同的,每个map任务对排程

Hadoop Map/Reduce

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集.一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们.框架会对map的输出先进行排序, 然后把结果输入给reduce任务.通常作业的输入和输出都会被存储在文件系统中. 整个框架负责任务的调度和监控,以及重新执行已经失败的任务.通常,Map/R

INEER JOIN..ON两表查询例子

一.创建两个表 1.hanshu 2.YingHu 二.建立关系 --inner join on用法两表查询--查询hanshu中的name 和 YingHu中的YYqq --select *或(列名) from hanshu(表1) INNER JOIN YingHu(表二) ON hanshu.id=YingHu.id (来源于两表中的关联主键) select hanshu.name,YingHu.YYqq from hanshu INNER JOIN YingHu ON hanshu.id