MapReduce Join的使用

一、Map端Join

  可连接两个都非常大的数据集之间可使用map端连接,数据在到达map端之前就执行连接操作。

  需满足:

    两个要连接的数据集都先划分成相同数量的分区,相同的key要保证在同一分区中(每个分区中两个数据集数据量不一定要要相同), 并且要 按连接key排序;

   利用CompositeInputFormat类,可实现map端连接:

  代码参考:GitHub上Join示例

  其它参考:hadoop实现join (CompositeInputFormat)

       参考2 

  

二、Reduce端连接

  Reduce端连接更简单易用,以天气连接为例:

    

使用步骤:

  1、使用MutipleInputs类设定不同输入数据集的InputFormat,以及Mapper;

  2、辅助排序:通过自定义一个WritableComparable类型的 T,添加一个辅助排序字段,重写compareTo()方法,

          作为传入Reducer的key,可完成可控的二次排序;

  3、自定义Partitioner类,保证以自定义WritableComparable类型的T以首字段进行分区;自定一个分组Comparator类;

job.setPartitionerClass(KeyPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);

    自定义Partitioner类、Comparator: 

public static class KeyPartitioner extends Partitioner<TextPair, Text> {
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

public static class FirstComparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public FirstComparator() {
        super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
            return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        if (a instanceof TextPair && b instanceof TextPair) {
            return ((TextPair) a).first.compareTo(((TextPair) b).first);
        }
        return super.compare(a, b);
    }
}

  3、在Reducer中把选到达的key提取出来,即可自定义完成Join操作;

三、

时间: 2024-11-06 00:54:14

MapReduce Join的使用的相关文章

mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204).有空再看看 实际上实现过程是不是和他写的代码一样. 前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天

SQL join中级篇--hive中 mapreduce join方法分析

1. 概述. 本文主要介绍了mapreduce框架上如何实现两表JOIN. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签 (tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2.

MapReduce Join关联

Reduce join 原理 Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录.然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出. Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了 需求 订单数据表t_order id pid amount 1001 01 1 1002 02 2 1003 03 3

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

MapReduce中的Join算法

在关系型数据库中Join是非常常见的操作,各种优化手段已经到了极致.在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要从不同的数据源中获取数据.不同于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法. 我们先简要地描述待解决的问题.假设有两个数据集:气象站数据库和天气记录数据库 气象站的示例数据,如下 Station ID Station Name 011990-99999 SIHCCAJAVRI 012650-99999 TRN

MapReduce中的join算法-reduce端join

在海量数据的环境下,不可避免的会碰到join需求, 例如在数据分析时需要连接从不同的数据源中获取到数据. 假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一. 一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息. 气象站和天气记录的示例数据分别如下所示: Station ID            Station Name 011990-99999    SIHCCAJAVRI 012650-99999    TRNSET-HANSMOEN Statio

Sample Join Analysis

Sample data: student.txt 1,yaoshuya,25 2,yaoxiaohua,29 3,yaoyuanyie,15 4,yaoshupei,26 Sample data:score.txt 1,yuwen,100 1,shuxue,99 2,yuwen,99 2,shuxue,88 3,yuwen,99 3,shuxue,56 4,yuwen,33 4,shuxue,99 输出文件内容: 1    [yaoshuya,25,yuwen,100] 1    [yaoshu

MapReduce高级特性

计数器 因为计数器的查看往往比查看集群日志来的方便快捷 所以有些情况下计数器信息比集群日志更加有效 用户自定义的计数器 关于Hadoop的内置计数器的介绍可以参考Hadoop权威指南第九章MapReduce Features中的Build-in Counts小节 这里限于篇幅不再说明 MapReduce允许用户在程序中使用枚举或者字符串的格式类自定义计数器 一个作业可以定义的计数器不限,使用枚举类型时 枚举类型的名称即为组名,枚举类型的字段即为计数器名 计数器是全局的,会跨越所有Mapper和R

mapreduce 的二次排序

一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 二: 编写实现二次排序功能, 提供源码文件. 三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路. 一: 二次排序 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 1.1 二次排序的功能 1. 当客户端提交一个作业的时候,hadoop 会开启yarn 接受进行数据拷贝处理,之后交友有yarn 框架上的启动服务resourcemanage