环境:CentOS 6.6 Hadoop 1.2.1
样例数据:第一列是 child ,第二列是 parent ,用空格分开,要求输出 grandchild grandparent
[[email protected] ~]$ hadoop fs -cat ./in/genealogy.txt Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma
程序:
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SelfJoinMapper extends Mapper<Text, Text, Text, Text> { @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(value, new Text("1" + key.toString())); //左表的 parent 做 key context.write(key, new Text("2" + value.toString())); //右表的 child 做 key } } import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<String> childList = new ArrayList<String>(); List<String> grandList = new ArrayList<String>(); for (Text value : values) { if (value.toString().startsWith("1")) { childList.add(value.toString().substring(1)); } else { grandList.add(value.toString().substring(1)); } } for (String child : childList) { for (String grand : grandList) { context.write(new Text(child), new Text(grand)); } } } } import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SelfJoin { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: SelfJoin <input path> <output path>"); System.exit(-1); } Configuration conf = new Configuration(); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); //设置分割符 //conf.set("mapred.jar", "./out/SelfJoin.jar"); //conf.set("fs.default.name", "hdfs://hadoop1:9000"); //conf.set("mapred.job.tracker", 
"hadoop1:9001"); Job job = new Job(conf); job.setInputFormatClass(KeyValueTextInputFormat.class); //设置 InputFormat job.setJarByClass(SelfJoin.class); job.setJobName("SelfJoin"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(SelfJoinMapper.class); job.setReducerClass(SelfJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
查看程序输出如下:
[[email protected] ~]$ hadoop fs -cat ./out/9/part-r-00000 Tom Alice Tom Jesse Jone Alice Jone Jesse Tom Mary Tom Ben Jone Mary Jone Ben Philip Alice Philip Jesse Mark Alice Mark Jesse
时间: 2024-10-09 19:04:26