转载请注明出去:http://blog.csdn.net/xiaojimanman/article/details/41117357
更多hadoop内容请访问:http://blog.csdn.net/xiaojimanman/article/category/2640707
对于求每个学生成绩的平均数和top N问题在数据库中可以通过sql语句就实现出来,这里就不在做介绍。本文主要通过实例介绍hadoop如何求平均数以及输出TOP N。
需求描述:
求文件中每个学生的平均成绩,并将平均成绩最高的N个输出。
数据格式:
文件中的一行数据为一门成绩记录,简化模型结果为“学生唯一标识 成绩”,eg: "zs 90",本次测试数据如下图所示:
需求分析:
平均值:mapreduce程序中的map函数只简单处理记录中的一行数据,输出结果为 key为学生唯一标识,value为学生的单科成绩;ruduce函数中实现对每一个学生的成绩求平均值。(之前博客中有有关于mapreduce程序的输入输出问题,就不再作图分析)
TOP N:在ruduce中,如果将所有的成绩保存到数组中,然后排序输出,这种方式在数据量小的时候还是可行的,但是当数据量非常大的时候,就会造成内存溢出,因此这种方式就不可行。基于数组的思想,所以可以考虑将已经计算的平均值的前N个存储到长度为N的数组中,当计算出下一个平均值,将此平均值插入该数组中,具体算法如下:
private void addTopN(double avg){ if (avg > topN[N -1]) { int i = 0; for (i = 0; i < N && avg < topN[i]; i++); if (i < N) { for (int j = N-1; j > i; j--) { topN[j] = topN[j-1]; } topN[i] = avg; } } }
通过上述方法,topN数组中就保存已计算出来的top N值,reduce函数执行完毕后,数组中就是需求中的top N。
代码实现:
行数据处理类:
/** *@Description: 成绩单一行数据处理 */ package com.mapreduce.topn; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; public class Line { private String name;//学生唯一标识 private int score;//成绩 private boolean right = true; public Line(String line) { if (line == null || "".equals(line)) { right = false; return; } String []ss = line.split(" "); if (ss.length != 2) { right = false; return; } name = ss[0]; try { score = Integer.parseInt(ss[1]); } catch (Exception e) { score = 0; } } public Text getKey() { return new Text(name); } public IntWritable getValue() { return new IntWritable(score); } public boolean isRight() { return right; } }
mapreduce程序:
/** *@Description: 平均成绩 & Top N */ package com.mapreduce.topn; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class AvgTop extends Configured implements Tool{ /** *@Description: map函数,输出的结果为 “学生姓名 成绩” eg "zs 90" *@Author:lulei */ public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Line line = new Line(value.toString()); if (line.isRight()) { context.write(line.getKey(), line.getValue()); } } } /** *@Description: reduce函数,计算avg & top N *@Author:lulei */ public static class Reduce extends Reducer<Text, IntWritable, Text, DoubleWritable> { private static double[] topN; private static int N = 1; @Override protected void setup(Context context) throws IOException, InterruptedException { try { N = Integer.parseInt(context.getConfiguration().get("N")); } catch (Exception e){ N = 1; } topN = new double[N]; } /** * @param avg * @Author:lulei * @Description: 将avg插入到topN中 */ private void addTopN(double avg){ if (avg > topN[N -1]) { int i = 0; for (i = 0; i < N && avg < topN[i]; i++); if (i < N) { for (int j = N-1; j > i; j--) { topN[j] = topN[j-1]; } topN[i] = avg; } } } /** * @Author:lulei * @Description: 输出top N的数据 */ private void print() { for (double n : topN){ System.out.print(n); System.out.print("->"); } System.out.println(); } @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count = 0; int sum = 0; for (IntWritable value : values) { count++; sum += value.get(); } //计算平均值 double avg = (sum * 1.0D)/ count; //加入top N addTopN(avg); context.write(key, new DoubleWritable(avg)); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { //输出topN print(); super.cleanup(context); } } @Override public int run(String[] arg0) throws Exception { Configuration conf = new Configuration(); conf.set("N", arg0[2]); @SuppressWarnings("deprecation") Job job = new Job(conf); job.setJobName("avg&topn"); job.setInputFormatClass(TextInputFormat.class); //将输出设置为TextOutputFormat job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); //Mapper Reducer job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //输入 输出路径 FileInputFormat.addInputPath(job, new Path(arg0[0])); FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) { // TODO Auto-generated method stub if (args.length != 3) { System.out.println("hadoop jar **.jar com.mapreduce.topn.AvgTop [input] [output] [N]"); System.exit(-1); } try { int res = ToolRunner.run(new Configuration(), new AvgTop(), args); System.exit(res); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
上传运行:
打包jar上传服务器,执行命令
hadoop jar avgtop.jar com.mapreduce.topn.AvgTop /root/avgtop/ /out/1 3
top N输出结果为:90.0->67.5->56.0->
输出结果如下: