MapReduce Join关联

Reduce join

原理

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了

需求

订单数据表t_order
id     pid    amount
1001    01      1
1002    02      2
1003    03      3

商品信息表t_product
pid    pname
01     小米
02     华为
03     格力

最终数据形式
id    pname    amount
1001    小米    1
1004    小米    4
1002    华为    2
1005    华为    5
1003    格力    3
1006    格力    6

缺点

缺点:这种方式中,合并的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜(同一个reduce接收到的数据量很大)

 解决方案: map端实现数据合并

案例

package com.bigdata.mapreduce.table;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class TableBean implements Writable {
    private String order_id; // 订单id
    private String p_id; // 产品id
    private int amount; // 产品数量
    private String pname; // 产品名称
    private String flag;// 表的标记

    public TableBean() {
        super();
    }

    public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
        super();
        this.order_id = order_id;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readUTF();
        this.p_id = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return order_id + "\t" + pname + "\t" + amount + "\t" ;
    }
}
2)编写TableMapper程序
package com.bigdata.mapreduce.table;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1 获取输入文件类型
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        // 2 获取输入数据
        String line = value.toString();

        // 3 不同文件分别处理
        if (name.startsWith("order")) {// 订单表处理
            // 3.1 切割
            String[] fields = line.split("\t");

            // 3.2 封装bean对象
            bean.setOrder_id(fields[0]);
            bean.setP_id(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setPname("");
            bean.setFlag("0");

            k.set(fields[1]);
        }else {// 产品表处理
            // 3.3 切割
            String[] fields = line.split("\t");

            // 3.4 封装bean对象
            bean.setP_id(fields[0]);
            bean.setPname(fields[1]);
            bean.setFlag("1");
            bean.setAmount(0);
            bean.setOrder_id("");

            k.set(fields[0]);
        }
        // 4 写出
        context.write(k, bean);
    }
}
3)编写TableReducer程序
package com.bigdata.mapreduce.table;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {

        // 1准备存储订单的集合
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // 2 准备bean对象
        TableBean pdBean = new TableBean();

        for (TableBean bean : values) {

            if ("0".equals(bean.getFlag())) {// 订单表
                // 拷贝传递过来的每条订单数据到集合中
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                orderBeans.add(orderBean);
            } else {// 产品表
                try {
                    // 拷贝传递过来的产品表到内存中
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // 3 表的拼接
        for(TableBean bean:orderBeans){
            bean.setPname (pdBean.getPname());
            // 4 数据写出去
            context.write(bean, NullWritable.get());
        }
    }
}
4)编写TableDriver程序
package com.bigdata.mapreduce.table;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableDriver {

    public static void main(String[] args) throws Exception {
        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 指定本程序的jar包所在的本地路径
        job.setJarByClass(TableDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Map join

使用场景

一张表十分小、一张表很大。

解决方案

在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。

具体办法

(1)在mapper的setup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。
  job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到task运行节点

案例

package test;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

    public static void main(String[] args) throws Exception {
        // 1 获取job信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 设置加载jar包路径
        job.setJarByClass(DistributedCacheDriver.class);

        // 3 关联map
        job.setMapperClass(DistributedCacheMapper.class);
        // 4 设置最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 加载缓存数据
        job.addCacheFile(new URI("file:///e:/inputcache/pd.txt"));

        // 7 map端join的逻辑不需要reduce阶段,设置reducetask数量为0
        job.setNumReduceTasks(0);

        // 8 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
(2)读取缓存的文件数据
package test;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

    Map<String, String> pdMap = new HashMap<>();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {

        // 1 获取缓存的文件
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"),"UTF-8"));

        String line;
        while(StringUtils.isNotEmpty(line = reader.readLine())){
            // 2 切割
            String[] fields = line.split("\t");

            // 3 缓存数据到集合
            pdMap.put(fields[0], fields[1]);
        }

        // 4 关流
        reader.close();
    }

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();

        // 2 截取
        String[] fields = line.split("\t");

        // 3 获取产品id
        String pId = fields[1];

        // 4 获取商品名称
        String pdName = pdMap.get(pId);

        // 5 拼接
        k.set(line + "\t"+ pdName);

        // 6 写出
        context.write(k, NullWritable.get());
    }
}

原文地址:https://www.cnblogs.com/lovemeng1314/p/11762041.html

时间: 2024-11-06 07:49:35

MapReduce Join关联的相关文章

oracle使用LEFT JOIN关联产生的问题在查询结果中使用CASE WHEN 无法判断

oracle使用LEFT JOIN关联产生的问题在查询结果中使用CASE WHEN 无法判断 查询方式一: 1 SELECT 2 CASE WHEN (SELECT CAST(SUM(CASE 3 WHEN (ALLOCABLE_PRIME_CURRENCY_VALUE IS NULL AND STATE_IND = 1) THEN 4 NVL(PRIME_CURRENCY_VALUE, 0) 5 ELSE 6 NVL(ALLOCABLE_PRIME_CURRENCY_VALUE, 0) END

mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204).有空再看看 实际上实现过程是不是和他写的代码一样. 前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天

PLSQL_性能优化系列02_Oracle Join关联

2014-09-25 BaoXinjian 一.摘要 Oracle三种主要连接方式的比较 1. Hash Join (1).概述 i. 读取一个表的资料,并将放置到内存中,并建立唯一关键字的位图索引 ii. 读取另一个表,和内存中表通过Hash算法进行比较 (2).适用对象 i. 大表连接小表 ii. 两个大表 2. Nested Loops (1).概述 i. 循环外表记录 ii. 进行逐个比对和内标的连接是否符合条件 (2).适用对象 小表驱动大表,返回较少的结果集 3. Merge Joi

SQL join中级篇--hive中 mapreduce join方法分析

1. 概述. 本文主要介绍了mapreduce框架上如何实现两表JOIN. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签 (tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2.

thinkphp left join关联查询

使用sql查询:SELECT a.*,b.gs_schoolnamecn,b.gs_schoolnameen FROM tsh_greenaction a  LEFT JOIN tsh_greenschool b on a.greenschoolid=b.id 使用thinkphp连贯操作: $model=M('greenaction'); $list=$model->alias('a')->join(' LEFT JOIN tsh_greenschool b on a.greenschool

MapReduce Join的使用

一.Map端Join 可连接两个都非常大的数据集之间可使用map端连接,数据在到达map端之前就执行连接操作. 需满足: 两个要连接的数据集都先划分成相同数量的分区,相同的key要保证在同一分区中(每个分区中两个数据集数据量不一定要要相同), 并且要 按连接key排序:  利用CompositeInputFormat类,可实现map端连接: 代码参考:GitHub上Join示例 其它参考:hadoop实现join (CompositeInputFormat) 参考2 二.Reduce端连接 Re

Hive中小表与大表关联(join)的性能分析【转】

Hive中小表与大表关联(join)的性能分析 [转自:http://blog.sina.com.cn/s/blog_6ff05a2c01016j7n.html] 经常看到一些Hive优化的建议中说当小表与大表做关联时,把小表写在前面,这样可以使Hive的关联速度更快,提到的原因都是说因为小表可以先放到内存中,然后大表的每条记录再去内存中检测,最终完成关联查询.这样的原因看似合理,但是仔细推敲,又站不住脚跟. 多小的表算小表?如果所谓的小表在内存中放不下怎么办?我用2个只有几条记录的表做关联查询

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

MapReduce中的Join算法

在关系型数据库中Join是非常常见的操作,各种优化手段已经到了极致.在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要从不同的数据源中获取数据.不同于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法. 我们先简要地描述待解决的问题.假设有两个数据集:气象站数据库和天气记录数据库 气象站的示例数据,如下 Station ID Station Name 011990-99999 SIHCCAJAVRI 012650-99999 TRN