package com.billstudy.mr.friends; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 找朋友 * 共同好友 原始数据:每个人的好友列表 A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J …… 输出结果:每个人和其他各人所拥有的功能好友 A-B C,E, A-C D,F, A-D E,F, A-E B,C,D, A-F B,C,D,E,O, A-G C,D,E,F, A-H C,D,E,O, A-I O, A-J B,O, A-K C,D, A-L D,E,F, A-M E,F, B-C A, B-D A,E, …… * @author Bill * @since V1.0 2015年6月24日 - 下午4:53:01 */ public class ShareFriends { /** * * 把拥有同一个朋友的放到同一组 * * 将 * A:B,C,D,F,E,O * * 输出: * B A * C A * D A * F A * .... * * @author Bill * @since V1.0 2015年6月24日 - 下午5:15:21 */ static class Mapper1 extends Mapper<LongWritable, Text, Text, Text> { private final Text k = new Text(); private final Text v = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] persons = value.toString().split(":"); if(persons.length != 2){ return; } // 切分字段 String self = persons[0]; String[] friends = persons[1].split(","); v.set(self); for (int i = 0; i < friends.length; i++) { k.set(friends[i]); context.write(k, v); } } } /** * * 把拥有同一个朋友的拼接到一起,输出 * * 将 * A {B,C,D} * * 输出: * A B-C-D * @author Bill * @since V1.0 2015年6月24日 - 下午5:14:26 */ static class Reducer1 extends Reducer<Text, Text, Text, Text>{ // private final Text k = new Text(); private final Text v = new Text(); @Override protected void reduce(Text key, Iterable<Text> friends, Context context) throws IOException, InterruptedException { StringBuilder friendNames = new StringBuilder(); for (Text friend : friends) { friendNames.append(friend.toString() + "-"); } // 去掉最后一个杠杠 v.set(friendNames.length() > 0 ? friendNames.substring(0, friendNames.length() - 1) : "" ); context.write(key, v); } } /** * 将拥有同一个朋友的人排序后两两拼接输出,让朋友任意的一对组合都可以分到同一组 * * 将 * A B-C-D-E-F * * 输出: * B-C A * B-D A * B-E A * B-F A * C-D A * .... * @author Bill * @since V1.0 2015年6月24日 - 下午5:12:44 */ static class Mapper2 extends Mapper<LongWritable, Text, Text, Text>{ private final Text k = new Text(); private final Text v = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] persons = value.toString().split("\t"); String self = persons[0]; String[] friends = persons[1].split("-"); // 此处必须要对其朋友排序,否则交叉输出时会导致A-B:D / B-A:F 的问题出现,实际上述两个key为同一组。应该为:A-B:D,F Arrays.sort(friends); v.set(self); // 交叉打印 for (int i = 0; i < friends.length - 1; i++) { for (int j = i + 1; j < friends.length; j++) { k.set(friends[i] + "-" + friends[j]); context.write(k, v); } } } } /** * * 把分到同一组组合的朋友拼接输出 * * 将类似: * A-B B * A-B C * A-B D * * 输出: * A-B B,C,D * @author Bill * @since V1.0 2015年6月24日 - 下午5:11:24 */ static class Reducer2 extends Reducer<Text,Text,Text,Text>{ // private final Text k = new Text(); private final Text v = new Text(); @Override protected void reduce(Text pair, Iterable<Text> friends,Context context) throws IOException, InterruptedException { StringBuilder friendNames = new StringBuilder(); for (Text friend : friends) { friendNames.append(friend.toString() + ","); } // 去掉逗号 v.set(friendNames.length() > 0 ? friendNames.substring(0, friendNames.length() - 1) : ""); context.write(pair, v); } } public static void main(String[] args) throws Exception { if (args.length != 3) { System.err.println("Usage:<job1-inpath> <job1-outPath> <job2-outPath>"); System.exit(1); } Configuration conf = new Configuration(); // 创建路径,清除旧数据 Path job1InputPath = new Path(args[0]); Path job1OutputPath = new Path(args[1]); Path job2OutputPath = new Path(args[2]); FileSystem fs = FileSystem.get(conf); if (fs.exists(job1OutputPath)) { fs.delete(job1OutputPath, true); } if (fs.exists(job2OutputPath)) { fs.delete(job2OutputPath, true); } // job1 Job job1 = Job.getInstance(conf); job1.setJarByClass(ShareFriends.class); job1.setMapperClass(Mapper1.class); job1.setReducerClass(Reducer1.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, job1InputPath); FileOutputFormat.setOutputPath(job1, job1OutputPath); // job2 Job job2 = Job.getInstance(conf); job2.setJarByClass(ShareFriends.class); job2.setMapperClass(Mapper2.class); job2.setReducerClass(Reducer2.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, job1OutputPath); FileOutputFormat.setOutputPath(job2, job2OutputPath); // 控制依赖 ControlledJob controlledJob1 = new ControlledJob(conf); ControlledJob controlledJob2 = new ControlledJob(conf); controlledJob1.setJob(job1); controlledJob2.setJob(job2); controlledJob2.addDependingJob(controlledJob1); JobControl jobControl = new JobControl("share-friends"); jobControl.addJob(controlledJob1); jobControl.addJob(controlledJob2); // 创建线程,开始执行任务 Thread shareFriendExecuteThread = new Thread(jobControl); shareFriendExecuteThread.start(); while(!jobControl.allFinished()){ TimeUnit.SECONDS.sleep(1); } jobControl.stop(); // 弹出两个job的输出结果文件夹 Runtime.getRuntime().exec("cmd.exe /c start " + job1OutputPath.toUri().getPath().substring(1)); Runtime.getRuntime().exec("cmd.exe /c start " + job2OutputPath.toUri().getPath().substring(1)); } }
时间: 2024-10-13 08:20:38