【Hadoop基础教程】7、Hadoop之一对多关联查询

我们都知道一个地址拥有着多家公司,本案例将通过两种类型输入文件:address类(地址)和company类(公司)进行一对多的关联查询,得到地址名(例如:Beijing)与公司名(例如:Beijing JD、Beijing Red Star)的关联信息。

开发环境



硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点)

软件环境:Java 1.7.0_45、hadoop-1.2.1

1、 Map过程



首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容并存入< key,value>例如<0,” 1:Beijing”>。Map过程首先按照输入文件的类型不同对输入信息进行不同的处理,例如,对于address类型输入文件将value值(”1:Beijing”)处理成<”1”,”address:Beijing”>,对于company类型输入文件将value值(”Beijing Red Star:1”)处理成<”1”,”company:Beijing Red Star”>,如图所示:

Map端核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。

public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        Text addressId = new Text();
        Text info = new Text();
        String[] line = value.toString().split(":");// 获取文件的每一行数据,并以":"分割
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (line.length < 2) {
            return;
        }
        if (path.indexOf("company") >= 0) {//处理company文件的value信息: "Beijing Red Star:1"
            addressId.set(line[1]);//"1"
            info.set("company" + ":" + line[0]);//"company:Beijing Red Star"
            context.write(addressId,info);//<key,value> --<"1","company:Beijing Red Star">
        } else if (path.indexOf("address") >= 0) {//处理adress文件的value信息:"1:Beijing"
            addressId.set(line[0]);//"1"
            info.set("address" + ":" + line[1]);//"address:Beijing"
            context.write(addressId,info);//<key,value> --<"1","address:Beijing">
        }
    }
}

2、 Reduce过程



Reduce过程首先对输入< key,values>即<”1”,[“company:Beijing Red Star”,”company:Beijing JD”,”address:Beijing”]>的values值进行遍历获取到单元信息value(例如”company:Beijing Red Star”),然后根据value中的标识符(company和address)将公司名和地址名分别存入到company集合和address集合,最后对company集合和address集合进行笛卡尔积运算得到company与address的关系,并进行输出,如图所示。

Reduce端核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。

public static class ReduceClass extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)
            throws IOException, InterruptedException {
        List<String> companys = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();
//["company:Beijing Red Star","company:Beijing JD","address:Beijing"]
        Iterator<Text> it = values.iterator();
        while(it.hasNext()){
            String value = it.next().toString();//"company:Beijing Red Star"
            String[] result = value.split(":");
            if(result.length >= 2){
                if(result[0].equals("company")){
                    companys.add(result[1]);
                }else if(result[0].equals("address")){
                    addresses.add(result[1]);
                }
            }
        }
        // 求笛卡尔积
        if(0 != companys.size()&& 0 != addresses.size()){
            for(int i=0;i<companys.size();i++){
                for(int j=0;j<addresses.size();j++){
                    context.write(new Text(companys.get(i)), new Text(addresses.get(j)));//<key,value>--<"Beijing JD","Beijing">
                }
            }
        }
    }
}

3、 驱动实现



驱动核心代码实现如下,详细源码请参考:CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java。

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: company Join address <companyTableDir> <addressTableDir> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "company Join address");
        //设置Job入口类
        job.setJarByClass(CompanyJoinAddress.class);
        // 设置Map和Reduce处理类
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//companyTableDir
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//addressTableDir
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//out
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

4、部署运行


1)启动Hadoop集群

[[email protected] ~]$ start-dfs.sh
[[email protected] ~]$ start-mapred.sh
[[email protected] ~]$ jps
5283 SecondaryNameNode
5445 JobTracker
5578 Jps
5109 NameNode

2)部署源码

#设置工作环境
[[email protected] ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
#部署源码
将CompanyJoinAddress文件夹拷贝到/usr/hadoop/workspace/MapReduce/ 路径下;

… 你可以直接 下载 CompanyJoinAddress

3)编译文件

