MapReduce编程之Semi Join多种应用场景与使用

Map Join 实现方式一

● 使用场景:一个大表(整张表内存放不下,但表中的key内存放得下),一个超大表

● 实现方式:分布式缓存

● 用法:

SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。

代码实现

package com.hadoop.reducejoin.test;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.Key;

/*
 * 一个大表,一个小表(也很大,内存中放不下)
 * map 阶段:Semi Join解决小表整个记录内存放不下的场景,那么就取出来一小部分关键字段放入内存,过滤大表
 * 提前过滤,提前提取出小表中的连接字段放入内存中,在map阶段就仅留下大表中那些小表中存在的连接字段key
 * reduce 阶段:reduce side join
 */
public class SemiJoin {
    /**
     * 为来自不同表(文件)的key/value对打标签以区别不同来源的记录。
     * 然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
     */
    public static class SemiJoinMapper extends
            Mapper<Object, Text, Text, Text> {
        // 定义Set集合保存小表中的key
        private Set<String> joinKeys = new HashSet<String>();
        private Text joinKey = new Text();
        private Text combineValue = new Text();

        /**
         * 获取分布式缓存文件
         */
        protected void setup(Context context) throws IOException,
                InterruptedException {
            BufferedReader br;
            String infoAddr = null;
            // 返回缓存文件路径
            Path[] cacheFilesPaths = context.getLocalCacheFiles();
            for (Path path : cacheFilesPaths) {
                String pathStr = path.toString();
                br = new BufferedReader(new FileReader(pathStr));
                while (null != (infoAddr = br.readLine())) {
                    // 按行读取并解析气象站数据
                    String[] records = StringUtils.split(infoAddr.toString(),
                            "\t");
                    if (null != records)// key为stationID
                        joinKeys.add(records[0]);
                }
            }
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String pathName = ((FileSplit) context.getInputSplit()).getPath()
                    .toString();
            // 如果数据来自于records,加一个records的标记
            if (pathName.endsWith("records-semi.txt")) {
                String[] valueItems = StringUtils.split(value.toString(),
                        "\t");
                // 过滤掉脏数据
                if (valueItems.length != 3) {
                    return;
                }
//                提前过滤,提前提取出小表中的连接字段,在map阶段就仅留下大表中那些小表中存在的连接字段key
                if (joinKeys.contains(valueItems[0])) {
                    joinKey.set(valueItems[0]);
                    combineValue.set("records-semi.txt" + valueItems[1] + "\t"
                            + valueItems[2]);
                    context.write(joinKey, combineValue);
                }

            } else if (pathName.endsWith("station.txt")) {
                // 如果数据来自于station,加一个station的标记
                String[] valueItems = StringUtils.split(value.toString(),
                        "\t");
                // 过滤掉脏数据
                if (valueItems.length != 2) {
                    return;
                }
                joinKey.set(valueItems[0]);
                combineValue.set("station.txt" + valueItems[1]);
                context.write(joinKey, combineValue);

            }
        }
    }
    /*
     * reduce 端做笛卡尔积
     */
    public static class SemiJoinReducer extends
            Reducer<Text, Text, Text, Text> {
        private List<String> leftTable = new ArrayList<String>();
        private List<String> rightTable = new ArrayList<String>();
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 一定要清空数据
            leftTable.clear();
            rightTable.clear();
            // 相同key的记录会分组到一起,我们需要把相同key下来自于不同表的数据分开,然后做笛卡尔积
            for (Text value : values) {
                String val = value.toString();
                System.out.println("value=" + val);
                if (val.startsWith("station.txt")) {
                    leftTable.add(val.replaceFirst("station.txt", ""));
                } else if (val.startsWith("records-semi.txt")) {
                    rightTable.add(val.replaceFirst("records-semi.txt", ""));
                }
            }
            // 笛卡尔积
            for (String leftPart : leftTable) {
                for (String rightPart : rightTable) {
                    result.set(leftPart + "\t" + rightPart);
                    context.write(key, result);
                }
            }
        }
    }

    public static void main(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        String[] args = { "hdfs://sparks:9000/middle/reduceJoin/station.txt",
                "hdfs://sparks:9000/middle/reduceJoin/station.txt",
                "hdfs://sparks:9000/middle/reduceJoin/records-semi.txt",
                "hdfs://sparks:9000/middle/reduceJoin/SemiJoin-out" };
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: semijoin <in> [<in>...] <out>");
            System.exit(2);
        }

        //输出路径
        Path mypath = new Path(otherArgs[otherArgs.length - 1]);
        FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        Job job = Job.getInstance(conf, "SemiJoin");

        //添加缓存文件
        job.addCacheFile(new Path(otherArgs[0]).toUri());
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(SemiJoinMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //添加输入路径
        for (int i = 1; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        //添加输出路径
        FileOutputFormat.setOutputPath(job, new Path(
                otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

SemiJoin

Reduce join + BloomFilter

● 使用场景:一个大表(表中的key内存仍然放不下),一个超大表

在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。

BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和membershipTest ()。

因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。

● BloomFilter参数计算方式:

n:小表中的记录数。

m:位数组大小,一般m是n的倍数,倍数越大误判率就越小,但是也有内存限制,不能太大,这个值需要反复测试得出。

k:hash个数,最优hash个数值为:k = ln2 * (m/n)

代码实现

package com.hadoop.reducejoin.test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
/*
 * 一个大表,一个小表
 * map 阶段:BloomFilter 解决小表的key集合在内存中仍然存放不下的场景,过滤大表
 * reduce 阶段:reduce side join
 */
public class BloomFilteringDriver {
    /**
     * 为来自不同表(文件)的key/value对打标签以区别不同来源的记录。
     * 然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
     */
    public static class BloomFilteringMapper extends
            Mapper<Object, Text, Text, Text> {
        // 第一个参数是vector的大小,这个值尽量给的大,可以避免hash对象的时候出现索引重复
        // 第二个参数是散列函数的个数
        // 第三个是hash的类型,虽然是int型,但是只有默认两个值
        // 哈希函数个数k、位数组大小m及字符串数量n之间存在相互关系
        //n 为小表记录数,给定允许的错误率E,可以确定合适的位数组大小,即m >= log2(e) * (n * log2(1/E))
        // 给定m和n,可以确定最优hash个数,即k = ln2 * (m/n),此时错误率最小
        private BloomFilter filter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);
        private Text joinKey = new Text();
        private Text combineValue = new Text();

        /**
         * 获取分布式缓存文件
         */
        @SuppressWarnings("deprecation")
        protected void setup(Context context) throws IOException,
                InterruptedException {
            BufferedReader br;
            String infoAddr = null;
            // 返回缓存文件路径
            Path[] cacheFilesPaths = context.getLocalCacheFiles();
            for (Path path : cacheFilesPaths) {
                String pathStr = path.toString();
                br = new BufferedReader(new FileReader(pathStr));
                while (null != (infoAddr = br.readLine())) {
                    // 按行读取并解析气象站数据
                    String[] records = StringUtils.split(infoAddr.toString(),
                            "\t");
                    if (null != records)// key为stationID
                        filter.add(new Key(records[0].getBytes()));
                }
            }

        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String pathName = ((FileSplit) context.getInputSplit()).getPath()
                    .toString();
            // 如果数据来自于records,加一个records的标记
            if (pathName.endsWith("records-semi.txt")) {
                String[] valueItems = StringUtils.split(value.toString(),
                        "\t");
                // 过滤掉脏数据
                if (valueItems.length != 3) {
                    return;
                }
                //通过filter 过滤大表中的数据
                if (filter.membershipTest(new Key(valueItems[0].getBytes()))) {
                    joinKey.set(valueItems[0]);
                    combineValue.set("records-semi.txt" + valueItems[1] + "\t"
                            + valueItems[2]);
                    context.write(joinKey, combineValue);
                }

            } else if (pathName.endsWith("station.txt")) {
                // 如果数据来自于station,加一个station的标记
                String[] valueItems = StringUtils.split(value.toString(),
                        "\t");
                // 过滤掉脏数据
                if (valueItems.length != 2) {
                    return;
                }
                joinKey.set(valueItems[0]);
                combineValue.set("station.txt" + valueItems[1]);
                context.write(joinKey, combineValue);
            }

        }
    }
    /*
     * reduce 端做笛卡尔积
     */
    public static class BloomFilteringReducer extends
            Reducer<Text, Text, Text, Text> {
        private List<String> leftTable = new ArrayList<String>();
        private List<String> rightTable = new ArrayList<String>();
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 一定要清空数据
            leftTable.clear();
            rightTable.clear();
            // 相同key的记录会分组到一起,我们需要把相同key下来自于不同表的数据分开,然后做笛卡尔积
            for (Text value : values) {
                String val = value.toString();
                System.out.println("value=" + val);
                if (val.startsWith("station.txt")) {
                    leftTable.add(val.replaceFirst("station.txt", ""));
                } else if (val.startsWith("records-semi.txt")) {
                    rightTable.add(val.replaceFirst("records-semi.txt", ""));
                }
            }
            // 笛卡尔积
            for (String leftPart : leftTable) {
                for (String rightPart : rightTable) {
                    result.set(leftPart + "\t" + rightPart);
                    context.write(key, result);
                }
            }
        }
    }

    public static void main(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        String[] args = { "hdfs://sparks:9000/middle/reduceJoin/station.txt",
                "hdfs://sparks:9000/middle/reduceJoin/station.txt",
                "hdfs://sparks:9000/middle/reduceJoin/records-semi.txt",
                "hdfs://sparks:9000/middle/reduceJoin/BloomFilte-out" };
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: BloomFilter <in> [<in>...] <out>");
            System.exit(2);
        }

        //输出路径
        Path mypath = new Path(otherArgs[otherArgs.length - 1]);
        FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        Job job = Job.getInstance(conf, "bloomfilter");

        //添加缓存文件
        job.addCacheFile(new Path(otherArgs[0]).toUri());
        job.setJarByClass(BloomFilteringDriver.class);
        job.setMapperClass(BloomFilteringMapper.class);
        job.setReducerClass(BloomFilteringReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //添加输入文件
        for (int i = 1; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        //设置输出路径
        FileOutputFormat.setOutputPath(job, new Path(
                otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

BloomFilteringDriver

总结

三种join方式适用于不同的场景,其处理效率上相差很大,其主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。

时间: 2024-08-04 20:35:35

MapReduce编程之Semi Join多种应用场景与使用的相关文章

mapReduce编程之google pageRank

1 pagerank算法介绍 1.1 pagerank的假设 数量假设:每个网页都会给它的链接网页投票,假设这个网页有n个链接,则该网页给每个链接平分投1/n票. 质量假设:一个网页的pagerank值越大,则它的投票越重要.表现为将它的pagerank值作为它投票的加权值. 1.2 矩阵表示形式 ......... 最终PR值会收敛为稳定值. 1.3 deadends和spider traps deadends:一个网页没有链接,则最终PR值会收敛为全为0: spider traps:一个网页

mapReduce编程之auto complete

1 n-gram模型与auto complete n-gram模型是假设文本中一个词出现的概率只与它前面的N-1个词相关.auto complete的原理就是,根据用户输入的词,将后续出现概率较大的词组显示出来.因此我们可以基于n-gram模型来对用户的输入作预测. 我们的实现方法是:首先用mapreduce在offline对语料库中的数据进行n-gram建模,存到数据库中.然后用户在输入的时候向数据库中查询,获取之后出现的概率较大的词,通过前端php脚本刷新实时显示在界面上.如下所示: 2 m

MapReduce编程之WordCount

//mapreduce程序 import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.a

Hive中join, outer join, semi join区别

Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行. 最常用的就是多表关联查询,主要讲解下join.outer join和semi join的具体使用. join是最简单的关联操作,两边关联只取交集. outer join分为left outer join.right outer join和full outer join. left outer join是以左表驱动,右表不存在的

Java高性能编程之CAS与ABA及解决方法

Java高性能编程之CAS与ABA及解决方法 前言 如果喜欢暗色调的界面或者想换换界面,可以看看我在个人博客发布的 Java高性能编程之CAS与ABA及解决方法. CAS概念 CAS,全称Compare And Swap,比较与交换. 属于硬件级别的同步原语,从处理器层面提供了内存操作的原子性. 从概念上,我们可以得出三点.第一,CAS的运作方式(通过比较与交换实现).第二,硬件层面支持,性能肯定不低(当然它也不是银弹).第三,提供原子性,那么它的功能肯定是确保原子性,从而确保线程安全. 实际使

学习编程之Python篇(一)

第一次接触编程,你将面对两大难题: 1.  对所要使用的编程语言的语法和语义不甚了了. 2.  不知道如何通过编程来解决问题. 作为一名新手,你会尝试同时来解决这两个难题:一边熟悉编程语言的语法语义,一边考虑如何靠编程解决问题.这是一个循序渐进的过程,万事开头难,务必保持耐心,切勿操之过急. 学习编程其实没有什么捷径可走,最好的方法就是反复操练,聆听规则,讨论方法,都不如真正做点什么. 在掌握了一些编程语言的语法语义之后,接下来的难题就是怎样才能写出好的程序.那么,我们首先来看看什么是好的程序.

Linux编程之ICMP洪水攻击

我的上一篇文章<Linux编程之PING的实现>里使用ICMP协议实现了PING的程序,ICMP除了实现这么一个PING程序,还有哪些不为人知或者好玩的用途?这里我将介绍ICMP另一个很有名的黑科技:ICMP洪水攻击. ICMP洪水攻击属于大名鼎鼎的DOS(Denial of Service)攻击的一种,一种是黑客们喜欢的攻击手段,这里本着加深自己对ICMP的理解的目的,也试着基于ICMP写一段ICMP的洪水攻击小程序. 洪水攻击(FLOOD ATTACK)指的是利用计算机网络技术向目的主机发

net异步编程之await

net异步编程之await 初探asp.net异步编程之await 终于毕业了,也顺利进入一家期望的旅游互联网公司.27号入职.放肆了一个多月没写代码,好方啊. 另外一下观点均主要针对于await. 写在前面(带着问题学习) 一.根据代码和执行结果,初探异步编程的执行过程. *问题1:await会让当前线程一直等待吗?await等待的时间中一直占用线程资源吗? *问题2:等待await数据返回交给等待线程再继续向下执行吗? *问题3:向await下一条语句执行的线程,是执行await的线程吗?

Hadoop高级编程之为Hadoop实现构建企业级安全解决方案

本章内容提要 ●    理解企业级应用的安全顾虑 ●    理解Hadoop尚未为企业级应用提供的安全机制 ●    考察用于构建企业级安全解决方案的方法 第10章讨论了Hadoop安全性以及Hadoop中用于提供安全控制的机制.当构建企业级安全解决方案(它可能会围绕着与Hadoop数据集交互的许多应用程序和企业级服务)时,保证Hadoop自身的安全仅仅是安全解决方案的一个方面.各种组织努力对数据采用一致的安全机制,而数据是从采用了不同安全策略的异构数据源中提取的.当这些组织从多个源获取数据,接