MapReduce 实现数据join操作

前段时间有一个业务需求,要在外网商品(TOPB2C)信息中加入 联营自营 识别的字段。但存在的一个问题是,商品信息 和 自营联营标示数据是 两份数据;商品信息较大,是存放在hbase中。他们之前唯一的关联是url。所以考虑用url做key将两者做join,将 联营自营标识 信息加入的商品信息中,最终生成我需要的数据;

一,首先展示一下两份数据的demo example

1.
自营联营标识数据(下面开始就叫做unionseller.txt)

http://cn.abc.www/product43675,1

http://cn.abc.www/product43710,0

(是两列的数据,其中第一列是url也即是join操作要使用的key,后面的0,1是表示自营联营标识,0表示自营,1表示联营)

2.
商品信息数据(下面开始叫做shangpin_hbase.txt)

ROWKEY=http://cn.abc.www/product43675^_

image:[email protected]=http://*/*.jpg^_

image:[email protected]=0^_

image:[email protected]=**^_

parser:[email protected]={

......

......

uri:[email protected]=http://www.baby.com.cn/product-35520^_

^^

(上面是一条商品数据的样例,使用的是spider团队提供的一个jar包从hbase中导出来的,但也是比较标准的模式,可以使用hbase中HBaseDocInputFormat的类进行读取,其中貌似笑脸的那些field
separator和line separator实际上是不可见字符,这里主要是为了展示而用可见字符代替;处于隐私原因,将一些内容用*代替)

由于商品数据是在hbase里面的,所以首先将商品数据从hbase中dump出来,通过rowkey来访问hbase大致有两种方式:

1. 通过单个row key访问:即按照某个row key键值进行get操作;

2. 通过row key的range进行scan;在范围内进行扫描;

一般来说,scan的方式会快一些。由于商品数据集的数量较为庞大(会有2000多万条url的商品及属性),并且商品是以url(host反转的方式,例如http://cn.abc.www/product43675)作为rowkey进行存储的。所以采用scan的方式从hbase中取出上商品数据集。

接下来获取自营联营识别数据,由于自营联营标识在hbase中是没有的,是spider团队在进行商品数据dump的时候计算出来的,所以直接用python写个streaming程序从dump数据中取得url字段和自营联营标识就可以了。较为简单,略过,本文主要来讲解join操作和hbase字段解析的过程。

二,现在我们手头上有了shangpin_hbase.txt以及unionseller.txt下面就可以考虑实现join操作的mapreduce了。

用mapreduce实现join比较常见的有两种方法,map端join和reduce端join

1.
Map端join所针对的场景是,两个要进行join的数据集中,其中一个非常大,另一个非常小,以至于我们可以将小的数据集放到内存中。这样我们可以将小数据集复制多份,每个运行map
task的内存中都存在一份(可以使用hash
table),这样map
task可以只扫描大的数据,对于大数据中的每一条记录,去小数据中找到相应的key的数据,然后连接输出;如果想让小数据集复制到每个map
task中,可以使用mapreduce提供的Distributed
Cache机制。

?2. Reduce端join是一种比较简单和容易想到的方式,适合的环境也更为普遍。基本思路是在mapper中为每一个记录打上标记,并且使用连接键作为map输出键,使键相同的记录能够被分到同一个reducer中。

??Reduce端join比较简单,但缺点是两个数据集都要经过mapreduce的shuffle过程(里面涉及到写磁盘,归并排序,还有网络传输等)。所以reduce端的join操作,效率往往低些。本文采用简单一些的reduce端join的实现。

?1.
首先是mapper编写,由于join操作至少有两份数据集,所以常需要使用mapReduce的MultipleInputs.addInputPath()来添加多个输入文件的路径。也需要两个mapper类来实现对不同数据集的处理,首先来看一下处理unionseller.txt的mapper程序:

?

?由于unionseller.txt的每一行是以逗号分割的两列,所以只要用逗号将两者分隔开就好,得到作为连接键的url项lst[0]和自营联营识别标识项lst[1]。Map的的输出key是TextPair类型,输出value是Put类型。这两个重点说一下:

?(1)TextPair类型

?map的输出即reduce的输入,有相同key的数据会被运送到同一个reduce来处理。为了在reduce端能够区分unionseller.txt和shangpin_hbase.txt类型,在TextPair中增加了一个识别字段,unionseller.txt是 “0”,即new
TextPair(lst[0],”0”);而shangpin_hbase.txt字段是”1”,这样在reduce端就可以有依据来区分数据的类型。

?TextPair是一个自定义的数据类型,其包含两个成员变量,first和second,两个成员变量都是Hadoop自带的Text类型。自己实现自定义的Key是要有些限制的,原因是:

(a)Mapper会根据key进行hash操作来决定记录被分配到哪一个reducer中去;

(b)在MapReduce内部机制中,在Mapper以及Reducer阶段都涉及到将数据写到磁盘的IO操作(spill阶段)和根据key对数据进行排序的操作。所以这就要求Mapper中的key是可以比较的,并且key和value是都是可以序列化的。Hadoop对自带的数据类型(Text,IntWritable等)都实现了序列化方法和内置的比较方法,但是对于自定义类型,就必须自己去实现相应的接口,并且重写方法。TextPair代码如下:

由于key需要进行比较并且需要序列化,所以实现了WritableComparable接口,其中compareTo()是实现了比较的方法,而readFields和write方法则是用于序列化的,告诉hadoop怎样读和写自定义的数据结构。这里解决了对key比较和序列化的问题,那么mapper对数据进行分发(决定到将记录发送到哪个reduce)的操作该怎样实现,这个需要集成Partitioner类并重写getPartition方法,实现自己的分发策略。

??这里面自己定义了hash方法。

?(2)介绍完了TextPair类,说一下Put类型,说Put类型之前,先看下处理shangpin_hbase.txt的mapper实现:

Put类型是Hbase自带的一个类型,由于shangpin_hbase.txt是从Hbase中导出来的,需要Hbase中的HBaseDocInputFormat.class进行解析,解析之后会成为<ImmutableBytesWritable,
Put>的键值对,所以Mapper的key和value分别为ImmutableBytesWritable和Put类型。Put的”row”就是这条数据的rowkey。Put本质上是一个<byte[],
List<KeyValue>结构的map,其中KeyValue也是Hbase所定义的一个数据类型,KeyValue是将Hbase中timestamp,family,qualifier,value扁平化存储的数据结构,要细讲就多了,感兴趣的可以看一下hbase的源代码。本文在后面会一个将Put转化为String的代码。处理shangpin_hbase.txt的map输出的textpair的标识是”1”?。

??

?2. 两个Mapper的工作介绍完了,接下来就要编写Reducer了。想一想,Reducer的任务比较简单,含有同样key的数据都在一起了。只要根据”0”和”1”的,取出自营,联营的识别标识,然后将这个塞到商品信息中就好了。

?但这里面还有一个坑,就是shuffle的group过程,其实MapReduce框架中,在Reduce之前有一个group操作,将数据进行分组,同一个分组的数据会在一次reduce函数中被处理。group默认会使用key的compareTo方法来进行分组操作,按照上面TextPair的compareTo方法,url相同的”0”和”1”数据是分不到一个group里面的。这样从业务逻辑上分析是有问题的,所以我们需要对group的比较方法进行调整,MapReduce框架中也可以自定义group的比较方法:

?

?这里我们设置,只有url相同,数据就会被放到同一个group里面。

下面是Reduce的代码以及 将Hbase中Put类型转化为String的方法:

至此,这次join操作就讲完了。附件中有实例代码。其实这个需求实现起来也可以不使用这么多的自定义函数,只不过文章中的实现更有助于了解MapReduce的原理。

By the way,用hive实现join是更简单的。。。

时间: 2024-12-23 02:41:34

MapReduce 实现数据join操作的相关文章

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204).有空再看看 实际上实现过程是不是和他写的代码一样. 前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天

MapReduce中的Reduce join操作

-------file1[ID NAME]-------- 1 zhangsan2 lisi3 wangwu -------file2[ID VALUE]--------1 452 563 89 -------结果[NAME VALUE]------------zhagnsan 45lisi 56wangwu 89 一般数据库的join操作 a join b  on a.id = b.id 后面的条件在reduce中指的是相同的key,在sql中很容易区分出后面条件的字段到底来自那张表 而在Ma

MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来 map端的join 适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作 在一个TaskTracker中可以运行多个map任务.每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了.使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从

MapReduce三种join实例分析

本文引自吴超博客 实现原理 1.在Reudce端进行连接. 在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下: Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录.然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出. reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,

Linq之Join操作

1 摘要 文章通过一个简单的实例对Linq中的Join操作进行演示,并在文章的最后对Join操作相关知识点进行简单的总结. 2 实例演示 1) 新建数据库MyTestDB,在数据库中新建数据表tb_Class和tb_Student,两表的定义如下图所示.                                        图1  tb_Class的定义                                                                    

数据仓库,数据基础操作

1 /// <summary> 2 /// 此类作用:数据仓库,数据基础操作 3 /// </summary> 4 /// <typeparam name="TEntity">实体</typeparam> 5 /// <remarks> 6 /// zhangqc 2016.08.08 新建 7 /// </remarks> 8 public partial class Repository<TEntity&

MySql 中Join操作的用法

SQL标准中的Join的类型: 首先,设置表employees和department的数据为: 1.inner join - on操作类型 内连接inner join是基于连接谓词将两张表(如A和B)的列组合在一起的,产生新的结果表. 例子: SELECT * FROM employees a inner join department b ON a.department_id = b.department_id 查询结果为: 注意:inner join 可以简写为join,该查询得出的结果为两

Hadoop第7周练习—MapReduce进行数据查询和实现推简单荐系统(转)

1  运行环境说明 1.1 硬软件环境 1.2 机器网络环境 2  书面作业1:计算员工相关 2.1 书面作业1内容 2.2  实现过程 2.2.1   准备测试数据 2.2.2   问题1:求各个部门的总工资 2.2.3   问题2:求各个部门的人数和平均工资 2.2.4   问题3:求每个部门最早进入公司的员工姓名 2.2.5   问题4:求各个城市的员工的总工资 2.2.6   问题5:列出工资比上司高的员工姓名及其工资 2.2.7   问题6:列出工资比公司平均工资要高的员工姓名及其工资