#切换工作目录
[[email protected] ~]$ cd /usr/hadoop/workspace/MapReduce/CompanyJoinAddress
#编译文件
[[email protected] CompanyJoinAddress]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar -d bin src/com/zonesion/tablejoin/CompanyJoinAddress.java
[[email protected] CompanyJoinAddress]$ ls bin/com/zonesion/tablejoin/* -la
-rw-rw-r-- 1 hadoop hadoop 1909 8月   1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress.class
-rw-rw-r-- 1 hadoop hadoop 2199 8月  1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class
-rw-rw-r-- 1 hadoop hadoop 2242 8月   1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class

4)打包jar文件

[[email protected] CompanyJoinAddress]$ jar -cvf CompanyJoinAddress.jar -C bin/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class(in = 2273) (out= 951)(deflated 58%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class(in = 2242) (out= 1029)(deflated 54%)
adding: com/zonesion/tablejoin/CompanyJoinAddress.class(in = 1909) (out= 983)(deflated 48%)

5)上传输入文件

#创建company输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/company/
#创建address输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/address/
#上传文件到company输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -put input/company* CompanyJoinAddress/input/company/
#上传文件到address输入文件夹
[[email protected] CompanyJoinAddress]$ hadoop fs -put input/address* CompanyJoinAddress/input/address/

6)运行Jar文件

[[email protected] CompanyJoinAddress]$ hadoop jar CompanyJoinAddress.jar com.zonesion.tablejoin.CompanyJoinAddress CompanyJoinAddress/input/company/  CompanyJoinAddress/input/address/ CompanyJoinAddress/output
14/08/01 10:50:05 INFO input.FileInputFormat: Total input paths to process : 4
14/08/01 10:50:05 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/01 10:50:05 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/01 10:50:05 INFO mapred.JobClient: Running job: job_201408010921_0008
14/08/01 10:50:06 INFO mapred.JobClient:  map 0% reduce 0%
14/08/01 10:50:09 INFO mapred.JobClient:  map 50% reduce 0%
14/08/01 10:50:10 INFO mapred.JobClient:  map 100% reduce 0%
14/08/01 10:50:17 INFO mapred.JobClient:  map 100% reduce 100%
14/08/01 10:50:17 INFO mapred.JobClient: Job complete: job_201408010921_0008
14/08/01 10:50:17 INFO mapred.JobClient: Counters: 29
......

7)查看输出结果

[[email protected] CompanyJoinAddress]$ hadoop fs -ls CompanyJoinAddress/output
Found 3 items
-rw-r--r--   1 hadoop supergroup 0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup  0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_logs
-rw-r--r-- 1 hadoop supergroup 241 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/part-r-00000
[[email protected] CompanyJoinAddress]$ hadoop fs -cat CompanyJoinAddress/output/part-r-00000
Beijing Red Star        Beijing
Beijing Rising          Beijing
Back of Beijing     Beijing
Beijing JD          Beijing
xiaomi              Beijing
Guangzhou Honda     Guangzhou
Guangzhou Development Bank  Guangzhou
Shenzhen Thunder        Shenzhen
Tencent             Shenzhen
aiplay              hangzhou
huawei              wuhan

您可能喜欢

【Hadoop基础教程】5、Hadoop之单词计数

【Hadoop基础教程】6、Hadoop之单表关联查询

【Hadoop基础教程】7、Hadoop之一对多关联查询

【Hadoop基础教程】8、Hadoop之一对多关联查询

【Hadoop基础教程】9、Hadoop之倒排索引

时间: 2024-10-04 13:40:20

【Hadoop基础教程】7、Hadoop之一对多关联查询的相关文章

《Hadoop基础教程》之初识Hadoop

Hadoop一直是我想学习的技术,正巧最近项目组要做电子商城,我就开始研究Hadoop,虽然最后鉴定Hadoop不适用我们的项目,但是我会继续研究下去,技多不压身. <Hadoop基础教程>是我读的第一本Hadoop书籍,当然在线只能试读第一章,不过对Hadoop历史.核心技术和应用场景有了初步了解. Hadoop历史 雏形开始于2002年的Apache的Nutch,Nutch是一个开源Java 实现的搜索引擎.它提供了我们运行自己的搜索引擎所需的全部工具.包括全文搜索和Web爬虫. 随后在2

《Hadoop基础教程》之初识Hadoop 【转】

Hadoop一直是我想学习的技术,正巧最近项目组要做电子商城,我就开始研究Hadoop,虽然最后鉴定Hadoop不适用我们的项目,但是我会继续研究下去,技多不压身. <Hadoop基础教程>是我读的第一本Hadoop书籍,当然在线只能试读第一章,不过对Hadoop历史.核心技术和应用场景有了初步了解. Hadoop历史 雏形开始于2002年的Apache的Nutch,Nutch是一个开源Java 实现的搜索引擎.它提供了我们运行自己的搜索引擎所需的全部工具.包括全文搜索和Web爬虫. 随后在2

【Hadoop基础教程】8、Hadoop之一对多关联查询

我们都知道一个地址拥有着多家公司,本案例将通过两种类型输入文件:address类(地址)和company类(公司)进行一对多的关联查询,得到地址名(例如:Beijing)与公司名(例如:Beijing JD.Beijing Red Star)的关联信息. 开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Java 1.7.0_45.hadoop-1.2.1 1. Map过程 首先使用默认的TextInputFormat类对输入文件进行

【Hadoop基础教程】9、Hadoop之倒排索引

开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Java 1.7.0_45.hadoop-1.2.1 1.倒排索引 倒排索引是文档检索系统中最常用的数据结构,被广泛用于全文搜索引擎.它主要是用来存储某个单词(或词组)在一个文档或一组文档的存储位置的映射,即提供了一种根据内容来查找文档的方式.由于不是根据文档来确定文档所包含的内容,而是进行了相反的操作(根据关键字来查找文档),因而称为倒排索引(Inverted Index).通常情况

《Hadoop基础教程》之初识Hadoop(转载)

转载自博主:上善若水任方圆http://blessht.iteye.com/blog/2095675 Hadoop一直是我想学习的技术,正巧最近项目组要做电子商城,我就开始研究Hadoop,虽然最后鉴定Hadoop不适用我们的项目,但是我会继续研究下去,技多不压身. <Hadoop基础教程>是我读的第一本Hadoop书籍,当然在线只能试读第一章,不过对Hadoop历史.核心技术和应用场景有了初步了解. Hadoop历史 雏形开始于2002年的Apache的Nutch,Nutch是一个开源Jav

[转载] 《Hadoop基础教程》之初识Hadoop

转载自http://blessht.iteye.com/blog/2095675 Hadoop一直是我想学习的技术,正巧最近项目组要做电子商城,我就开始研究Hadoop,虽然最后鉴定Hadoop不适用我们的项目,但是我会继续研究下去,技多不压身. <Hadoop基础教程>是我读的第一本Hadoop书籍,当然在线只能试读第一章,不过对Hadoop历史.核心技术和应用场景有了初步了解. Hadoop历史 雏形开始于2002年的Apache的Nutch,Nutch是一个开源Java 实现的搜索引擎.

【Hadoop基础教程】1、Hadoop之服务器基础环境搭建(转)

本blog以K-Master服务器基础环境配置为例分别演示用户配置.sudo权限配置.网路配置.关闭防火墙.安装JDK工具等.用户需参照以下步骤完成KVMSlave1~KVMSlave3服务器的基础环境配置. 开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Java 1.7.0_45.hadoop-1.2.1 hadoop1.X和hadoop2.X的文件结构已经完全不一样了,网上很少看到hadoop1.X以上的安装示例教程,我选择的

【Hadoop基础教程】1、Hadoop之服务器基础环境搭建

本blog以K-Master服务器基础环境配置为例分别演示用户配置.sudo权限配置.网路配置.关闭防火墙.安装JDK工具等.用户需参照以下步骤完成KVMSlave1~KVMSlave3服务器的基础环境配置. 开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Java 1.7.0_45.hadoop-1.2.1 1.安装环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Java

【Hadoop基础教程】2、Hadoop之单机模式搭建

单机模式所需要的系统资源是最少的,这种安装模式下,Hadoop的core-site.xml.mapred-site.xml.hdfs-site.xml配置文件均为空.默认情况下,官方hadoop-1.2.1.tar.gz文件默认使用的就是单机安装模式.当配置文件为空时,Hadoop完全运行在本地,不与其他节点交互,也不使用Hadoop文件系统,不加载任何守护进程,该模式主要用于开发调试MapReduce应用程序的逻辑,不与任何守护进程交互进而避免复杂性.以hadoop用户远程登录K-Master