Hadoop中MapReduce多种join实现实例分析

一、概述

对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。

二、实现原理

1、在Reudce端进行连接。

在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:

(1)自定义一个value返回类型:

package com.mr.reduceSizeJoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class CombineValues implements WritableComparable<CombineValues>{
    //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
    private Text joinKey;//链接关键字
    private Text flag;//文件来源标志
    private Text secondPart;//除了链接键外的其他部分
    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }
    public void setFlag(Text flag) {
        this.flag = flag;
    }
    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }
    public Text getFlag() {
        return flag;
    }
    public Text getSecondPart() {
        return secondPart;
    }
    public Text getJoinKey() {
        return joinKey;
    }
    public CombineValues() {
        this.joinKey =  new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }
    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }
    @Override
    public String toString() {
        // TODO Auto-generated method stub
        return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
    }
} 

(2)map、reduce主体代码

package com.mr.reduceSizeJoin;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author zengzhaozheng
 * 用途说明:
 * reudce side join中的left outer join
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
 * tb_dim_city.dat文件内容,分隔符为"|":
 * id     name  orderid  city_code  is_show
 * 0       其他        9999     9999         0
 * 1       长春        1        901          1
 * 2       吉林        2        902          1
 * 3       四平        3        903          1
 * 4       松原        4        904          1
 * 5       通化        5        905          1
 * 6       辽源        6        906          1
 * 7       白城        7        907          1
 * 8       白山        8        908          1
 * 9       延吉        9        909          1
 * -------------------------风骚的分割线-------------------------------
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
 * tb_user_profiles.dat文件内容,分隔符为"|":
 * userID   network     flow    cityID
 * 1           2G       123      1
 * 2           3G       333      2
 * 3           3G       555      1
 * 4           2G       777      3
 * 5           3G       666      4
 *
 * -------------------------风骚的分割线-------------------------------
 *  结果:
 *  1   长春  1   901 1   1   2G  123
 *  1   长春  1   901 1   3   3G  555
 *  2   吉林  2   902 1   2   3G  333
 *  3   四平  3   903 1   4   2G  777
 *  4   松原  4   904 1   5   3G  666
 */
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text joinKey = new Text();
        private Text secondPart = new Text();
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            //数据来自tb_dim_city.dat文件,标志即为"0"
            if(pathName.endsWith("tb_dim_city.dat")){
                String[] valueItems = value.toString().split("\\|");
                //过滤格式错误的记录
                if(valueItems.length != 5){
                    return;
                }
                flag.set("0");
                joinKey.set(valueItems[0]);
                secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);

                }//数据来自于tb_user_profiles.dat,标志即为"1"
            else if(pathName.endsWith("tb_user_profiles.dat")){
                String[] valueItems = value.toString().split("\\|");
                //过滤格式错误的记录
                if(valueItems.length != 4){
                    return;
                }
                flag.set("1");
                joinKey.set(valueItems[3]);
                secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
        }
    }
    public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
        //存储一个分组中的左表信息
        private ArrayList<Text> leftTable = new ArrayList<Text>();
        //存储一个分组中的右表信息
        private ArrayList<Text> rightTable = new ArrayList<Text>();
        private Text secondPar = null;
        private Text output = new Text();
        /**
         * 一个分组调用一次reduce函数
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)
                throws IOException, InterruptedException {
            leftTable.clear();
            rightTable.clear();
            /**
             * 将分组中的元素按照文件分别进行存放
             * 这种方法要注意的问题:
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
             */
            for(CombineValues cv : value){
                secondPar = new Text(cv.getSecondPart().toString());
                //左表tb_dim_city
                if("0".equals(cv.getFlag().toString().trim())){
                    leftTable.add(secondPar);
                }
                //右表tb_user_profiles
                else if("1".equals(cv.getFlag().toString().trim())){
                    rightTable.add(secondPar);
                }
            }
            logger.info("tb_dim_city:"+leftTable.toString());
            logger.info("tb_user_profiles:"+rightTable.toString());
            for(Text leftPart : leftTable){
                for(Text rightPart : rightTable){
                    output.set(leftPart+ "\t" + rightPart);
                    context.write(key, output);
                }
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
          Configuration conf=getConf(); //获得配置文件对象
            Job job=new Job(conf,"LeftOutJoinMR");
            job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
            job.setMapperClass(LeftOutJoinMapper.class);
            job.setReducerClass(LeftOutJoinReducer.class);
            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式

            //设置map的输出key和value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(CombineValues.class);

            //设置reduce的输出key和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.waitForCompletion(true);
            return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
            System.exit(returnCode);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            logger.error(e.getMessage());
        }
    }
} 

