MapReduce,DataJoin,链接多数据源

主要介绍用DataJoin类来链接多数据源,先看一下例子,假设二个数据源customs和orders

customer ID       Name      PhomeNumber

1                        赵一        025-5455-566

2                        钱二        025-4587-565

3                        孙三        021-5845-5875

客户的订单号:

Customer ID     order ID     Price    Data

2                          1               93       2008-01-08

3                          2               43       2012-01-21

1                          3               43       2012-05-12

2                          4               32       2012-5-14

问题:现在要生成订单

customer ID    name    PhomeNumber     Price     Data

2                      钱二     025-4587-565        93          2008-01-08

上面是一个例子,下面介绍一下hadoop中DataJoin类具体的做法。

首先,需要为不同数据源下的每个数据定义一个数据标签,这一点不难理解,就是标记数据的出处。

其次,需要为每个待链接的数据记录确定一个链接主键,这一点不难理解。DataJoin类库分别在map阶段和Reduce阶段提供一个处理框架,尽可能帮助程序员完成一些处理的工作,仅仅留下一些必须工作,由程序完成。

Map阶段

DataJoin类库里有一个抽象基类DataJoinMapperBase,该基类实现了map方法,该方法为对每个数据源下的文本的记录生成一个带表见的数据记录对象。但是程序必须指定它是来自于哪个数据源,即Tag,还要指定它的主键是什么即GroupKey。如果指定了Tag和GroupKey,那么map将会生成一下的记录,customer表为例

customers         1                赵一        025-5455-566;       customers         2                钱二        025-4587-565;

Map过程中Tag和GroupKey都是程序员给定,所以要肯定要就有接口供程序员去实现,DataJoinMapperBase实现下面3个接口。

abstract Text gernerateInputTag(String inuptFile), 看方法名就知道是设置Tag。

abstract Text generateGroupKey(TaggedMapOutput lineRecord), 该方法是设置GroupKey,其中,lineRecord是数据源中的一行数据,该方法可以在这一行数据上设置任意的GroupKey为主键。

abstract TaggedMapOutput generateMapOutput(object value), 该抽象方法用于把数据源中的原始数据记录包装成一个带标签的数据源。TaggedMapOutputs是一行记录的数据类型。代码如下:

import org.apache.hadoop.contrib.utils.join.*;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class MapClass extends DataJoinMapperBase{

    @Override
    protected Text generateGroupKey(TaggedMapOutput arg0) {
        String line = ((Text)arg0.getData()).toString();
        String[] tokens = line.split(",");
        String groupKey = tokens[0];
        return new Text(groupKey);
    }

    @Override
    protected Text generateInputTag(String arg0) {

        String dataSource = arg0.split("-")[0];
        return new Text(dataSource);
    }

    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
        TaggedWritable tw = new TaggedWritable((Text)arg0);
        tw.setTag(this.inputTag);
        return tw;
    }
} 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class TaggedWritable extends TaggedMapOutput{

    private Writable data;
     public TaggedWritable(Writable data) {
         this.tag = new Text("");
         this.data = data;
     } 

    @Override
    public Writable getData() {
        return data;
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.tag.readFields(arg0);
        this.data.readFields(arg0);
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        this.tag.write(arg0);
        this.data.write(arg0);
    }
}

每个记录的数据源标签可以由generateInputTag()产生,通过setTag()方法设置记录的Tag。

note:1.该记录不是关系数据库,是文本文件,2. TaggedMapOutput在import org.apache.hadoop.contrib.utils.join.*头文件中,有的时候在eclipse下,每个这个头文件,这时   只要找到你的hadoop的目录下contrib/datajoin文件加,把jar文件导入eclipse中即可。

Reduce 阶段

DataJoinReduceBase中已经实现reduce()方法,具有同一GroupKey的数据分到同一Reduce中,通过reduce的方法将对来自不同的数据源和据用相同的GroupKey做一次叉积组合。这个比较难懂,举个例子:


customers         2                钱二        025-4587-565;

orders      2                1               93       2008-01-08;

orders 2           4               32       2012-5-14

按照map()结果的数据,就是下表给出的结果(3个记录),他们都有一个共同的GroupKey,带来自于二个数据源,所以叉积的结果为


customers         2                钱二        025-4587-565

