MapReduce中的Reduce join操作

-------file1[ID NAME]--------
1 zhangsan
2 lisi
3 wangwu

-------file2[ID VALUE]--------
1 45
2 56
3 89

-------结果[NAME VALUE]------------
zhagnsan 45
lisi 56
wangwu 89

一般数据库的join操作

a join b  on a.id = b.id

后面的条件在reduce中指的是相同的key,在sql中很容易区分出后面条件的字段到底来自那张表

而在MapReduce中呢,就不好区分了,所以这里在Map阶段做标记

当map读取原始文件的时,能不能区分出是file1还是file2?

FileSplit files = (FileSplit)context.getInputSplit();
 String path = files.getPath().toString();

Map阶段完成,进入reduce阶段

ID做k2 ,V2变为 1 {zhangsan,45}   但是如何区分 谁在前面,谁在后面呢?

这时候就需要用到map阶段打标记

问:map阶段如何打标记?

当我们判断出是file1时,对v2做标记,让V2的值是 #zhangsan,如果是file2时,让V2的值是*45

这样的话到了reduce就很容易区分了,谁做k3,v3就显而易见了

一:背景

Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程。

二:技术实现

基本思路
(1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的。
(2):在reduce处理函数中,按照标识对数据进行处理。
(3):然后根据Key去join来求出结果直接输出。

#需求:现有user表和city表,按cityID进行连接

user表:

1    zhangSan    1
2    liSi    2
3    wangWu    1
4    zhaoLiu    3
5    maQi    3
注:第三列为cityID。
city表:

1    beiJin
2    shangHai
3    guangZhou

注:关于表连接操作,我们可以实现直接打标记的做法,看这里,也可以使用实体bean的方式。这篇文章是采用实体bean的方式实现表连接操作。

实现代码:

UserCity.java:

public class UserCity implements WritableComparable<UserCity>{

    //用户ID
    private String userNo = "";
    //用户名
    private String userName = "";
    //城市ID
    private String cityNo = "";
    //城市名称
    private String cityName = "";
    //用户和城市的标志
    private int flag = 0;

    public UserCity() {
    }

    public UserCity(String userNo, String userName, String cityNo, String cityName, int flag) {
        this.userNo = userNo;
        this.userName = userName;
        this.cityNo = cityNo;
        this.cityName = cityName;
        this.flag = flag;
    }

    public UserCity(UserCity user) {
        this.userNo = user.getUserNo();
        this.userName = user.getUserName();
        this.cityNo = user.getCityNo();
        this.cityName = user.getCityName();
        this.flag = user.getFlag();
    }

    public String getUserNo() {
        return userNo;
    }

    public void setUserNo(String userNo) {
        this.userNo = userNo;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getCityNo() {
        return cityNo;
    }

    public void setCityNo(String cityNo) {
        this.cityNo = cityNo;
    }

    public String getCityName() {
        return cityName;
    }

    public void setCityName(String cityName) {
        this.cityName = cityName;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.userNo = input.readUTF();
        this.userName = input.readUTF();
        this.cityNo = input.readUTF();
        this.cityName = input.readUTF();
        this.flag = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(this.userNo);
        output.writeUTF(this.userName);
        output.writeUTF(this.cityNo);
        output.writeUTF(this.cityName);
        output.writeInt(this.flag);

    }

    @Override
    public int compareTo(UserCity o) {

        return 0;
    }

    @Override
    public String toString() {
        return "userNo=" + userNo + ", userName=" + userName + ", cityName=" + cityName ;
    }

}
注:把要关联的字段定义到一个实体bean中,并且添加一个Boolean变量,用于标记。

UserCityJoinMapReduce.java:

public class UserCityJoinMapReduce {

    //定义输入输出路径
        private static final String INPATH = "hdfs://liaozhongmin21:8020/reduceJoinFiles";
        private static final String OUTPATH = "hdfs://liaozhongmin21:8020/out";

        public static void main(String[] args) {
            try {
                //创建配置
                Configuration conf = new Configuration();

                //创建FileSystem
                FileSystem fileSystem = FileSystem.get(new URI(OUTPATH), conf);
                //判断输出文件是否存在,如果存在就进行删除
                if (fileSystem.exists(new Path(OUTPATH))){
                    fileSystem.delete(new Path(OUTPATH), true);
                }

                //创建Job
                Job job = new Job(conf, UserCityJoinMapReduce.class.getName());

                //设置输入文件的输入格式
                job.setInputFormatClass(TextInputFormat.class);
                //设置输入目录
                FileInputFormat.setInputPaths(job, new Path(INPATH));

                //设置自定义Mapper
                job.setMapperClass(UserCityJoinMapper.class);

                //设置Mapper输出的Key和Value
                job.setMapOutputKeyClass(IntWritable.class);
                job.setMapOutputValueClass(UserCity.class);

                //设置分区
                job.setPartitionerClass(HashPartitioner.class);
                //设置Reducer的个数
                job.setNumReduceTasks(1);

                //设置自定义的Reducer
                job.setReducerClass(UserCityJoinReducer.class);

                //设置输出的格式化类
                job.setOutputFormatClass(TextOutputFormat.class);
                //设置输出目录
                FileOutputFormat.setOutputPath(job, new Path(OUTPATH));

                //设置输出的key和value
                job.setOutputKeyClass(NullWritable.class);
                job.setOutputValueClass(Text.class);

                //提交任务
                System.exit(job.waitForCompletion(true) ? 1 : 0);

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    public static class UserCityJoinMapper extends Mapper<LongWritable, Text, IntWritable, UserCity>{
        //定义输出的key和value
        private IntWritable outKey = new IntWritable();
        private UserCity user = null;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, UserCity>.Context context) throws IOException,
                InterruptedException {
            //获取行文本内容
            String line = value.toString();
            //对行文本内容进行切分
            String[] splits = line.split("\t");

            //对字符串数组进行判断
            if (splits.length == 2){//如果长度为2就表示城市信息
                //创建对象
                user = new UserCity();
                //设置属性
                user.setCityNo(splits[0]);
                user.setCityName(splits[1]);
                user.setFlag(0);
                //设置输出的key
                outKey.set(Integer.parseInt(splits[0]));
                //把结果写出去
                context.write(outKey, user);
            } else if (splits.length == 3){//如果长度为3就表示是User对象
                //创建对象
                user = new UserCity();
                //设置属性
                user.setUserNo(splits[0]);
                user.setUserName(splits[1]);
                //不要忘记设置关联属性
                user.setCityNo(splits[2]);
                //设置标志:1表示用户
                user.setFlag(1);

                //设置输出去key(城市ID)
                outKey.set(Integer.parseInt(splits[2]));
                //把结果写出去
                context.write(outKey, user);
            }
        }
    }

    /**
     * 城市和用户是一对多(一个城市对应多个用户),也就是说相同key传过来之后的结果就是一个城市和多个用户(这个至关重要!)
     * 问题:多对多怎么搞?
     * @author 廖钟民
     *2015年4月6日下午9:40:17
     */
    public static class UserCityJoinReducer extends Reducer<IntWritable, UserCity, NullWritable, Text>{

        //定义输出的value
        private Text outValue = new Text();
        //城市对象(用于存储唯一的城市)
        private UserCity city = null;
        //定义集合用于存储对象
        private List<UserCity> userCities = new ArrayList<UserCity>();

        @Override
        protected void reduce(IntWritable key, Iterable<UserCity> values, Reducer<IntWritable, UserCity, NullWritable, Text>.Context context) throws IOException,
                InterruptedException {

            //使用list集合之前,要清空上一次的数据。
            userCities.clear();

            //遍历values,把结果装到List集合中
            for (UserCity u : values){//这个values集合中只有一个城市对象,多个用户对象

                if (u.getFlag() == 0){//如果标志为0表示城市对象,这个是唯一的City对象
                    city = new UserCity(u);
                } else {//除了唯一的City对象外,其他的都是用户对象,把这些用户对象都添加到集合里
                    userCities.add(new UserCity(u));
                }
            }

            //遍历集合(把用户对象的城市信息都给填补上)
            for (UserCity user : userCities){

                //给用户对象设置城市信息
                user.setCityName(city.getCityName());
                //设置写出去value
                outValue.set(user.toString());
                //把结果写出去
                context.write(NullWritable.get(), outValue);
            }
        }
    }
}

程序运行的结果如下:
时间: 2024-08-06 16:06:22

MapReduce中的Reduce join操作的相关文章

MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来 map端的join 适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作 在一个TaskTracker中可以运行多个map任务.每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了.使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从

MapReduce中combine、partition、shuffle的作用是什么

http://www.aboutyun.com/thread-8927-1-1.html Mapreduce在hadoop中是一个比較难以的概念.以下须要用心看,然后自己就能总结出来了. 概括: combine和partition都是函数.中间的步骤应该仅仅有shuffle! 1.combine combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,能够自己定义的. combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204).有空再看看 实际上实现过程是不是和他写的代码一样. 前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天

MapReduce 实现数据join操作

前段时间有一个业务需求,要在外网商品(TOPB2C)信息中加入 联营自营 识别的字段.但存在的一个问题是,商品信息 和 自营联营标示数据是 两份数据:商品信息较大,是存放在hbase中.他们之前唯一的关联是url.所以考虑用url做key将两者做join,将 联营自营标识 信息加入的商品信息中,最终生成我需要的数据: 一,首先展示一下两份数据的demo example 1. 自营联营标识数据(下面开始就叫做unionseller.txt) http://cn.abc.www/product436

(转)MapReduce 中的两表 join 几种方案简介

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

MapReduce 中的两表 join 实例

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标

(转)MapReduce中的两表join几种方案简介

转自:http://blog.csdn.net/leoleocmm/article/details/8602081 1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和Fi

Hadoop初学指南(8)--MapReduce中的Combiner操作

本文主要介绍了MapReduce中的Combiner操作. 在MapReduce的执行步骤中,我们一共分了8步,其中Map中的最后一步规约操作就是今天要讲的Combiner. 首先看一下前文中的计数器: 我们可以发现,其中有两个计数器:Combine output records和Combine input records,他们的计数都是0,这是因为我们在代码中没有进行规约操作. 现在我们加入规约操作. 在前文代码(参看http://xlows.blog.51cto.com/5380484/14