原始数据
/*
* 原始数据
* 子 父
* Tom Lucy
Tom Jack
Jone Locy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
TerryAlice
TerryJesse
PhilipAlma
Mark Terry
Mark Alma
*/
要求通过子父关系找出子-祖母关系
/*
* 设计方法:连接的左表的parent列(key),右表的child列(key),且左右表属于同一张表
* 所以在map阶段将读入数据分割成child,parent后,会将parent设置成key,child设置成value输出,并作为左表
* 再将同一对child和parent中的child作为key,parent作为value进行输出,作为右表
* 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的string最开始出加上字符1表示左表,加上2表示右表。
* 然后在shuffle过程中完成连接,reduce接收到连接的结果,其中每个key的value-list就包含了“grandchild-grandparent”关系。
* 取出每个key的value-list进行解析,将左表中的child放入一个数组(就一个key),右表中的grandparent放入一个数组,然后对两个数组求笛卡儿积就ko了
*
*/
1.Map类
package test.mr.selfrelated; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* * 表的自连结(grandchild-grandparend表) */ /* * 原始数据 * 子 父 * Tom Lucy Tom Jack Jone Locy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse TerryAlice TerryJesse PhilipAlma Mark Terry Mark Alma */ /* * 设计方法:连接的左表的parent列(key),右表的child列(key),且左右表属于同一张表 * 所以在map阶段将读入数据分割成child,parent后,会将parent设置成key,child设置成value输出,并作为左表 * 再将同一对child和parent中的child作为key,parent作为value进行输出,作为右表 * 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的string最开始出加上字符1表示左表,加上2表示右表。 * 然后在shuffle过程中完成连接,reduce接收到连接的结果,其中每个key的value-list就包含了“grandchild-grandparent”关系。 * 取出每个key的value-list进行解析,将左表中的child放入一个数组(就一个key),右表中的grandparent放入一个数组,然后对两个数组求笛卡儿积就ko了 * */ public class selfRelatedMap extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.trim().length() > 0) { String str[] = line.split("\t"); if (str.length == 2) { context.write(new Text(str[1]), new Text("1_" + str[0])); // 左表 context.write(new Text(str[0]), new Text("2_" + str[1])); // 右表 } } } }
2.Reduce类
package test.mr.selfrelated; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class selfRelatedRedu extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { List<String> grandsons = new ArrayList<String>(); List<String> grandparents = new ArrayList<String>(); for (Text t : values) { // 进行value字符串切分 String str[] = t.toString().split("_"); if ("1".equals(str[0])) { // 左表 //作为孙 grandsons.add(str[1]); } else if ("2".equals(str[0])) { // 右表 //作为祖母辈 grandparents.add(str[1]); } } // 做笛卡尔积 for (String gc : grandsons) { for (String gp : grandparents) { context.write(new Text(gc), new Text(gp)); } } } }
3.job类
package test.mr.selfrelated; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class selfRelatedMain { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(selfRelatedMain.class); job.setMapperClass(selfRelatedMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(selfRelatedRedu.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
时间: 2024-10-20 13:27:03