Hadoop之Reduce侧的联结

理解其就像关系型数据库中的链接查询一样,数据很多的时候,几个数据文件的数据能够彼此有联系,可以使用Reduce联结。举个很简单的例子来说,一个只存放了顾客信息Customer.txt文件,和一个顾客相关联的Order.txt文件,要进行两个文件的信息组合,原理图如下:

这里涉及的几个专业术语:Group key ,datasourde,Tag.前者的话通俗点来说的话就相当于关系型数据库中的主键和外键,通过其id进行的联结依据。datasource,顾名思义,就是数据的来源,那么这里指的就是Custonmers和Orders,Tag的话也比较好理解,就是里面的字段到底是属于哪个文件的。

操作Reduce的侧联结,要用到hadoop-datajoin-2.6.0.jar包,默认路径:

E:\hadoop-2.6.0\share\hadoop\tools\lib(hadoop的工作目录)。

用到的3个类:

1、DataJoinMapperBase

2、DataJoinReducerBase

3、TaggedMapOutput

比较正式的工作原理:

1、mapper端输入后,将数据封装成TaggedMapOutput类型,此类型封装数据源(tag)和值(value);

2、map阶段输出的结果不在是简单的一条数据,而是一条记录。记录=数据源(tag)+数据值(value).

3、combine接收的是一个组合:不同数据源却有相同组键的值;

4、不同数据源的每一条记录只能在一个combine中出现;

好,了解了这些我们就进行编码阶段:

这里的话将几个类写在一起测试,感觉另有一番感觉:

联结之前的Custmoner.txt文件:

联结之前的Order.txt文件:

测试代码:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configuration{
    //DataJoinMapperBase默认没导入,路径E:\hadoop-2.6.0\share\hadoop\tools\lib
    public static class MapClass extends DataJoinMapperBase{
        // 设置组键
        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line=((Text)aRecord.getData()).toString();
            String [] tokens=line.split(",");
            String groupkey=tokens[0];
            return new Text(groupkey);
        }
        /*
         * 这个在任务开始时调用,用于产生标签
               此处就直接以文件名作为标签
         */
        @Override
        protected Text generateInputTag(String inputFile) {
            return new Text(inputFile);
        }
        // 返回一个任何带任何我们想要的Text标签的TaggedWritable
        @Override
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv=new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }

    }

    public static class Reduce extends DataJoinReducerBase{
        // 两个参数数组大小一定相同,并且最多等于数据源个数
        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if(tags.length<2){
                return null;// 这一步,实现内联结
            }
            String joinedStr="";
            for(int i=0;i<values.length;i++){
                if(i>0){
                    joinedStr+=",";// 以逗号作为原两个数据源记录链接的分割符
                    TaggedWritable tw=(TaggedWritable)values[i];
                    String line=((Text)tw.getData()).toString();
                    String[] tokens=line.split(",",2);// 将一条记录划分两组,去掉第一组的组键名。
                    joinedStr+=tokens[1];
                }
            }
            TaggedWritable retv=new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text)tags[0]);
            return retv;// 这只retv的组键,作为最终输出键。
        }
    }

    /*TaggedMapOutput是一个抽象数据类型,封装了标签与记录内容
     此处作为DataJoinMapperBase的输出值类型,需要实现Writable接口,所以要实现两个序列化方法
     自定义输入类型*/
    public static class TaggedWritable extends TaggedMapOutput{
        private Writable data;
        //如果不给其一个默认的构造方法,Hadoop的使用反射来创建这个对象,需要一个默认的构造函数(无参数)
        public TaggedWritable(){
        }
        public TaggedWritable(Writable data){
            //TODO 这里可以通过setTag()方法进行设置
            this.tag=new Text("");
            this.data=data;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            //加入以下的代码.避免出现空指针异常,当时一定要在其写的时候加入out.writeUTF(this.data.getClass().getName());
            //不然会出现readFully错误
            String temp=in.readUTF();
            if(this.data==null||!this.data.getClass().getName().equals(temp)){
                try {
                    this.data=(Writable)ReflectionUtils.newInstance(Class.forName(temp), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        @Override
        public Writable getData() {
            return data;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); //组件配置是由Hadoop的Configuration的一个实例实现
        JobConf job = new JobConf(conf, DataJoin.class);
        Path in=new Path("hdfs://master:9000/user/input/yfl/*.txt");
        Path out=new Path("hdfs://master:9000/user/output/testfeng1");
        FileSystem fs=FileSystem.get(conf);
        //通过其命令来删除输出目录
        if(fs.exists(out)){
            fs.delete(out,true);
        }
        //TODO这里注意别导错包了
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);

    }
}

运行的结果:

为了让调试更加的方便,在程序中直接使用delete命令已达到删除输出目录的功能,省去每次都要手动删除的麻烦,这里需要在我们的工程目录下面的bin目录下面添加主机的core-site.xml和hdfs-site.xml文件,然后给对于的目录赋上权限chmod -R 777 xxx,即可。

hadoop很有意思,我希望自己能走的更远!!!坚持,加油!!!

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2025-01-14 01:36:14

Hadoop之Reduce侧的联结的相关文章

一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)

Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解Mapper和Reducer接口,应用通常须要提供map和reduce方法以实现他们. 接着我们须要对JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等进行讨

基于hadoop (map/reduce)的大规模分布式并行计算生产排程系统的设计

map/reduce是大数据处理的技术,它的思路是把大规模数据分成一个个小数据块,每个数据块由一个map任务来处理,处理的中间结果汇总到reduce,最后生成最终的处理结果,这个处理和汇总的过程是可以反复多次的,也就是map-reduce-map-reduce 基于map/reduce的思路,我们可以设计基于hadoop(map/reduce)的大规模分布式并行计算生产排程系统,怎么样把大数据处理变成大规模分布式并行计算呢,就是我们不切分数据,每个map任务的数据都是相同的,每个map任务对排程

Hadoop的Reduce Join+BloomFilter实现表链接

适用于场景 连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有MAP端过滤掉不需要JOIN的数据,这样传到REDUCE的数据量减少,减少了网络传及磁盘IO. 缺点 Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间.并且,最终的JOIN在REDUCE端还要进行比对,所以对最终结果无影响. 下面我们先来简单了解下什么是布隆过滤器? Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的.它实际上是一

Hadoop Map/Reduce

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集.一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们.框架会对map的输出先进行排序, 然后把结果输入给reduce任务.通常作业的输入和输出都会被存储在文件系统中. 整个框架负责任务的调度和监控,以及重新执行已经失败的任务.通常,Map/R

Reduce侧联接

案例分析前提,了解其原理,以及术语 术语部分: 1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式) 2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中.在这个目的下,我们将使用每个record自身的Data source名称标记每个record. 3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Cust

