概念:
Hadoop有一个叫DataJoin的包为Data Join提供相应的框架。它的Jar包存在于contrib/datajoin/hadoop-*-datajoin。
为区别于其他的data join技术,我们称其为reduce-side join。(因为我们在reducer上作大多数的工作)
reduce-side join引入了一些术语及概念:
1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)
Customers Orders
1,Stephanie Leung,555-555-5555 3,A,12.95,02-Jun-2008
2,Edward Kim,123-456-7890 1,B,88.25,20-May-2008
3,Jose Madriz,281-330-8004 2,C,32.00,30-Nov-2007
4,David Stork,408-555-0000 3,D,25.02,22-Jan-2009
2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。
3.Group Key:Group Key类似于关系数据库中的链接键(join
key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group
key,所以其较之关系数据库中的join key更一般、平常。
流程:(详见《Hadoop in Action》Chapter 5.2)
Advanced MapReduce:
Joining Data from different sources:
利用datajoin包来实现join:
Hadoop的datajoin包中有三个需要我们继承的
类:DataJoinMapperBase,DataJoinReducerBase,TaggedMapOutput。正如其名字一样,我们的
MapClass将会扩展DataJoinMapperBase,Reduce类会扩展DataJoinReducerBase。这个datajoin包
已经实现了map()和reduce()方法,因此我们的子类只需要实现一些新方法来设置一些细节。
在用DataJoinMapperBase和DataJoinReducerBase之前,我们需要弄清楚我们贯穿整个程序使用的新的虚数据类TaggedMapOutput。
根据之前我们在图Advance
MapReduce的数据流中所展示的那样,mapper输出一个包(由一个key和一个value(tagged
record)组成)。datajoin包将key设置为Text类型,将value设置为TaggedMapOutput类型
(TaggedMapOutput是一个将我们的记录使用一个Text类型的tag包装起来的数据类型)。它实现了getTag()和setTag(Text tag)
方法。它还定义了一个getData()方法,我们的子类将实现这个方法来处理record记录。我们并没有明确地要求子类实现setData()方法,
但我们最好还是实现这个方法以实现程序的对称性(或者在构造函数中实现)。作为Mapper的输出,TaggedMapOutput需要是
Writable类型,因此的子类还需要实现readFields()和write()方法。
DataJoinMapperBase:
回忆join数据流图,mapper的主要功能就是打包一个record使其能够和其他拥有相同group key的记录去向一个Reducer。DataJoinMapperBase完成所有的打包工作,这个类定义了三个虚类让我们的子类实现:
protected abstract Text generateInputTag(String inputFile);
protected abstract TaggedMapOutput generateTaggedMapOutut(Object value);
protected abstract Text generateGroupKey(TaggedMapOutput aRecored);
在一个map任务开始之前为所有这个map任务会处理的记录定义一个tag(Text),结果将保存到DataJoinMapperBase的inputTag变量中,我们也可以保存filename至inputFile变量中以待后用。
在map任务初始化之后,DataJoinMapperBase的map()方法会对每一个记录执行。它调用了两个我们还没有实现的虚方法:generateTaggedMapOutput()以及generateGroupKey(aRecord);(详见代码)
DataJoinReducerBase:
DataJoinMapperBase将我们所需要做的工作以一个full outer join的方式简化。我们的Reducer子类只需要实现combine()方法来滤除掉我们不需要的组合来得到我们需要的(inner join, left outer join等)。同时我们也在combiner()中将我们的组合格式化为输出格式。