设计思路
分析这个实例,显然需要进行单表连接,连接的是左表的parent列和右表的child列,且左表和右表是同一个表。
连接结果中除去连接的两列就是所需要的结果——"grandchild--grandparent"表。要用MapReduce解决这个实例,首先应该考虑如何实现表的自连接;其次就是连接列的设置;最后是结果的整理。
考虑到MapReduce的shuffle过程会将相同的key会连接在一起,所以可以将map结果的key设置成待连接的列,然后列中相同的值就自然会连接在一起了。再与最开始的分析联系起来:
要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在map阶段将读入数据分割成child和parent之后,会将parent设置成key,child设置成value进行输出,并作为左表;再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"grandchild--grandparent"关系。取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了
程序代码
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class STjoin
{
public static int time =
0;
/*
* map将输出分割child和parent,然后正序输出一次作为右表,
* 反序输出一次作为左表,需要注意的是在输出的value中必须
* 加上左右表的区别标识。
*/
public static class Map extends Mapper<Object,
Text, Text, Text> {
// 实现map函数
public void map(Object
key, Text value, Context context)
throws IOException,
InterruptedException {
String
line=value.toString();
String[] strs= line.split("\t");
context.write(new Text(strs[1]),new Text("1+"+strs[0]));//输出左表
context.write(new Text(strs[0]),new Text("2+"+strs[1]));//输出右表
}
}
}
public static class Reduce extends Reducer<Text,
Text, Text, Text> {
// 实现reduce函数
public void reduce(Text
key, Iterable<Text> values, Context context)
throws IOException,
InterruptedException {
// 输出表头
if (0
== time) {
context.write(new Text("grandchild"), new Text("grandparent"));
time++;
}
String[] grandchild = null;
int grandchildnum = 0;
String[] grandparent =
null;
int grandparentnum = 0;
Iterator iter = values.iterator();
while (iter.hasNext()) {
String record = ite.next().toString();
String[] st=record.split("+");
if(st[0]==1){
grandchild[grandchildnum ] =st[1];
grandchildnum ++;
} else if(st[0]==2){
grandparent [grandparentnum
]=st[1];
grandparentnum ++;
}
}
// grandchild和grandparent数组求笛卡尔儿积
if (0
!= grandchildnum && 0 != grandparentnum) {
for (int m
= 0; m < grandchildnum; m++) {
for (int n
= 0; n < grandparentnum; n++) {
// 输出结果
context.write(new Text(grandchild[m]), new Text(grandparent[n]));
}
}
}
}
}
public static void main(String[]
args) throws Exception
{
Configuration conf = new Configuration();
// 这句话很关键
conf.set("mapred.job.tracker", "192.168.1.2:9001");
String[] ioArgs = new String[]
{ "STjoin_in", "STjoin_out" };
String[] otherArgs = new GenericOptionsParser(conf,
ioArgs).getRemainingArgs();
if (otherArgs.length !=
2) {
System.err.println("Usage:
Single Table Join <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Single
Table Join");
job.setJarByClass(STjoin.class);
// 设置Map和Reduce处理类
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)
? 0 : 1);
}
}