我在学习hadoop, 在看 陆嘉恒编著的hadoop实战,其中有单表连接的程序,我现在整理一下思路。这个问题是课本上的例子。
给出 child-parent 表, 要求输出 grandchild-grandparent 表
样例输入:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesee
Terry Alice
Terry Jesee
Philip Terry
Philip Alma
Mark Terry
Mark Alma
样例输出:
grandChildgrandParent
TomAlice
TomJesee
JoneAlice
JoneJesee
TomMary
TomBen
JoneMary
JoneBen
PhilipAlice
PhilipJesee
MarkAlice
MarkJesee
其实这个案例只要想通了里面的关键,还是很简单的。
解题思路: 进行单表连接
从样例输入文件中,我们可以看到 child--parent(child)--parent ,通过这样连接就会找出 grandchild -- grandparent
如:
child parent
Tom Lucy
Tom Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesee
这样我们可以很容易的找到下面的关系:
grandchild grandparent
Tom Mary
Tom Ben
Tom Alice
Tom Jesee
我们可以这样连接:
表一: 表二:
child parent child parent
Tom Lucy Lucy Mary
Lucy Ben
Tom Jack Jack Alice
Jack Jesee
我们可以将表一和表二进行连接,然后去掉 表一的第二列 和表二的第一列, 剩下的就是 结果了。
这里我们可以看到 ,其实表一和表二是一个表,这就是单表连接
这里可以将将这个表设置为左表和右表
Map 阶段:
将读入的数据 分割为child 和 parent , 为了区分左右表可以在 输出的value 里面加上标记左右表的信息, 左表 将 parent 作为key , 左表标记+child 作为 value 为map输出, 右表 child 作为key ,右表标记+parent 作为value 为输出。
在Map 阶段完成了左右表的划分,在shuffle 阶段完成了左右表连接
Reduce 阶段:
(相同key 的 会汇聚在一起,如 <Lucy ,<leftTag:Tom , rightTag:Mary , rightTag:Ben> >)
像这样在Reduce 阶段收到的结果中,每个key的value-list 中就包含了grandchild (left:Tom)和grandparnet (rightTag : Mary , rgihtTag : Ben)的关系,然后将value解析, 有leftTag标记的存入 grandChild[] 数组中,将有rightTag 标记的 存入 grandParent[] 数组中,然后对 grandChild[] 和grandParent[] 求笛卡尔积 即可
下面是程序代码:
package cn.edu.ytu.botao.singletablejoin; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /** * * 单表连接 * * child parent * Tom Lucy * Tom Jack * Lucy Mary * Lucy Ben * * 左表 : 反向输出 <key parent, value chlid> * Lucy Tom * Jack Tom * * 右表 正向输出 <key child, value parent> * Lucy Mary * Lucy Ben * * 连接后: * * <Tom, <Mary , Ben> > * * @author botao * */ public class STjoin { private static int time = 0; public static class STJMapper extends Mapper<Object, Text, Text, Text>{ //标记表 private Text leftTag = new Text("1"); //左表 private Text rightTag = new Text("2"); //右表 @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String childName = new String(); String parentName = new String(); //读取内容 String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); //截取的字符串数组 String[] values = new String[2]; int i = 0; while (tokenizer.hasMoreElements()) { values[i++] = (String) tokenizer.nextElement(); } if (values[0].compareTo("child") != 0) { childName = values[0]; parentName = values[1]; //左表输出 反向 value的值为 grandchild context.write(new Text(parentName), new Text(leftTag.toString() + ":" + childName)); //右表输出 正向 context.write(new Text(childName), new Text(rightTag.toString() + ":" + parentName)); } } } public static class STJoinReduce extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //记录grandchild的信息 和存储 int grandChlidNum = 0; String[] grandChild = new String[20]; //记录grandparent 的信息 和存储 int grandParentNum = 0; String[] grandParent = new String[20]; if (time == 0) { context.write(new Text("grandChild"), new Text("grandParent")); time++; } /** * 对于右表 将values的值存入 grandChild[] 中 * 对于左表 将values 的值存入 grandParnet[] 中 */ for (Text text : values) { String value = text.toString(); //System.out.println(key.toString() + "..." + value); String temp[] = value.split(":"); //System.out.println(key.toString() + "....." + temp[0] + "..." + temp[1]); //左表 if (temp[0].compareTo("1") == 0) { grandChild[grandChlidNum++] = temp[1]; } if (temp[0].compareTo("2") == 0) { grandParent[grandParentNum++] = temp[1]; } } //对 grandChild[] 和 grandParent[]进行求笛卡尔积 if (0 != grandChlidNum && 0 != grandParentNum) { //System.out.println(grandChlidNum + "..." + grandParentNum); for (int i = 0; i < grandChlidNum; i++) { for (int j = 0; j < grandParentNum; j++) { context.write(new Text(grandChild[i]), new Text(grandParent[j])); } } } } } @SuppressWarnings("deprecation") public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } // 如果out文件夹存在,现将该文件夹删除 Path path = new Path("out"); FileSystem fs = FileSystem.get(conf); if (fs.exists(path)) { fs.delete(path); } Job job = new Job(conf , "STJoin"); job.setJarByClass(STjoin.class); job.setMapperClass(STJMapper.class); job.setReducerClass(STJoinReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }