Reduce侧联接

案例分析前提,了解其原理,以及术语

术语部分:
 1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)
2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。
3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。

原理部分:

原理:

1、mapper端输入后,将数据封装成TaggedMapOutput类型,此类型封装数据源(tag)和值(value);

2、map阶段输出的结果不在是简单的一条数据,而是一条记录。记录=数据源(tag)+数据值(value).

3、combine接收的是一个组合:不同数据源却有相同组键的值;

4、不同数据源的每一条记录只能在一个combine中出现;

如图:


1.利用datajoin包来实现join:
---------------------
Hadoop的datajoin包中有三个需要我们继承的类:DataJoinMapperBase,DataJoinReducerBase,TaggedMapOutput。正如其名字一样,我们的MapClass将会扩展DataJoinMapperBase,Reduce类会扩展DataJoinReducerBase。这个datajoin包已经实现了map()和reduce()方法,因此我们的子类只需要实现一些新方法来设置一些细节。

  

  在用DataJoinMapperBase和DataJoinReducerBase之前,我们需要弄清楚我们贯穿整个程序使用的新的虚数据类TaggedMapOutput。

  

  根据之前我们在图Advance MapReduce的数据流中所展示的那样,mapper输出一个包(由一个key和一个value(tagged record)组成)。datajoin包将key设置为Text类型,将value设置为TaggedMapOutput类型(TaggedMapOutput是一个将我们的记录使用一个Text类型的tag包装起来的数据类型)。它实现了getTag()和setTag(Text tag)方法。它还定义了一个getData()方法,我们的子类将实现这个方法来处理record记录。我们并没有明确地要求子类实现setData()方法,但我们最好还是实现这个方法以实现程序的对称性(或者在构造函数中实现)。作为Mapper的输出,TaggedMapOutput需要是Writable类型,因此的子类还需要实现readFields()和write()方法。

DataJoinMapperBase:
-------------------

  回忆join数据流图,mapper的主要功能就是打包一个record使其能够和其他拥有相同group key的记录去向一个Reducer。DataJoinMapperBase完成所有的打包工作,这个类定义了三个虚类让我们的子类实现:

  protected abstract Text generateInputTag(String inputFile);

  protected abstract TaggedMapOutput generateTaggedMapOutut(Object value);

  protected abstract Text generateGroupKey(TaggedMapOutput aRecored);

  

  在一个map任务开始之前为所有这个map任务会处理的记录定义一个tag(Text),结果将保存到DataJoinMapperBase的inputTag变量中,我们也可以保存filename至inputFile变量中以待后用。

  在map任务初始化之后,DataJoinMapperBase的map()方法会对每一个记录执行。它调用了两个我们还没有实现的虚方法:generateTaggedMapOutput()以及generateGroupKey(aRecord);(详见代码)

DataJoinReducerBase:
--------------------

DataJoinMapperBase将我们所需要做的工作以一个full outer join的方式简化。我们的Reducer子类只需要实现combine()方法来滤除掉我们不需要的组合来得到我们需要的(inner join, left outer join等)。同时我们也在combiner()中将我们的组合格式化为输出格式。
/hadoop-2.6.0/share/hadoop/tools/lib   程序需要自己手动导入Jar包

MapperClass.java

package com.yc.zzg.test;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends DataJoinMapperBase{

    @Override
    protected Text generateGroupKey(TaggedMapOutput arg0) {
        String line = ((Text)arg0.getData()).toString();
        String[] tokens = line.split(",");
        String groupKey = tokens[0];
        return new Text(groupKey);
    }

    @Override
    protected Text generateInputTag(String arg0) {

        return new Text(arg0);
    }

    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
        TaggedWritable tw = new TaggedWritable((Text)arg0);
        tw.setTag(this.inputTag);
        return tw;
    }

}

TaggedWritable.java

package com.yc.zzg.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class TaggedWritable extends TaggedMapOutput {

    private Writable data;

    public TaggedWritable() {
    }  

    public TaggedWritable(Writable data) {
         this.tag = new Text("");
         this.data = data;
     } 

    @Override
    public Writable getData() {
        return data;
    }

    public void setData(Writable data) {
        this.data = data;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
          this.tag.readFields(in);
          //加入此部分代码,否则,可能报空指针异常
          String temp=in.readUTF();
          if (this.data == null|| !this.data.getClass().getName().equals(temp)) {
              try {
                  this.data = (Writable) ReflectionUtils.newInstance(
                          Class.forName(temp), null);
              } catch (ClassNotFoundException e) {
                  e.printStackTrace();
              }
          }
          this.data.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        //此行代码很重要
        out.writeUTF(this.data.getClass().getName());  

        this.data.write(out);
    }

}

Reduce.java

package com.yc.zzg.test;

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class Reduce extends DataJoinReducerBase {

    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
         if(tags.length<2)return null;
          StringBuffer joinData = new StringBuffer();
          int count=0;

            for(Object value: values){
                joinData.append(",");
                TaggedWritable tw = (TaggedWritable)value;
                String recordLine = ((Text)tw.getData()).toString();
                String[] tokens = recordLine.split(",",2);
                if(count==0) joinData.append(tokens[0]);
                joinData.append(tokens[1]);
            }

            TaggedWritable rtv = new TaggedWritable(new Text(new String(joinData)));
            rtv.setTag((Text)tags[0]);
            return rtv;
    }
}

Drive.java

package com.yc.zzg.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Drive {

    public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
            JobConf job = new JobConf(conf, Drive.class);  

            Path in = new Path("hdfs://localhost:9000/input/inputtest/*");
            Path out = new Path("hdfs://localhost:9000/output/test20");
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("DataJoin");
            job.setMapperClass(MapperClass.class);
            job.setReducerClass(Reduce.class);  

            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(TaggedWritable.class);
            job.set("mapred.textoutputformat.separator", ",");
            JobClient.runJob(job);
    }
}

