Sample data: student.txt
1,yaoshuya,25
2,yaoxiaohua,29
3,yaoyuanyie,15
4,yaoshupei,26
Sample data:score.txt
1,yuwen,100
1,shuxue,99
2,yuwen,99
2,shuxue,88
3,yuwen,99
3,shuxue,56
4,yuwen,33
4,shuxue,99
输出文件内容:
1 [yaoshuya,25,yuwen,100]
1 [yaoshuya,25,shuxue,99]
2 [yaoxiaohua,29,yuwen,99]
2 [yaoxiaohua,29,shuxue,88]
3 [yaoyuanyie,15,yuwen,99]
3 [yaoyuanyie,15,shuxue,56]
4 [yaoshupei,26,yuwen,33]
4 [yaoshupei,26,shuxue,99]
参数:
args= "-Dio.sort.mb=10
-r 1
-inFormat org.apache.hadoop.mapred.KeyValueTextInputFormat
-outFormat org.apache.hadoop.mapred.TextOutputFormat
-outKey org.apache.hadoop.io.Text
-outValue org.apache.hadoop.mapred.join.TupleWritable
hdfs://namenode:9000/user/hadoop/student/student.txt
hdfs://namenode:9000/user/hadoop/student/score2.txt
hdfs://namenode:9000/user/hadoop/joinout".split(" ");
需要注意的是我使用的输出格式是TextOutputFormat(完全是为了方便观察输出后的数据)
输出的valuetype是org.apache.hadoop.mapred.join.TupleWritable ,这个 类型非常方便,类似于数组类型,可以接受多值。
在源码中添加的一句代码,是用来配置我的数据源文件的keyvalue分隔符是,(comma).
jobConf.set("key.value.separator.in.input.line", ",");
关键代码简析:
job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
CompositeInputFormat.compose(op, inputFormatClass,
plist.toArray(new Path[0])));
使用CompositeInputFormat来进行join操作。此类的说明:
/**
* An InputFormat capable of performing joins over a set of data sources sorted
* and partitioned the same way.
*
* A user may define new join types by setting the property
* <tt>mapreduce.join.define.<ident></tt> to a classname.
* In the expression <tt>mapreduce.join.expr</tt>, the identifier will be
* assumed to be a ComposableRecordReader.
* <tt>mapreduce.join.keycomparator</tt> can be a classname used to compare
* keys in the join.
* @see #setFormat
* @see JoinRecordReader
* @see MultiFilterRecordReader
*/
通过op来指定连接类型:inner,outer,tbl等,有其他需要也可以实现。
具体是怎么连接的呢?根据两个source进入mapper的key进行归并连接。所以要求数据源是根据key值有序的。此连接是在map端实现的。
测试中我使用KeyValueTextInputFormat来处理,其默认格式是key\tValue,所以我使用了上面的代码来进行重置这个格式。但如果你的文件不是key放在第一个位置,你就需要自己写FileInputFormat啦。
但明显需要你要处理的数据源都是使用同样的FileInputFormat去读取。
还有一点,这里支持多文件连接,示例中我只使用了两个示例文件,可以添加更多的文件,路径添加到outputdir之前即可。