MapReduce实现多表链接

多表链接

输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名地址名对应关系,输出"工厂名——地址名"表。

factory:

factoryname                    addressed
Beijing Red Star                    1
Shenzhen Thunder                3
Guangzhou Honda                2
Beijing Rising                       1
Guangzhou Development Bank      2
Tencent                        3
Back of Beijing                     1

address:

addressID    addressname
1            Beijing
2            Guangzhou
3            Shenzhen
4            Xian

设计思路

取出两个表中共同列作为map中的key,同时需要标识每个列所在的表,供在reduce中拆分。

代码实现

Mapper类

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<LongWritable,Text,Text,Text> {

    private static Text k = new Text();
    private static Text v= new Text();
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String path = ((FileSplit)context.getInputSplit()).getPath().getName();//获取文件名
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(value.toString());
        String[] tmp = line.split("    +");
        if(tmp.length ==2){
            String first = tmp[0];
            String second = tmp[1];
            if(path.equals("factory")){
                if(first.equals("factoryname")) return;
                k.set(second);
                v.set(first+"1");
            }else if(path.equals("address")){
                if(second.equals("addressname")) return;
                k.set(first);
                v.set(second+"2");
            }
            context.write(k,v);
        }
    }
}

Reducer类

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, Text, Text, Text>{

    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void setup(Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        context.write(new Text("factoryname"), new Text("addressname"));
    }

    @Override
    protected void reduce(Text key, Iterable<Text> value,Context context)
            throws IOException, InterruptedException {
            List<String> factory = new ArrayList<String>();
            List<String> address = new ArrayList<String>();
            for(Text val : value){
                String str = val.toString();
                String stf = str.substring(str.length()-1);
                String con = str.substring(0,str.length()-1);
                int flag = Integer.parseInt(stf);
                if(flag == 1){
                    factory.add(con);
                }else if(flag ==2){
                    address.add(con);
                }
            }
            for(int i=0;i<factory.size();i++){
                k.set(factory.get(i));
                for(int j=0;j<address.size();j++){
                    v.set(address.get(j));
                    context.write(k, v);
                }
            }
    }
}

Job驱动类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MTJoin {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf,"multi table join");
        job.setJarByClass(MTJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/mtinput"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/usr/qqx/mtoutput"));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
时间: 2024-10-06 20:47:11

MapReduce实现多表链接的相关文章

MapReduce实现单表链接

单表关联 实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表. file: child parent Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma 设计思路 MapReduc

MapReduce实现两表的Join--原理及python和java代码实现

用Hive一句话搞定的,但是有时必须要用mapreduce 方法介绍 1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side jo

Hadoop的Reduce Join+BloomFilter实现表链接

适用于场景 连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有MAP端过滤掉不需要JOIN的数据,这样传到REDUCE的数据量减少,减少了网络传及磁盘IO. 缺点 Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间.并且,最终的JOIN在REDUCE端还要进行比对,所以对最终结果无影响. 下面我们先来简单了解下什么是布隆过滤器? Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的.它实际上是一

SQL之表链接

表链接:join on 默认前面有修饰符inner join 内连接 a表和b表所有的根据关系可能对 应的的链接方式显示出来 没有关系不显示 当两个表中间存在某个关系的时候需要把它整 合成一个表 显示出来在select和from中间把想要显示的直接写上然 后from某一个表 拼接join另外一个表 on后边 写这两个表之间的关系先去car表里面找第一条数据 然后拿着第一条 数据信息根据on的关系去brand表里面找那条数 据对接起来 例子:把 code name brand_name显示出来se

sql表链接

表链接:join on 默认前面有修饰符inner join 内连接 a表和b表所有的根据关系可能对 应的的链接方式显示出来 没有关系不显示 当两个表中间存在某个关系的时候需要把它整 合成一个表 显示出来在select和from中间把想要显示的直接写上然 后from某一个表 拼接join另外一个表 on后边 写这两个表之间的关系先去car表里面找第一条数据 然后拿着第一条 数据信息根据on的关系去brand表里面找那条数 据对接起来 例子:把 code name brand_name显示出来se

大数据技术之MapReduce中多表合并案例

大数据技术之MapReduce中多表合并案例 1)需求: 订单数据表t_order: id pid amount 1001 01 1 1002 02 2 1003 03 3 订单数据order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6 商品信息表t_product pid pname 01 小米 02 华为 03 格力 商品数据pd.txt 01 小米 02 华为 03 格力 将商品信息表中数据根据商品pid合

实训任务05 MapReduce获取成绩表的最高分记录

实训任务05  MapReduce获取成绩表的最高分记录 实训1:统计用户纺问次数 任务描述: 统计用户在2016年度每个自然日的总访问次数.原始数据文件中提供了用户名称与访问日期.这个任务就是要获取以每个自然日为单位的所有用户访问次数的累加值.如果通过MapReduce编程实现这个任务,首先要考虑的是,Mapper与Reducer各自的处理逻辑是怎样的:然后根据处理逻辑编写出核心代码:最后在Eclipse中编写完整代码,编译打包后提交给集群运行. 分析思路和逻辑 (1)       输入/输出

MapReduce,DataJoin,链接多数据源

主要介绍用DataJoin类来链接多数据源,先看一下例子,假设二个数据源customs和orders customer ID       Name      PhomeNumber 1                        赵一        025-5455-566 2                        钱二        025-4587-565 3                        孙三        021-5845-5875 客户的订单号: Custom

MapReduce实现两表join

一.方法介绍 假设要进行join的数据分别来自File1和File2. 参考:https://blog.csdn.net/yimingsilence/article/details/70242604 1.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,t