1 /*********************************************** 2 这一百万数据只是在个人电脑上模拟,实际数据可能达到一亿。本人没有测试过 3 这一百万数据的文件存储格式如下: 4 4566 5 1321634 6 132132 7 165446 8 即:一行有一个数字 9 10 下面用MapReduce实现 11 12 ***********************************************/ 13 14 import java.io.IOException; 15 import java.net.URI; 16 import java.util.Arrays; 17 18 import org.apache.hadoop.conf.Configuration; 19 import org.apache.hadoop.fs.FileSystem; 20 import org.apache.hadoop.fs.Path; 21 import org.apache.hadoop.io.LongWritable; 22 import org.apache.hadoop.io.NullWritable; 23 import org.apache.hadoop.io.Text; 24 import org.apache.hadoop.mapreduce.Job; 25 import org.apache.hadoop.mapreduce.Mapper; 26 import org.apache.hadoop.mapreduce.Reducer; 27 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 28 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 29 30 /** 31 * 作业:求最大的100个值 32 * 33 */ 34 public class Top100App { 35 36 static final String INPUT = "hdfs://192.168.56.100:9000/input"; 37 static final String OUT = "hdfs://192.168.56.100:9000/out"; 38 39 static final Path INPUT_PATH = new Path(INPUT); 40 static final Path OUT_PATH = new Path(OUT); 41 42 static final int topNum = 100; 43 44 public static void main(String[] args) throws Exception{ 45 46 Configuration conf = new Configuration(); 47 FileSystem fileSystem = FileSystem.get(new URI(OUT),conf); 48 if(fileSystem.exists(OUT_PATH)){ 49 fileSystem.delete(OUT_PATH,true); 50 } 51 52 Job job = new Job(conf,Top100App.class.getSimpleName()); 53 FileInputFormat.setInputPaths(job, INPUT_PATH); 54 job.setMapperClass(MyMapper.class); 55 job.setReducerClass(MyReducer.class); 56 job.setOutputKeyClass(LongWritable.class); 57 job.setOutputValueClass(NullWritable.class); 58 FileOutputFormat.setOutputPath(job, OUT_PATH); 59 job.waitForCompletion(true); 60 } 61 62 static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{ 63 @Override 64 protected void map( 65 LongWritable key, 66 Text value, 67 Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context) 68 throws IOException, InterruptedException { 69 context.write(new LongWritable(Long.parseLong(value.toString())), NullWritable.get()); 70 } 71 } 72 73 static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{ 74 long[] topArray = new long[topNum]; 75 int count = 0; 76 @Override 77 protected void reduce( 78 LongWritable k2, 79 Iterable<NullWritable> v2s, 80 Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context) 81 throws IOException, InterruptedException { 82 long num = Long.parseLong(k2.toString()); 83 if(count < topNum){ 84 topArray[count] = num; 85 count++; 86 }else{ 87 Arrays.sort(topArray); 88 if(num > topArray[0]){ 89 topArray[0] = num; 90 } 91 } 92 } 93 @Override 94 protected void cleanup( 95 Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context) 96 throws IOException, InterruptedException { 97 Arrays.sort(topArray); 98 for(int i = topArray.length -1 ; i > -1 ; i--){ 99 context.write(new LongWritable(topArray[i]), NullWritable.get()); 100 } 101 } 102 } 103 104 105 }
时间: 2024-10-17 03:26:51