开发工具:Eclipse
目标:对下面文档phone_numbers进行倒排索引:
13599999999 10086
13899999999 120
13944444444 13800138000
13722222222 13800138000
18800000000 120
13722222222 10086
18944444444 10086
代码:
1 import java.io.IOException; 2 import org.apache.hadoop.conf.Configured; 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.util.Tool; 6 import org.apache.hadoop.util.ToolRunner; 7 import org.apache.hadoop.io.*; 8 import org.apache.hadoop.mapreduce.*; 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 12 13 public class Test_1 extends Configured implements Tool 14 { 15 enum Counter 16 { 17 LINESKIP, // error lines 18 } 19 20 public static class Map extends Mapper<LongWritable, Text, Text, Text> 21 { 22 public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException 23 { 24 String line = value.toString(); // read original data 25 26 try 27 { 28 // process data 29 String[] lineSplit = line.split(" "); 30 String anum = lineSplit[0]; 31 String bnum = lineSplit[1]; 32 33 context.write(new Text(bnum), new Text(anum)); // map output 34 } 35 catch(java.lang.ArrayIndexOutOfBoundsException e) 36 { 37 context.getCounter(Counter.LINESKIP).increment(1); 38 return; 39 } 40 41 } 42 } 43 public static class Reduce extends Reducer<Text, Text, Text, Text> 44 { 45 public void reduce(Text key, Iterable<Text>values, Context context)throws IOException, InterruptedException 46 { 47 String valueString; 48 String out = ""; 49 50 for (Text value : values) 51 { 52 valueString = value.toString(); 53 out += valueString + "|"; 54 } 55 56 context.write(key, new Text(out)); // reduce output 57 } 58 } 59 public int run(String[] args)throws Exception 60 { 61 Configuration conf = getConf(); 62 63 Job job = new Job(conf, "Test_1"); // task name 64 job.setJarByClass(Test_1.class); // specified task 65 66 FileInputFormat.addInputPath(job, new Path(args[0])); // input path 67 FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path 68 69 
job.setMapperClass(Map.class); 70 job.setReducerClass(Reduce.class); 71 job.setOutputFormatClass(TextOutputFormat.class); 72 job.setOutputKeyClass(Text.class); 73 job.setOutputValueClass(Text.class); 74 75 job.waitForCompletion(true); 76 77 return job.isSuccessful() ? 0 : 1; 78 } 79 80 public static void main(String[] args)throws Exception 81 { 82 int res = ToolRunner.run(new Configuration(), new Test_1(), args); 83 System.exit(res); 84 } 85 }
运行结果:
时间: 2024-10-08 02:42:51