orders      2                1               93       2008-01-08


customers         2                钱二        025-4587-565

orders 2           4               32       2012-5-14

如果Reduce阶段看懂了,基本上这个就搞定了,Reduce是系统做的,不需要用户重载,接下来的工作就是要实现一个combine()函数,它的作用是将每个叉积合并起来,形成订单的格式。

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceClass extends DataJoinReducerBase{

    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
      if(tags.length<2)return null;
      StringBuffer joinData = new StringBuffer();
      int count=0;

        for(Object value: values){
            joinData.append(",");
            TaggedWritable tw = (TaggedWritable)value;
            String recordLine = ((Text)tw.getData()).toString();
            String[] tokens = recordLine.split(",",2);
            if(count==0) joinData.append(tokens[0]);
            joinData.append(tokens[1]);
        }

        TaggedWritable rtv = new TaggedWritable(new Text(new String(joinData)));
        rtv.setTag((Text)tags[0]);
        return rtv;
    }

    public static void main(String[] args){

        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf, ReduceClass.class);  

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);  

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job); 

    }
}

时间: 2024-08-28 05:46:01

MapReduce,DataJoin,链接多数据源的相关文章

Google MapReduce 中文版

摘要 MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现.用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于key/value pair的数据集合:然后再创建一个Reduce函数用来合并所有的具有相同中间key值的中间value值.现实世界中有很多满足上述处理模型的例子,本论文将详细描述这个模型. MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理.这个系统在运行时只关心:如何分割输入数据,在大量计算

MapReduce:超大机群上的简单数据处理【MapReduce 中文版 中文翻译】

MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作. 以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这样就可以让那些没有并行分布式处理系统经验的程序员利用大量

转】MapReduce: Simplified Data Processing(一)

摘要MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作.以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这样就可以让那些没有并行分布式处理系统经验的程序员利用大

Hadoop之MapReduce程序应用一

摘要:MapReduce程序处理专利数据集. 关键词:MapReduce程序   专利数据集 数据源:专利引用数据集cite75_99.txt.(该数据集可以从网址http://www.nber.org/patents/下载) 问题描述: 读取专利引用数据集并对它进行倒排.对于每一个专利,找到那些引用它的专利并进行合并.top5输出结果如下: 1                                3964859, 4647229 10000                      

weblogic配置JNDI数据源

因项目需要使用多数据源,因此要使用JNDI方式去配置,在此留下操作步骤. # 启动Admin Server Console #登录到管理系统选择数据源节点 #新建数据源 #配置数据源信息 * 上面那个设置数据源名称 * JNDI名称和数据源名称可以相同也可以不相同,建议按照jdbc.XXX这种格式创建(测试时使用myname名字有点问题) * 数据库选择对应类型即可 #选择数据库驱动 这个可以随便选择一个,后面可以修改 #这一步直接下一步 #配置数据源信息 #测数据源链接 #部署数据源,点击完成

MapReduce:超大机群上的简单数据处理

    摘要 MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个 map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间 value.下面将列举许多可以用这个模型来表示的现实世界的工作. 以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这样就可以让那些没有并行分布式处理系统经

HADOOP之MAPREDUCE程序应用二

摘要:MapReduce程序进行单词计数. 关键词:MapReduce程序  单词计数 数据源:人工构造英文文档file1.txt,file2.txt. file1.txt 内容 Hello   Hadoop I   am  studying   the   Hadoop  technology file2.txt内容 Hello  world The  world  is  very  beautiful I   love    the   Hadoop    and    world 问题描

MapReduce中文翻译

MapReduce:超大机群上的简单数据处理   摘要 MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作. 以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这

数据源绑定

ASP.NET数据绑定总结 概念: 数据绑定(data binding):数据源与服务器控件的关联,“数据绑定”是一种把数据绑定到一种用户界面元素(控件)的通用机制. 分类: ASP.NET中涉及到的数据绑定大概可以分为: 使用<%# 表达式 %> 使用DataSource属性 使用数据源控件 使用Eval方法 使用<%# 表达式 %> 绑定数据源 对于这种绑定方式,无论是Html标记,还是Web服务器控件都是实用的. 属性绑定: 例:将HTML文本框文本要绑定到页面的一个字段na