Please credit the source when reposting: http://blog.csdn.net/l1028386804/article/details/46287805
With a Hadoop MapReduce job we can pick the maximum value out of a massive set of numbers: each map task emits only its local maximum, and the reduce task (a single one, by Hadoop's default) keeps the largest of those. Without further ado, here is the code.
1. Mapper class implementation
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    // Largest value seen by this map task so far
    long max = Long.MIN_VALUE;

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws java.io.IOException, InterruptedException {
        // Each input line holds one number; only remember the largest, emit nothing yet
        final long temp = Long.parseLong(v1.toString());
        if (temp > max) {
            max = temp;
        }
    }

    @Override
    protected void cleanup(Context context)
            throws java.io.IOException, InterruptedException {
        // After all records of this split are processed, emit the map-side maximum once
        context.write(new LongWritable(max), NullWritable.get());
    }
}
2. Reducer class implementation
static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
    // Largest value seen across all map-side maxima
    long max = Long.MIN_VALUE;

    @Override
    protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context)
            throws java.io.IOException, InterruptedException {
        // Each incoming key is one map task's local maximum; keep the largest
        final long temp = k2.get();
        if (temp > max) {
            max = temp;
        }
    }

    @Override
    protected void cleanup(Context context)
            throws java.io.IOException, InterruptedException {
        // Emit the global maximum once all keys have been consumed
        context.write(new LongWritable(max), NullWritable.get());
    }
}
3. Program entry point (main)
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Remove the output directory if it already exists, otherwise the job fails to start
    final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
    final Path outPath = new Path(OUT_PATH);
    if (fileSystem.exists(outPath)) {
        fileSystem.delete(outPath, true);
    }

    final Job job = new Job(conf, TopKApp.class.getSimpleName());
    FileInputFormat.setInputPaths(job, INPUT_PATH);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(NullWritable.class);
    FileOutputFormat.setOutputPath(job, outPath);
    job.waitForCompletion(true);
}
4. Complete code
package com.lyz.hadoop.suanfa;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Find the maximum value in a massive set of numbers with a Hadoop MapReduce job.
 * @author liuyazhuang
 */
public class TopKApp {
    // Location of the input data to scan
    static final String INPUT_PATH = "hdfs://liuyazhuang:9000/input";
    // Location where the result is written
    static final String OUT_PATH = "hdfs://liuyazhuang:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Remove the output directory if it already exists, otherwise the job fails to start
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final Job job = new Job(conf, TopKApp.class.getSimpleName());
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        // Largest value seen by this map task so far
        long max = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws java.io.IOException, InterruptedException {
            // Each input line holds one number; only remember the largest, emit nothing yet
            final long temp = Long.parseLong(v1.toString());
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context)
                throws java.io.IOException, InterruptedException {
            // After all records of this split are processed, emit the map-side maximum once
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        // Largest value seen across all map-side maxima
        long max = Long.MIN_VALUE;

        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context)
                throws java.io.IOException, InterruptedException {
            // Each incoming key is one map task's local maximum; keep the largest
            final long temp = k2.get();
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context)
                throws java.io.IOException, InterruptedException {
            // Emit the global maximum once all keys have been consumed
            context.write(new LongWritable(max), NullWritable.get());
        }
    }
}
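To actually run the job, the class is typically packaged into a jar and submitted with the hadoop launcher, e.g. hadoop jar topk.jar com.lyz.hadoop.suanfa.TopKApp (the jar name topk.jar is just an assumption here). When submitting to a real cluster rather than running locally, adding job.setJarByClass(TopKApp.class) in main() is usually also needed so the task JVMs can locate MyMapper and MyReducer.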
5. Construct as many numbers as you like for testing; I randomly generated 1,000,000 values of type Long (a sketch of one way to generate such a file is shown below).
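The post does not include the generator itself; the following is a minimal sketch, assuming the numbers are written one per line to a local file named seq.txt, which would then be uploaded into the INPUT_PATH directory (for example with hadoop fs -put seq.txt /input). The class name RandomLongGenerator and the file name are hypothetical.

package com.lyz.hadoop.suanfa;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Random;

/**
 * Writes one random long per line to a local text file that can be
 * copied into HDFS and used as input for TopKApp.
 */
public class RandomLongGenerator {
    public static void main(String[] args) throws Exception {
        Random random = new Random();
        BufferedWriter writer = new BufferedWriter(new FileWriter("seq.txt"));
        try {
            for (int i = 0; i < 1000000; i++) {
                // nextLong() may be negative; Long.parseLong in the mapper handles the sign
                writer.write(Long.toString(random.nextLong()));
                writer.newLine();
            }
        } finally {
            writer.close();
        }
    }
}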
6. Console output
7. Run result
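The result is a single line containing the largest value in the dataset, written to the output directory. Assuming Hadoop's default output file naming for the reducer, it can be inspected with hadoop fs -cat hdfs://liuyazhuang:9000/out/part-r-00000.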