Hadoop 单表多表关联

单表:

package org.bigdata.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.bigdata.util.WordCountMapReduce.TextDescComparator;
import org.bigdata.util.WordCountMapReduce.WordCountCombiner;
import org.bigdata.util.WordCountMapReduce.WordCountMapper;
import org.bigdata.util.WordCountMapReduce.WordCountReducer;

/**
 * 单表关联
 *
 * @author wwhhf
 *
 */
public class SingleJoinMapReduce {

    /**
     * a->b b->c a->c
     *
     * @author wwhhf
     *
     */
    public static class SingleJoinMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String terms[] = value.toString().split(" ");
            // 正常顺序
            context.write(new Text(terms[0]), new Text(terms[1] + ":1"));
            // 颠倒顺序
            context.write(new Text(terms[1]), new Text(terms[0] + ":2"));
        }

    }

    public static class SingleJoinReducer extends
            Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> lefts = new ArrayList<>();
            List<String> rights = new ArrayList<>();
            for (Text value : values) {
                String terms[] = value.toString().split(":");
                if ("1".equals(terms[1])) {
                    lefts.add(terms[0]);
                } else {
                    rights.add(terms[0]);
                }
            }
            for (String left : lefts) {
                for (String right : rights) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }

    }

    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName("SingleJoin");
            job.setJarByClass(SingleJoinMapReduce.class);

            // mapper
            job.setMapperClass(SingleJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(SingleJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setSortComparatorClass(TextDescComparator.class);

            FileInputFormat.addInputPath(job, new Path("/single"));
            FileOutputFormat.setOutputPath(job, new Path("/single_out/"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

}

多表:

package org.bigdata.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 多表关联
 *
 * @author wwhhf
 *
 */
public class MultiJoinMapReduce {

    public static class MultiJoinMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();

            String record = value.toString();
            if (fileName.startsWith("b")) {
                // table 2
                // 1 Beijing
                Pattern pattern = Pattern
                        .compile("((\\d+)\\s([\\w\\W\\s\\S]+))");
                Matcher matcher = pattern.matcher(record);
                String ckey = null;
                String cvalue = null;
                while (matcher.find()) {
                    ckey = matcher.group(2);
                    cvalue = matcher.group(3);
                }
                context.write(new Text(ckey), new Text(cvalue + ":2"));
            } else {
                // table 1
                // 1 Beijing
                Pattern pattern = Pattern
                        .compile("(([\\w\\W\\s\\S]+)\\s(\\d+))");
                Matcher matcher = pattern.matcher(record);
                String ckey = null;
                String cvalue = null;
                while (matcher.find()) {
                    cvalue = matcher.group(2);
                    ckey = matcher.group(3);
                }
                context.write(new Text(ckey), new Text(cvalue + ":1"));
            }
        }
    }

    public static class MultiJoinReducer extends
            Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> lefts = new ArrayList<>();
            List<String> rights = new ArrayList<>();
            for (Text value : values) {
                String terms[] = value.toString().split(":");
                if ("1".equals(terms[1])) {
                    lefts.add(terms[0]);
                } else {
                    rights.add(terms[0]);
                }
            }
            for (String left : lefts) {
                for (String right : rights) {
                    context.write(new Text(left), new Text(right));
                }
            }
        }

    }

    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName("MultiJoin");
            job.setJarByClass(MultiJoinMapReduce.class);

            // mapper
            job.setMapperClass(MultiJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // reducer
            job.setReducerClass(MultiJoinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path("/multi"));
            FileOutputFormat.setOutputPath(job, new Path("/multi_out/"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (IllegalStateException | IllegalArgumentException
                | ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

}
时间: 2024-12-25 07:21:44

Hadoop 单表多表关联的相关文章

采购单与调拨单或销售订单关联,增加表FRET记录

采购单与调拨单或销售订单关联,增加表FRET记录,在对采购单做收货时, 系统自动对调拔单或销售订单,创建拣配单(交货单),具体做法: 1.往FRET增加记录: 2.修改采购单抬头和行项目相应关联标识. 代码: (1)在SE11创建结构: ZST_SAVE_FRET BLNRB CHAR 10 0 凭证号,采购 BPOSB NUMC 6 0 凭证项目,采购 BLNRA CHAR 10 0 发货凭证号 BPOSA NUMC 6 0 凭证项目,发货 (2)在SE37创建函数如下: FUNCTION Z

【JEECG技术博文】JEECG表单配置-树形表单

表单配置支持树型表单了,具体效果如下图: 配置说明 1.是否树:选择是. 2.树形表单父Id:表的自关联外键. 3.树形表单列表:显示树形图标的列,如上图中为[组织机构名称]. 4.默认值:最外层数据的父Id值,具体看表的设计.上图中在数据库表中的默认值为null.

在InfoPath表单内提交表单并启动工作流

在InfoPath表单内提交表单并启动工作流 MOSS中对工作流的强大支持,让我们可以做很多应用. 举个例子,我们可以用表单库做审批表单,然后给这个表单库附加一个流程. 我们可能希望用户在填写表单的时候,在表单中填一些在启动工作流时需要用的信息,比如设置审批人等等. 我们的表单会像下面这样,包括待审批内容,和设置审批者等其他工作流设置. 我们也希望用户在点“保存”的时候,提交表单并同时根据用户设置的审批人来启动工作流. ( MOSS默认的模式是,提交完表单以后,在另一个页面启动工作流,需要做两步

数据库---3 单表 多表的查询

单表查询 前期表准备 create table emp( id int not null unique auto_increment, name varchar(20) not null, sex enum('male','female') not null default 'male', #大部分是男的 age int(3) unsigned not null default 28, hire_date date not null, post varchar(50), post_comment

JavaScript遍历HTML表单元素及表单定义

如下JavaScript代码,通过document对象,遍历HTML所有元素(HTML DOM Element ). <html> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <head> <script> //显示所有存在"ID"属性的HTML元素 function displayallelem(){ va

两表(多表)关联update的写法 .

原文:两表(多表)关联update的写法 . 关于两表关联的update,可以把SQL写成了在SQL Server下面的特有形式,但是这种语法在Oracle下面是行不通的 update customers a    set    city_name=(select b.city_name from tmp_cust_city b where b.customer_id=a.customer_id)   where  exists (select 1                   from 

asp.net 微信企业号办公系统-表单设计-新建表单(属性设置)

点击表单设计工具栏上的 新建表单 按钮会弹出新表单属性设置框: 表单名称:新表单表名称. 数据连接:表单对应的数据库连接(此连接在 系统管理-->数据库连接 中维护). 数据表:表单对应的数据库表. 主键:数据库表的主键(主键只能是自增的int型,或uniqueidentifier(guid)类型). 标题字段:业务表中的哪个字段数据来作为待办任务的标题. 程序库分类:表单的分类,此分类在 数据字典 中维护. 任务标题:是否自动生成标题,如果是自动生成则会以 流程名称(发起者姓名) 的形式自动生

CodeFirst 表之间的关联

多重性关系可以是Optional(一个属性可拥有一个单个实例或没有) Required(一个属性必须拥有一个单个实例) Many很多的(一个属性可以拥有一个集合或一个单个实例). Has方法包括如下几个: • HasOptional • HasRequired • HasMany 在多数情况还需要在Has方法后面跟随如下With方法之一: • WithOptional • WithRequired • WithMany 一对多 modelBuilder.Entity<Destination>(

【源】从零自学Hadoop(15):Hive表操作

阅读目录 序 创建表 查看表 修改表 删除表 系列索引 本文版权归mephisto和博客园共有,欢迎转载,但须保留此段声明,并给出原文链接,谢谢合作. 文章是哥(mephisto)写的,SourceLink 序 上一篇,我们介绍了Hive和对其进行了安装,下面我们就初步的使用hive进行讲解.   下面我们开始介绍hive的创建表,修改表,删除表等. 创建表 一:Hive Client 在Terminal输入hive命令需要安装Hive Client. 二:进入 切换用户,进入hive su h

SQL表关联赋值、系统表、表数据删除

1. 表与表的关联赋值(用于表与表之间有关联字段,数据互传) 双表关联赋值 UPDATE #B SET #B.D=#A.B from #B inner join #A on #B.C=#A.A 多表关联赋值 update a set a.e=c.n from a left join b on a.e=b.j left join c on b.k=c.m 2. 两种删除方式(TRUNCATE TABLE <--> DELETE FROM) TRUNCATE TABLE #USER 删除极快,无日