MapReduce实现Reduce端Join操作实例

使用案例:

联接两张表
Table EMP:(新建文件EMP,第一行属性名不要)
Name        Sex      Age        DepNo
zhang       male     20           1
li         female    25           2
wang       female    30           3
zhou        male     35           2

Table Dep:(新建文件DEP,第一行属性名不要)
DepNo        DepName
1            Sales
2            Dev
3            Mgt

Inner join:
select Name,Sex,Age,DepName from EMP inner join DEP on EMP.DepNo=DEP.DepNo

Result:
Name        Sex      Age        DepName
zhang       male     20          Sales
li         female    25          Dev
wang       female    30          Mgt
zhou        male     35          Dev

接下来使用MapReduce实进行Join操作。

Reduce端进行Join操作

reduce端联接比map端联接更普遍,因为输入的数据不需要特定的结构;效率低,因为所有数据必须经过shuffle过程,但是编写简单。

基本思路:

1、Map端读取所有文件,并在输出的内容里加上标识代表数据是从哪个文件里来的;

2、在reduce处理函数里,按照标识对数据进行保存

3、然后根据Key的Join来求出结果直接输出;

package Join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class EmpJoinDep implements WritableComparable{

    private String Name="";
    private String Sex="";
    private int Age=0;
    private int DepNo=0;
    private String DepName="";
    private String table="";
    public EmpJoinDep() {}

    public EmpJoinDep(EmpJoinDep empJoinDep) {
        this.Name = empJoinDep.getName();
        this.Sex = empJoinDep.getSex();
        this.Age = empJoinDep.getAge();
        this.DepNo = empJoinDep.getDepNo();
        this.DepName = empJoinDep.getDepName();
        this.table = empJoinDep.getTable();
    }

    public String getName() {
        return Name;
    }

    public void setName(String name) {
        Name = name;
    }

    public String getSex() {
        return Sex;
    }

    public void setSex(String sex) {
        this.Sex = sex;
    }

    public int getAge() {
        return Age;
    }

    public void setAge(int age) {
        this.Age = age;
    }

    public int getDepNo() {
        return DepNo;
    }

    public void setDepNo(int depNo) {
        DepNo = depNo;
    }

    public String getDepName() {
        return DepName;
    }

    public void setDepName(String depName) {
        DepName = depName;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(Name);
        out.writeUTF(Sex);
        out.writeInt(Age);
        out.writeInt(DepNo);
        out.writeUTF(DepName);
        out.writeUTF(table);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.Name = in.readUTF();
        this.Sex = in.readUTF();
        this.Age = in.readInt();
        this.DepNo = in.readInt();
        this.DepName = in.readUTF();
        this.table = in.readUTF();
    }

    //不做任何排序
    @Override
    public int compareTo(Object o) {
        return 0;
    }

    @Override
    public String toString() {
        return "EmpJoinDep [Name=" + Name + ", Sex=" + Sex + ", Age=" + Age
                + ", DepName=" + DepName + "]";
    }

}
package Join;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoin {
    private final static String INPUT_PATH = "hdfs://liguodong:8020/inputjoin";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/outputmapjoin";

    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, EmpJoinDep>{
        private EmpJoinDep empJoinDep = new EmpJoinDep();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\\s+");
            if(values.length==4){
                empJoinDep.setName(values[0]);
                empJoinDep.setSex(values[1]);
                empJoinDep.setAge(Integer.parseInt(values[2]));
                empJoinDep.setDepNo(Integer.parseInt(values[3]));
                empJoinDep.setTable("EMP");
                context.write(new IntWritable(Integer.parseInt(values[3])), empJoinDep);
            }

            if(values.length==2){
                empJoinDep.setDepNo(Integer.parseInt(values[0]));
                empJoinDep.setDepName(values[1]);
                empJoinDep.setTable("DEP");
                context.write(new IntWritable(Integer.parseInt(values[0])), empJoinDep);
            }
        }
    }

    public static class MyReducer extends Reducer<IntWritable, EmpJoinDep, NullWritable, EmpJoinDep>{

        @Override
        protected void reduce(IntWritable key, Iterable<EmpJoinDep> values,
                Context context)
                throws IOException, InterruptedException {
            String depName = "";
            List<EmpJoinDep> list = new LinkedList<EmpJoinDep>();
            //1  emp
            //1  dep
            for (EmpJoinDep val : values) {
                list.add(new EmpJoinDep(val));
                //如果是部门表,如果部门编号为1,则获取该部门的名字。
                if(val.getTable().equals("DEP")){
                    depName = val.getDepName();
                }
            }
            //如果上面部门编号是1,则这里也是1。
            for (EmpJoinDep listjoin : list) {
                //如果是员工表,则需要设置员工的所属部门。
                if(listjoin.getTable().equals("EMP")){
                    listjoin.setDepName(depName);
                    context.write(NullWritable.get(), listjoin);
                }

            }

        }

    } 

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }
        Job job = Job.getInstance(conf, "Reduce Join"); 

        job.setJarByClass(ReduceJoin.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));  

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(EmpJoinDep.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(EmpJoinDep.class);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行结果:

**上传数据:**
[root@liguodong file]# vi EMP
[root@liguodong file]# vi DEP
[root@liguodong file]# hdfs dfs -mkdir /inputjoin
[root@liguodong file]# hdfs dfs -put EMP /inputjoin/
[root@liguodong file]# hdfs dfs -put DEP /inputjoin/
[root@liguodong file]# hdfs dfs -cat /inputjoin/DEP
1            Sales
2            Dev
3            Mgt
[root@liguodong file]# hdfs dfs -cat /inputjoin/EMP
zhang       male     20           1
li         female    25           2
wang       female    30           3
zhou        male     35           2

[root@liguodong file]# hdfs dfs -cat /outputmapjoin/p*
EmpJoinDep [Name=zhang, Sex=male, Age=20, DepName=Sales]
EmpJoinDep [Name=zhou, Sex=male, Age=35, DepName=Dev]
EmpJoinDep [Name=li, Sex=female, Age=25, DepName=Dev]
EmpJoinDep [Name=wang, Sex=female, Age=30, DepName=Mgt]
时间: 2025-01-06 17:36:46

MapReduce实现Reduce端Join操作实例的相关文章

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来 map端的join 适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作 在一个TaskTracker中可以运行多个map任务.每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了.使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从

MapReduce表连接操作之Reduce端join

一:背景 Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程. 二:技术实现 基本思路 (1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的. (2):在reduce处理函数中,按照标识对数据进行处理. (3):然后根据Key去join来求出结果直接输出. 数据准备 准备好下面两张表: (1):tb_a(以下简称表A) [java] view plain copy id  name 1

MapReduce的Reduce side Join

1. 简单介绍 reduce side  join是全部join中用时最长的一种join,可是这样的方法可以适用内连接.left外连接.right外连接.full外连接和反连接等全部的join方式.reduce side  join不仅能够对小数据进行join,也能够对大数据进行join,可是大数据会占用大量的集群内部网络IO,由于全部数据终于要写入到reduce端进行join. 假设要做join的数据量很大的话.就不得不用reduce join了. 2. 适用场景 -join的两部分数据量很大

MapReduce中的join算法-reduce端join

在海量数据的环境下,不可避免的会碰到join需求, 例如在数据分析时需要连接从不同的数据源中获取到数据. 假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一. 一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息. 气象站和天气记录的示例数据分别如下所示: Station ID            Station Name 011990-99999    SIHCCAJAVRI 012650-99999    TRNSET-HANSMOEN Statio

hive 常用的 join 操作 实例

test_a 表 id value 1 java 2 python 3 c++ test_b 表 id value 1 java 2 go 3 php 4 c++ 1. join 计算的是笛卡尔积,不推荐使用 select * from test_a join test_b on test_a.value = test_b.value; 查询结果: java java c++ c++ 2. left outer join & right outer join 注意:最好将小表放在 左 | 右 s

Reduce端join弊端&amp;方法key探讨

原文地址:https://www.cnblogs.com/TiePiHeTao/p/fe9b73346f72255aee178385e3aae9a4.html

MapReduce 实现数据join操作

前段时间有一个业务需求,要在外网商品(TOPB2C)信息中加入 联营自营 识别的字段.但存在的一个问题是,商品信息 和 自营联营标示数据是 两份数据:商品信息较大,是存放在hbase中.他们之前唯一的关联是url.所以考虑用url做key将两者做join,将 联营自营标识 信息加入的商品信息中,最终生成我需要的数据: 一,首先展示一下两份数据的demo example 1. 自营联营标识数据(下面开始就叫做unionseller.txt) http://cn.abc.www/product436

MapReduce实现两表join

一.方法介绍 假设要进行join的数据分别来自File1和File2. 参考:https://blog.csdn.net/yimingsilence/article/details/70242604 1.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,t