MapReduce Programming Series — 6: Multi-Table Join

1. Project name:

2. Program code:

Version 1 (detailed):

package com.mtjoin;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MTjoin {
    public static int time = 0;
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("mapper........................");
            String line = value.toString();
            // skip the header line of either table
            if (line.contains("factoryname") || line.contains("addressID")) {
                return;
            }
            // advance i to the first digit in the line
            int i = 0;
            while (line.charAt(i) > '9' || line.charAt(i) < '0') {
                i++;
            }

            if (line.charAt(0) > '9' || line.charAt(0) < '0') {
                // factory record: name first, addressID last
                int j = i - 1;
                while (line.charAt(j) != ' ') j--;
                System.out.println("key:" + line.substring(i) + "  value:" + line.substring(0, j));

                String values[] = {line.substring(0, j), line.substring(i)};

                // tag factory records with "1+"
                context.write(new Text(values[1]), new Text("1+" + values[0]));
            } else {
                // address record: addressID first, name after the separator
                int j = i + 1;
                while (line.charAt(j) != ' ') j++;
                System.out.println("key:" + line.substring(0, j) + "  value:" + line.substring(j + 1));
                String values[] = {line.substring(0, j), line.substring(j + 1)};
                // tag address records with "2+"
                context.write(new Text(values[0]), new Text("2+" + values[1]));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            System.out.println("reducer........................");
            // emit the header row once (assumes a single reducer)
            if (time == 0) {
                context.write(new Text("factoryname"), new Text("addressname"));
                time++;
            }
            int factorynum = 0;
            String factory[] = new String[10];
            int addressnum = 0;
            String address[] = new String[10];

            // split the tagged values into factory names and address names
            Iterator<Text> ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                char type = record.charAt(0);
                if (type == '1') {
                    factory[factorynum] = record.substring(2);
                    factorynum++;
                } else {
                    address[addressnum] = record.substring(2);
                    addressnum++;
                }
            }
            // emit the cross product: every factory paired with every address for this key
            if (factorynum != 0 && addressnum != 0) {
                for (int m = 0; m < factorynum; m++) {
                    for (int n = 0; n < addressnum; n++) {
                        context.write(new Text(factory[m]), new Text(address[n]));
                        System.out.println("factoryname:" + factory[m] + "  addressname:" + address[n]);
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String otherArgs[] = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MTjoin <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
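
The digit-scan parsing in Version 1 is easy to get wrong, so it helps to exercise the same logic outside Hadoop. Below is a minimal standalone sketch of the tagging step (the class `TagSketch` and method `tag` are illustrative names, not part of the original job): find the first digit, decide which table the line belongs to, and prefix the value with `1+` for factory rows or `2+` for address rows.

```java
// Standalone sketch of the mapper's tagging logic (illustrative class/method names).
public class TagSketch {

    // Returns {key, taggedValue} for one input line.
    static String[] tag(String line) {
        int i = 0;
        // advance to the first digit in the line
        while (line.charAt(i) > '9' || line.charAt(i) < '0') i++;
        if (line.charAt(0) > '9' || line.charAt(0) < '0') {
            // factory row: "Beijing Red Star 1" -> key "1", value "1+Beijing Red Star"
            int j = i - 1;
            while (line.charAt(j) != ' ') j--;
            return new String[]{line.substring(i), "1+" + line.substring(0, j)};
        } else {
            // address row: "1 Beijing" -> key "1", value "2+Beijing"
            int j = i + 1;
            while (line.charAt(j) != ' ') j++;
            return new String[]{line.substring(0, j), "2+" + line.substring(j + 1)};
        }
    }

    public static void main(String[] args) {
        String[] f = tag("Beijing Red Star 1");
        String[] a = tag("1 Beijing");
        System.out.println(f[0] + " => " + f[1]); // 1 => 1+Beijing Red Star
        System.out.println(a[0] + " => " + a[1]); // 1 => 2+Beijing
    }
}
```

Note the comparison direction: the scan must use `> '9' || < '0'`; with `>=`/`<=` the digits 0 and 9 would themselves count as non-digits and the loop would run past the end of the line.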

Version 2 (simplified):

package com.mtjoin;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MTjoin {
    public static int time = 0;
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("mapper........................");
            String line = value.toString();
            // skip the header line of either table
            if (line.contains("factoryname") || line.contains("addressID")) {
                return;
            }
            int len = line.length();

            // this version assumes a single-digit addressID at a fixed position
            if (line.charAt(0) > '9' || line.charAt(0) < '0') {
                // factory record: the addressID is the last character
                System.out.println("key:" + line.substring(len - 1) + "  value:" + line.substring(0, len - 2));

                String values[] = {line.substring(0, len - 2), line.substring(len - 1)};

                // tag factory records with "1+"
                context.write(new Text(values[1]), new Text("1+" + values[0]));
            } else {
                // address record: the addressID is the first character
                System.out.println("key:" + line.substring(0, 1) + "  value:" + line.substring(2));
                String values[] = {line.substring(0, 1), line.substring(2)};
                // tag address records with "2+"
                context.write(new Text(values[0]), new Text("2+" + values[1]));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            System.out.println("reducer........................");
            // emit the header row once (assumes a single reducer)
            if (time == 0) {
                context.write(new Text("factoryname"), new Text("addressname"));
                time++;
            }
            int factorynum = 0;
            String factory[] = new String[10];
            int addressnum = 0;
            String address[] = new String[10];

            // split the tagged values into factory names and address names
            Iterator<Text> ite = values.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                char type = record.charAt(0);
                if (type == '1') {
                    factory[factorynum] = record.substring(2);
                    factorynum++;
                } else {
                    address[addressnum] = record.substring(2);
                    addressnum++;
                }
            }
            // emit the cross product: every factory paired with every address for this key
            if (factorynum != 0 && addressnum != 0) {
                for (int m = 0; m < factorynum; m++) {
                    for (int n = 0; n < addressnum; n++) {
                        context.write(new Text(factory[m]), new Text(address[n]));
                        System.out.println("factoryname:" + factory[m] + "  addressname:" + address[n]);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String otherArgs[] = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MTjoin <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
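
Both versions share the same reduce-side join: for each addressID key, the reducer splits the incoming values by their `1+`/`2+` tag and emits the cross product of factory names and address names. A standalone sketch of that step, using plain Java collections in place of the Hadoop types (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the reducer's cross-product join (illustrative names).
public class JoinSketch {

    // values: all tagged records for one addressID key,
    // e.g. "1+Beijing Red Star", "2+Beijing"
    static List<String[]> join(List<String> values) {
        List<String> factories = new ArrayList<>();
        List<String> addresses = new ArrayList<>();
        for (String record : values) {
            if (record.charAt(0) == '1') factories.add(record.substring(2));
            else addresses.add(record.substring(2));
        }
        // cross product: every factory paired with every address
        List<String[]> out = new ArrayList<>();
        for (String f : factories)
            for (String a : addresses)
                out.add(new String[]{f, a}); // (factoryname, addressname)
        return out;
    }

    public static void main(String[] args) {
        List<String[]> joined = join(List.of(
                "1+Beijing Red Star", "1+Bank of Beijing", "2+Beijing"));
        for (String[] pair : joined)
            System.out.println(pair[0] + "\t" + pair[1]);
        // Beijing Red Star	Beijing
        // Bank of Beijing	Beijing
    }
}
```

The real job buffers each group in fixed-size arrays of length 10, so a key with more than 10 factories or addresses would overflow; the sketch uses growable lists to sidestep that limit.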

3. Test data:

address:

addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian

factory:

factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
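
To see why the join works on this sample, it helps to write out what the two mappers emit and how the shuffle groups it by addressID. A small simulation with plain collections standing in for the shuffle (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulates map output and shuffle grouping for the sample tables (illustrative names).
public class ShuffleSketch {

    // Group (key, taggedValue) pairs by key in sorted order, as the shuffle would.
    static Map<String, List<String>> group(String[][] mapOutput) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : mapOutput)
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        return groups;
    }

    public static void main(String[] args) {
        // (key, taggedValue) pairs as the factory and address mappers would emit them
        String[][] mapOutput = {
            {"1", "1+Beijing Red Star"}, {"3", "1+Shenzhen Thunder"},
            {"2", "1+Guangzhou Honda"}, {"1", "1+Beijing Rising"},
            {"2", "1+Guangzhou Development Bank"}, {"3", "1+Tencent"},
            {"1", "1+Bank of Beijing"},
            {"1", "2+Beijing"}, {"2", "2+Guangzhou"},
            {"3", "2+Shenzhen"}, {"4", "2+Xian"}
        };
        group(mapOutput).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Key 4 (Xian) receives only an address record and no factory records, which is why the reducer produces no output row for it.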

4. Execution log:

14/09/24 09:39:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/24 09:39:55 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/09/24 09:39:55 INFO input.FileInputFormat: Total input paths to process : 2
14/09/24 09:39:55 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/24 09:39:55 INFO mapred.JobClient: Running job: job_local_0001
14/09/24 09:39:55 INFO util.ProcessTree: setsid exited with exit code 0
14/09/24 09:39:55 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/24 09:39:55 INFO mapred.MapTask: io.sort.mb = 100
14/09/24 09:39:55 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/24 09:39:55 INFO mapred.MapTask: record buffer = 262144/327680
mapper........................
mapper........................
key:1  value:Beijing Red Star
mapper........................
key:3  value:Shenzhen Thunder
mapper........................
key:2  value:Guangzhou Honda
mapper........................
key:1  value:Beijing Rising
mapper........................
key:2  value:Guangzhou Development Bank
mapper........................
key:3  value:Tencent
mapper........................
key:1  value:Bank of Beijing
14/09/24 09:39:55 INFO mapred.MapTask: Starting flush of map output
14/09/24 09:39:55 INFO mapred.MapTask: Finished spill 0
14/09/24 09:39:55 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/09/24 09:39:56 INFO mapred.JobClient:  map 0% reduce 0%
14/09/24 09:39:58 INFO mapred.LocalJobRunner:
14/09/24 09:39:58 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/09/24 09:39:58 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/24 09:39:58 INFO mapred.MapTask: io.sort.mb = 100
14/09/24 09:39:58 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/24 09:39:58 INFO mapred.MapTask: record buffer = 262144/327680
mapper........................
mapper........................
key:1  value:Beijing
mapper........................
key:2  value:Guangzhou
mapper........................
key:3  value:Shenzhen
mapper........................
key:4  value:Xian
14/09/24 09:39:58 INFO mapred.MapTask: Starting flush of map output
14/09/24 09:39:58 INFO mapred.MapTask: Finished spill 0
14/09/24 09:39:58 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/09/24 09:39:59 INFO mapred.JobClient:  map 100% reduce 0%
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
14/09/24 09:40:01 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
14/09/24 09:40:01 INFO mapred.Task:  Using ResourceCalculatorPlugin : [email protected]
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
14/09/24 09:40:01 INFO mapred.Merger: Merging 2 sorted segments
14/09/24 09:40:01 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 218 bytes
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
reducer........................
factoryname:Beijing Red Star  addressname:Beijing
factoryname:Beijing Rising  addressname:Beijing
factoryname:Bank of Beijing  addressname:Beijing
reducer........................
factoryname:Guangzhou Honda  addressname:Guangzhou
factoryname:Guangzhou Development Bank  addressname:Guangzhou
reducer........................
factoryname:Shenzhen Thunder  addressname:Shenzhen
factoryname:Tencent  addressname:Shenzhen
reducer........................
14/09/24 09:40:01 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
14/09/24 09:40:01 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/09/24 09:40:01 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/mtjoin_output02
14/09/24 09:40:04 INFO mapred.LocalJobRunner: reduce > reduce
14/09/24 09:40:04 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/09/24 09:40:05 INFO mapred.JobClient:  map 100% reduce 100%
14/09/24 09:40:05 INFO mapred.JobClient: Job complete: job_local_0001
14/09/24 09:40:05 INFO mapred.JobClient: Counters: 22
14/09/24 09:40:05 INFO mapred.JobClient:   Map-Reduce Framework
14/09/24 09:40:05 INFO mapred.JobClient:     Spilled Records=22
14/09/24 09:40:05 INFO mapred.JobClient:     Map output materialized bytes=226
14/09/24 09:40:05 INFO mapred.JobClient:     Reduce input records=11
14/09/24 09:40:05 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/09/24 09:40:05 INFO mapred.JobClient:     Map input records=13
14/09/24 09:40:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=238
14/09/24 09:40:05 INFO mapred.JobClient:     Map output bytes=192
14/09/24 09:40:05 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/09/24 09:40:05 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/09/24 09:40:05 INFO mapred.JobClient:     Reduce input groups=4
14/09/24 09:40:05 INFO mapred.JobClient:     Combine output records=0
14/09/24 09:40:05 INFO mapred.JobClient:     Reduce output records=8
14/09/24 09:40:05 INFO mapred.JobClient:     Map output records=11
14/09/24 09:40:05 INFO mapred.JobClient:     Combine input records=0
14/09/24 09:40:05 INFO mapred.JobClient:     CPU time spent (ms)=0
14/09/24 09:40:05 INFO mapred.JobClient:     Total committed heap usage (bytes)=813170688
14/09/24 09:40:05 INFO mapred.JobClient:   File Input Format Counters
14/09/24 09:40:05 INFO mapred.JobClient:     Bytes Read=216
14/09/24 09:40:05 INFO mapred.JobClient:   FileSystemCounters
14/09/24 09:40:05 INFO mapred.JobClient:     HDFS_BYTES_READ=586
14/09/24 09:40:05 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=122093
14/09/24 09:40:05 INFO mapred.JobClient:     FILE_BYTES_READ=1658
14/09/24 09:40:05 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=202
14/09/24 09:40:05 INFO mapred.JobClient:   File Output Format Counters
14/09/24 09:40:05 INFO mapred.JobClient:     Bytes Written=202

5. Results:

factoryname    addressname
Beijing Red Star    Beijing
Beijing Rising    Beijing
Bank of Beijing    Beijing
Guangzhou Honda    Guangzhou
Guangzhou Development Bank    Guangzhou
Shenzhen Thunder    Shenzhen
Tencent    Shenzhen

Date: 2024-10-03 00:35:24