其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

2、在Map端进行连接。

使用场景:一张表十分小、一张表很大。

用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

直接上代码,比较简单:

package com.mr.mapSideJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author zengzhaozheng
 *
 * 用途说明:
 * Map side join中的left outer join
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
 * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":
 * id     name  orderid  city_code  is_show
 * 0       其他        9999     9999         0
 * 1       长春        1        901          1
 * 2       吉林        2        902          1
 * 3       四平        3        903          1
 * 4       松原        4        904          1
 * 5       通化        5        905          1
 * 6       辽源        6        906          1
 * 7       白城        7        907          1
 * 8       白山        8        908          1
 * 9       延吉        9        909          1
 * -------------------------风骚的分割线-------------------------------
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
 * tb_user_profiles.dat文件内容,分隔符为"|":
 * userID   network     flow    cityID
 * 1           2G       123      1
 * 2           3G       333      2
 * 3           3G       555      1
 * 4           2G       777      3
 * 5           3G       666      4
 * -------------------------风骚的分割线-------------------------------
 *  结果:
 *  1   长春  1   901 1   1   2G  123
 *  1   长春  1   901 1   3   3G  555
 *  2   吉林  2   902 1   2   3G  333
 *  3   四平  3   903 1   4   2G  777
 *  4   松原  4   904 1   5   3G  666
 */