tf-idf hadoop map reduce

package com.jumei.robot.mapreduce.tfidf; import java.io.IOException; import java.util.Collection; import java.util.Comparator; import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; import java.util.TreeMap; import org.ap

DataJoin类 实现不同格式数据reduce侧连接

实验名称:Datajoin数据连接 实验目的: 1.记录我的Hadoop 实验过程,我是NCU HANG TIAN BAN 的学生.将会附上完整可运行的代码.程序中框架是一套模板百度的.书上也有但是重要算法是我自己写的将会标注. http://blog.csdn.net/wawmg/article/details/8759076 这是我参考的框架模板. 2.提示大致浏览可看加粗部分[1.2.3.4] 实验要求: 任务1.多个数据源的内连接 [数据样例] 输入: factory: factoryn

Hadoop map reduce 任务数量优化

mapred.tasktracker.map.tasks.maximum 官方解释:The maximum number of map tasks that will be run  simultaneously by a task tracker. 我的理解:一个tasktracker最多可以同时运行的map任务数量 默认值:2 优化值:mapred.tasktracker.map.tasks.maximum = cpu数量 cpu数量 = 服务器CPU总核数 / 每个CPU的核数服务器CPU

Hadoop学习:Map/Reduce初探与小Demo实现

一.    概念知识介绍 Hadoop MapReduce是一个用于处理海量数据的分布式计算框架.这个框架攻克了诸如数据分布式存储.作业调度.容错.机器间通信等复杂问题,能够使没有并行 处理或者分布式计算经验的project师,也能非常轻松地写出结构简单的.应用于成百上千台机器处理大规模数据的并行分布式程序. Hadoop MapReduce基于"分而治之"的思想,将计算任务抽象成map和reduce两个计算过程,能够简单理解为"分散运算-归并结果"的过程.一个 M