Customers.txt

1,Stephanie Leung,555-555-5555      
2,Edward Kim,123-456-7890        
3,Jose Madriz,281-330-8004        
4,David Stork,408-555-0000          

Orders.txt

 3,A,12.95,02-Jun-2008
 1,B,88.25,20-May-2008
 2,C,32.00,30-Nov-2007
 3,D,25.02,22-Jan-2009

所碰到问题有几点,提出来和大家分析一下

1。第一个问题是DataJoinMapperBase包的问题,前面已经解决了

2。第二个问题是原来的程序会报一个

java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.yc.zzg.test.TaggedWritable.<init>()

Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.yc.zzg.test.TaggedWritable.<init>()

所以你需要给一个构造方法

 public TaggedWritable() {
    }  

3。第三个问题是我有两个多个文件怎么导入,你将多个文件放入同一个文件夹里然后用

   Path in = new Path("hdfs://localhost:9000/input/inputtest/*");

就可以导入多个文件啦,同理也可以拼file*.txt之类的

4。有的时候我为了测试一个工程,从test1测试到了test20,为了方便我们输出的时候总是要创建一个新的目录,解决方案如下

1。hadoop需要把集群上的core-site.xml和hdfs-site.xml放到当前工程下。eclipse工作目录的bin文件夹下面

2。    FileSystem fs=FileSystem.get(conf);
            if(fs.exists(out)){
                fs.delete(out, true);
                System.out.println("输出路径存在,已删除!");
            }  

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-14 10:13:24

Reduce侧联接的相关文章

Hadoop之Reduce侧的联结

理解其就像关系型数据库中的链接查询一样,数据很多的时候,几个数据文件的数据能够彼此有联系,可以使用Reduce联结.举个很简单的例子来说,一个只存放了顾客信息Customer.txt文件,和一个顾客相关联的Order.txt文件,要进行两个文件的信息组合,原理图如下: 这里涉及的几个专业术语:Group key ,datasourde,Tag.前者的话通俗点来说的话就相当于关系型数据库中的主键和外键,通过其id进行的联结依据.datasource,顾名思义,就是数据的来源,那么这里指的就是Cus

DataJoin类 实现不同格式数据reduce侧连接

实验名称:Datajoin数据连接 实验目的: 1.记录我的Hadoop 实验过程,我是NCU HANG TIAN BAN 的学生.将会附上完整可运行的代码.程序中框架是一套模板百度的.书上也有但是重要算法是我自己写的将会标注. http://blog.csdn.net/wawmg/article/details/8759076 这是我参考的框架模板. 2.提示大致浏览可看加粗部分[1.2.3.4] 实验要求: 任务1.多个数据源的内连接 [数据样例] 输入: factory: factoryn

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

(转)MapReduce 中的两表 join 几种方案简介

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势

原创文章,转载请务必将下面这段话置于文章开头处.本文转发自技术世界,原文链接 http://www.jasongj.com/spark/skew/ 摘要 本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义Partitioner,使用Map侧Join代替Reduce侧Join,给倾斜Key加上随机前缀等. 为何要处理数据倾斜(Data Skew) 什么是数据倾斜 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是

spark数据倾斜

数据倾斜的主要问题在于,某个分区数量很巨大,在做map运算的时候,将会发生别的分区task很快计算完成,但是某几个分区task的计算成为了系统的瓶颈,明显超过其他分区时间: 1.方案:Kafka的随机主题 如果kafka的topic和分区关联,而且kafka是专用的,那么其实kafka如果能够和随机主机,那么数据将会随机打入到各个分区中,这样可以解决数据热点问题: 2. 方案:将不可切割的文件转换为可切割文件 对于gzip这类文件最好转化为可切割文件:因为对于不可切割的压缩文件,将会作为一个单独

高阶MapReduce_3_reducer侧联结原理

侧联结原理: Map端工作:为来自不同表,也就是多个数据集的key/value对贴上一个标签,来区别不同数据源的记录.然后用链接字段作为kye,其余部分和新加的标志作为value,最后输出一个记录包.也就是说.,map端的工作就是做来源判断,并对符合key的值进行区分. Map端完成之后就是就将数据分组了. Reduce端工作:在reduce端以链接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经贴上标签的),也就是不同的数据集分开,最后进行迪

SQL Server-外部联接基础回顾(十三)

前言 本节我们继续讲讲联接类型中的外部联接,本节之后我们将讲述有关联接性能以及更深入的知识,简短内容,深入的理解,Always to review the basics. 外部联接 外部联接又分为左外部联接和右外部联接,使用关键字分别是LEFT OUTER JOIN.RIGHT OUTER JOIN.FULL OUTER JOIN,在这里OUTER关键字时可选的.LEFT关键字表示保留左侧的行,RIGHT关键字表示保留右侧的行,FULL关键字表示左侧和右侧的行都保留.外部联接的第三个逻辑查询处理

Map Reduce和流处理

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文由@从流域到海域翻译,发表于腾讯云+社区 map()和reduce()是在集群式设备上用来做大规模数据处理的方法,用户定义一个特定的映射,函数将使用该映射对一系列键值对进行处理,直接产生出一系列键值对. Map Reduce和流处理 Hadoop的Map / Reduce模型在并行处理大量数据方面非常出色.它提供了一个通用的分区机制(基于数据的关键)来分配不同机器上的聚合式工作负载.基本上, map / reduce的算法设计都是关