public class MapSideJoinMain extends Configured implements Tool{
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {

        private HashMap<String,String> city_info = new HashMap<String, String>();
        private Text outPutKey = new Text();
        private Text outPutValue = new Text();
        private String mapInputStr = null;
        private String mapInputSpit[] = null;
        private String city_secondPart = null;
        /**
         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache
         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            BufferedReader br = null;
            //获得当前作业的DistributedCache相关文件
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String cityInfo = null;
            for(Path p : distributePaths){
                if(p.toString().endsWith("tb_dim_city.dat")){
                    //读缓存文件,并放到mem中
                    br = new BufferedReader(new FileReader(p.toString()));
                    while(null!=(cityInfo=br.readLine())){
                        String[] cityPart = cityInfo.split("\\|",5);
                        if(cityPart.length ==5){
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
                        }
                    }
                }
            }
        }

        /**
         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的
         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //排掉空行
            if(value == null || value.toString().equals("")){
                return;
            }
            mapInputStr = value.toString();
            mapInputSpit = mapInputStr.split("\\|",4);
            //过滤非法记录
            if(mapInputSpit.length != 4){
                return;
            }
            //判断链接字段是否在map中存在
            city_secondPart = city_info.get(mapInputSpit[3]);
            if(city_secondPart != null){
                this.outPutKey.set(mapInputSpit[3]);
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
                context.write(outPutKey, outPutValue);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
            Configuration conf=getConf(); //获得配置文件对象
            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件
            Job job=new Job(conf,"MapJoinMR");
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

            job.setJarByClass(MapSideJoinMain.class);
            job.setMapperClass(LeftOutJoinMapper.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型
            job.setMapOutputKeyClass(Text.class);

            //设置reduce的输出key和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.waitForCompletion(true);
            return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);
            System.exit(returnCode);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            logger.error(e.getMessage());
        }
    }
} 

这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。

另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。

3、SemiJoin。

SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:

package com.mr.SemiJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author zengzhaozheng
 *
 * 用途说明:
 * reudce side join中的left outer join
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
 * tb_dim_city.dat文件内容,分隔符为"|":
 * id     name  orderid  city_code  is_show
 * 0       其他        9999     9999         0
 * 1       长春        1        901          1
 * 2       吉林        2        902          1
 * 3       四平        3        903          1
 * 4       松原        4        904          1
 * 5       通化        5        905          1
 * 6       辽源        6        906          1
 * 7       白城        7        907          1
 * 8       白山        8        908          1
 * 9       延吉        9        909          1
 * -------------------------风骚的分割线-------------------------------
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
 * tb_user_profiles.dat文件内容,分隔符为"|":
 * userID   network     flow    cityID
 * 1           2G       123      1
 * 2           3G       333      2
 * 3           3G       555      1
 * 4           2G       777      3
 * 5           3G       666      4
 * -------------------------风骚的分割线-------------------------------
 * joinKey.dat内容:
 * city_code
 * 1
 * 2
 * 3
 * 4
 * -------------------------风骚的分割线-------------------------------
 *  结果:
 *  1   长春  1   901 1   1   2G  123
 *  1   长春  1   901 1   3   3G  555
 *  2   吉林  2   902 1   2   3G  333
 *  3   四平  3   903 1   4   2G  777
 *  4   松原  4   904 1   5   3G  666
 */
public class SemiJoin extends Configured implements Tool{
    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private HashSet<String> joinKeySet = new HashSet<String>();
        private Text flag = new Text();
        private Text joinKey = new Text();
        private Text secondPart = new Text();
        /**
         * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            BufferedReader br = null;
            //获得当前作业的DistributedCache相关文件
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String joinKeyStr = null;
            for(Path p : distributePaths){
                if(p.toString().endsWith("joinKey.dat")){
                    //读缓存文件,并放到mem中
                    br = new BufferedReader(new FileReader(p.toString()));
                    while(null!=(joinKeyStr=br.readLine())){
                        joinKeySet.add(joinKeyStr);
                    }
                }
            }
        }
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            //数据来自tb_dim_city.dat文件,标志即为"0"
            if(pathName.endsWith("tb_dim_city.dat")){
                String[] valueItems = value.toString().split("\\|");
                //过滤格式错误的记录
                if(valueItems.length != 5){
                    return;
                }
                //过滤掉不需要参加join的记录
                if(joinKeySet.contains(valueItems[0])){
                    flag.set("0");
                    joinKey.set(valueItems[0]);
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
                    combineValues.setFlag(flag);
                    combineValues.setJoinKey(joinKey);
                    combineValues.setSecondPart(secondPart);
                    context.write(combineValues.getJoinKey(), combineValues);
                }else{
                    return ;
                }
            }//数据来自于tb_user_profiles.dat,标志即为"1"
            else if(pathName.endsWith("tb_user_profiles.dat")){
                String[] valueItems = value.toString().split("\\|");
                //过滤格式错误的记录
                if(valueItems.length != 4){
                    return;
                }
                //过滤掉不需要参加join的记录
                if(joinKeySet.contains(valueItems[3])){
                    flag.set("1");
                    joinKey.set(valueItems[3]);
                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
                    combineValues.setFlag(flag);
                    combineValues.setJoinKey(joinKey);
                    combineValues.setSecondPart(secondPart);
                    context.write(combineValues.getJoinKey(), combineValues);
                }else{
                    return ;
                }
            }
        }
    }
    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
        //存储一个分组中的左表信息
        private ArrayList<Text> leftTable = new ArrayList<Text>();
        //存储一个分组中的右表信息
        private ArrayList<Text> rightTable = new ArrayList<Text>();
        private Text secondPar = null;
        private Text output = new Text();
        /**
         * 一个分组调用一次reduce函数
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)
                throws IOException, InterruptedException {
            leftTable.clear();
            rightTable.clear();
            /**
             * 将分组中的元素按照文件分别进行存放
             * 这种方法要注意的问题:
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
             */
            for(CombineValues cv : value){
                secondPar = new Text(cv.getSecondPart().toString());
                //左表tb_dim_city
                if("0".equals(cv.getFlag().toString().trim())){
                    leftTable.add(secondPar);
                }
                //右表tb_user_profiles
                else if("1".equals(cv.getFlag().toString().trim())){
                    rightTable.add(secondPar);
                }
            }
            logger.info("tb_dim_city:"+leftTable.toString());
            logger.info("tb_user_profiles:"+rightTable.toString());
            for(Text leftPart : leftTable){
                for(Text rightPart : rightTable){
                    output.set(leftPart+ "\t" + rightPart);
                    context.write(key, output);
                }
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
            Configuration conf=getConf(); //获得配置文件对象
            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
            Job job=new Job(conf,"LeftOutJoinMR");
            job.setJarByClass(SemiJoin.class);

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径

            job.setMapperClass(SemiJoinMapper.class);
            job.setReducerClass(SemiJoinReducer.class);

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

            //设置map的输出key和value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(CombineValues.class);

            //设置reduce的输出key和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.waitForCompletion(true);
            return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode =  ToolRunner.run(new SemiJoin(),args);
            System.exit(returnCode);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
} 

这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。

三、总结

blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。

时间: 2024-11-06 22:19:56

Hadoop中MapReduce多种join实现实例分析的相关文章

MapReduce多种join实现实例分析(二)

上一篇<MapReduce多种join实现实例分析(一)>,大家可以点击回顾该篇文章.本文是MapReduce系列第二篇. 一.在Map端进行连接使用场景:一张表十分小.一张表很大.用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中).然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同j

浅谈hadoop中mapreduce的文件分发

最近在做数据分析的时候,需要在mapreduce中调用c语言写的接口,此时就需要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地,但查询资料后发现hadoop有相应的组件来帮助我们完成这个操作,这个组件就是DistributedCache,分布式缓存,运用这个东西可以做到第三方文件的分发和缓存功能,下面详解: 如果我们需要在map之间共享一些数据,如果信息量不大,我们可

Hadoop中两表JOIN的处理方法(转)

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

hadoop中mapreduce的常用类(一)

云智慧(北京)科技有限公司陈鑫 写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的.以前还一直纳闷儿为什么有时候是jobClient提交任务,有时是Job...不管API是否更新,下面这些类也还是存在于API中的,经过自己跟踪源码,发现原理还是这些.只不过进行了重新组织,进行了一些封装,使得扩展性更好.所以还是把这些东西从记事本贴进来吧. 关于这些类的介绍以及使用,有的是在自己debug中看到的,多数为纯翻译API的注释,但是翻译的过程受益良多. GenericOptio

hadoop中mapreduce的常用类(二)

云智慧(北京)科技有限公司陈鑫 NullWritable  不想输出的时候,把它当做key.NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型. FileInputFormat继承于InputFormat InputFormat的作用: 验证输入规范:切分输入文件

java中对于多态的一个实例分析

首先来看这样的一段代码,其中对于类的定义如下: class Parent{ public int myValue=100; public void printValue() { System.out.println("Parent.printValue(),myValue="+myValue); } } class Child extends Parent{ public int myValue=200; public void printValue() { System.out.pr

Hadoop中MapReduce计算框架以及HDFS可以干点啥

我准备学习用hadoop来实现下面的过程: 词频统计 存储海量的视频数据 倒排索引 数据去重 数据排序 聚类分析 ============= 先写这么多

Hadoop中两表JOIN的处理方法

http://dongxicheng.org/mapreduce/hadoop-join-two-tables/ http://dongxicheng.org/mapreduce/run-hadoop-job-problems/ http://dongxicheng.org/mapreduce/hdfs-small-files-solution/

PHP中auto_prepend_file与auto_append_file用法实例分析

如果需要将文件require到所有页面的顶部与底部. 第一种方法:在所有页面的顶部与底部都加入require语句. 例如: ? require('header.php'); //页面正文内容部分 require('footer.php'); 但这种方法如果需要修改顶部或底部require的文件路径,则需要修改所有页面文件.而且需要每个页面都加入require语句,比较麻烦. 第二种方法:使用auto_prepend_file与auto_append_file在所有页面的顶部与底部require文