map join 与 reduce join

要解决什么问题?

解决的都是同一个问题,即将两张“表‘进行join操作。

reduce join是在map阶段完成数据的标记,在reduce阶段完成数据的合并

map join是直接在map阶段完成数据的合并,没有reduce阶段

比如有如下问题:

这是订单表。

这是商品表。

现在需要将商品表中的商品名称填充到订单表中。得到如下的联合表:

也就是对商品表和订单表根据pid进行join操作,同时剔除联合表中的pid属性。

Reduce Join

map:

将输入数据统一封装为一个Bean,此Bean包含了商品表和订单表的所有公共和非公共属性,相当于进行了全外连接,并新增加一个属性——文件名称,以区分数据是来自与商品表还是订单表;map输出的key是pid,value是bean

shuffle:

根据pid对bean进行排序,所有pid相同的数据都会被聚合到同一个key下,发往同一个reducetask

reduce:

对同一个pid下所有的bean,首先要区分出它们是来自于哪个表,是商品表还是订单表。如果是商品表,数据只有一条,保存其中的pname属性;如果是订单表,数据有多条,用保存的pname属性替换pid属性,并输出

map join

没有reduce过程,所有的工作都在map阶段完成,极大减少了网络传输和io的代价。如何实现:

上述的join过程可以看作外表与内表的连接过程,外表是订单表,外表大,内表是商品表,内表小。所以可以把内表事先缓存于各个maptask结点,然后等到外表的数据传输过来以后,直接用外表的数据连接内表的数据并输出即可

1)在driver中设置加载缓存文件,这样每个maptask就可以获取到该文件;设置reducetask个数为0,去除reduce阶段

2)在setup方法中读取缓存文件,并将结果以kv的形式存入hashmap,k是pid,v是pname

3)在map方法中,根据pid,通过hashmap找到pname,替换pid,写出结果;

完整的示例代码如下:

mapper

package mapjoin;

import java.io.BufferedReader;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private  Map<String,String> produs = new HashMap<>();
    Text text = new Text();

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {

        //获取缓存文件的输入流
        URI[] uris=context.getCacheFiles();
        String path=uris[0].getPath().toString();
        BufferedReader br=new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8"));

        //将小表中的数据,封装进HashMap中
        String line = null;
        while(StringUtils.isNotEmpty((line=br.readLine()))) {
            String[] fields = line.split("\t");
            produs.put(fields[0], fields[1]);
        }
    }

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {

        //这里的记录,只来源于order表,也就是大表,直接切割并封装
        String[] fields = value.toString().split("\t");
        String pname = produs.get(fields[1]);
        pname = (pname==null)?"":pname;

        String s = fields[0]+"\t"+pname+"\t"+fields[2];
        text.set(s);
        context.write(text, NullWritable.get());
    }
}

driver

package mapjoin;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoinDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // TODO Auto-generated method stub

        // 0 根据自己电脑路径重新配置
        args = new String[]{"e:/input/table/order.txt", "e:/output/table2"};

        // 1 获取job信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);

        // 3 关联map
        job.setMapperClass(MapJoinMapper.class);

        // 4 设置最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 加载缓存数据
        job.addCacheFile(new URI("file:///e:/input/inputcache/pro.txt"));

        // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 8 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

原文地址:https://www.cnblogs.com/chxyshaodiao/p/12636768.html

时间: 2024-08-08 12:13:19

map join 与 reduce join的相关文章

MapReduce中的Reduce join操作

-------file1[ID NAME]-------- 1 zhangsan2 lisi3 wangwu -------file2[ID VALUE]--------1 452 563 89 -------结果[NAME VALUE]------------zhagnsan 45lisi 56wangwu 89 一般数据库的join操作 a join b  on a.id = b.id 后面的条件在reduce中指的是相同的key,在sql中很容易区分出后面条件的字段到底来自那张表 而在Ma

Hadoop的Reduce Join+BloomFilter实现表链接

适用于场景 连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有MAP端过滤掉不需要JOIN的数据,这样传到REDUCE的数据量减少,减少了网络传及磁盘IO. 缺点 Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间.并且,最终的JOIN在REDUCE端还要进行比对,所以对最终结果无影响. 下面我们先来简单了解下什么是布隆过滤器? Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的.它实际上是一

[译]PYTHON FUNCTIONS - MAP, FILTER, AND REDUCE

map, filter, and reduce Python提供了几个函数,使得能够进行函数式编程.这些函数都拥有方便的特性,他们可以能够很方便的用python编写. 函数式编程都是关于表达式的.我们可以说,函数式编程是一种面向表达式的编程. Python提供的面向表达式的函数有: map(aFunction, aSequence) filter(aFunction, aSequence) reduce(aFunction, aSequence) lambda list comprehensio

Hive 中Join的专题---Join详解

1.什么是等值连接? 2.hive转换多表join时,如果每个表在join字句中,使用的都是同一个列,该如何处理? 3.LEFT,RIGHT,FULL OUTER连接的作用是什么? 4.LEFT或RIGHT join是连接从左边还有右边? Hive表连接的语法支持如下: Sql代码  : join_table: table_reference JOIN table_factor [join_condition] | table_reference {LEFT|RIGHT|FULL} [OUTER

sql之left join、right join、inner join的区别

left join(左联接) 返回包括左表中的所有记录和右表中联结字段相等的记录 right join(右联接) 返回包括右表中的所有记录和左表中联结字段相等的记录inner join(等值连接) 只返回两个表中联结字段相等的行 举例如下: --------------------------------------------表A记录如下:aID aNum1 a200501112 a200501123 a200501134 a200501145 a20050115 表B记录如下:bID bNa

sql语法:inner join on, left join on, right join on详细使用方法

inner join(等值连接) 只返回两个表中联结字段相等的行 left join(左联接) 返回包括左表中的所有记录和右表中联结字段相等的记录 right join(右联接) 返回包括右表中的所有记录和左表中联结字段相等的记录 INNER JOIN 语法: INNER JOIN 连接两个数据表的用法: SELECT * FROM 表1 INNER JOIN 表2 ON 表1.字段号=表2.字段号 INNER JOIN 连接三个数据表的用法: SELECT * FROM (表1 INNER J

【转】Mysql之inner join,left join,right join详解

首先借用官方的解释下: inner join(等值连接):只返回两个表中联结字段相等的行: left join(左联接):返回包括左表中的所有记录和右表中联结字段相等的记录: right join(右联接):返回包括右表中的所有记录和左表中联结字段相等的记录. 比如我们有xs.cj两个表 xs表 cj表 --------------- ---------------------- id name id score 1 张三 1 96 2 李四 2 80 3 86 Sql代码 1 SELECT *

图解MYSQL JOIN ON,SQL JOIN 详解,数据库sql join语句

对于SQL的Join,在学习起来可能是比较乱的.我们知道,SQL的Join语法有很多inner的,有outer的,有left的,有时候,对于Select出来的结果集是什么样子有点不是很清楚.Coding Horror上有一篇文章(实在不清楚为什么Coding Horror也被墙)通过 文氏图 Venn diagrams 解释了SQL的Join.我觉得清楚易懂,转过来. 假设我们有两张表. Table A 是左边的表. Table B 是右边的表. 其各有四条记录,其中有两条记录是相同的,如下所示

NESTED LOOPS &amp; HASH JOIN &amp; SORT MERGE JOIN

表连接方式及使用场合 NESTED LOOP 对于被连接的数据子集较小的情况,nested loop连接是个较好的选择.nested loop就是扫描一个表,每读到一条记录,就根据索引去另一个表里面查找,没有索引一般就不会是 nested loops.一般在nested loop中, 驱动表满足条件结果集不大,被驱动表的连接字段要有索引,这样就走nstedloop.如果驱动表返回记录太多,就不适合nested loops了.如果连接字段没有索引,则适合走hash join,因为不需要索引. 可用