使用hadoop实现平均数~并输出top N

转载请注明出去:http://blog.csdn.net/xiaojimanman/article/details/41117357

更多hadoop内容请访问:http://blog.csdn.net/xiaojimanman/article/category/2640707

对于求每个学生成绩的平均数和top N问题在数据库中可以通过sql语句就实现出来,这里就不在做介绍。本文主要通过实例介绍hadoop如何求平均数以及输出TOP N。

需求描述:

求文件中每个学生的平均成绩,并将平均成绩最高的N个输出。

数据格式:

文件中的一行数据为一门成绩记录,简化模型结果为“学生唯一标识 成绩”,eg: "zs 90",本次测试数据如下图所示:

需求分析:

平均值:mapreduce程序中的map函数只简单处理记录中的一行数据,输出结果为 key为学生唯一标识,value为学生的单科成绩;ruduce函数中实现对每一个学生的成绩求平均值。(之前博客中有有关于mapreduce程序的输入输出问题,就不再作图分析)

TOP N:在ruduce中,如果将所有的成绩保存到数组中,然后排序输出,这种方式在数据量小的时候还是可行的,但是当数据量非常大的时候,就会造成内存溢出,因此这种方式就不可行。基于数组的思想,所以可以考虑将已经计算的平均值的前N个存储到长度为N的数组中,当计算出下一个平均值,将此平均值插入该数组中,具体算法如下:

private void addTopN(double avg){
	if (avg > topN[N -1]) {
		int i = 0;
		for (i = 0; i < N && avg < topN[i]; i++);
		if (i < N) {
			for (int j = N-1; j > i; j--) {
				topN[j] = topN[j-1];
			}
			topN[i] = avg;
		}
	}
}

通过上述方法,topN数组中就保存已计算出来的top N值,reduce函数执行完毕后,数组中就是需求中的top N。

代码实现:

行数据处理类:

 /**
 *@Description: 成绩单一行数据处理
 */
package com.mapreduce.topn;  

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class Line {
	private String name;//学生唯一标识
	private int score;//成绩
	private boolean right = true;

	public Line(String line) {
		if (line == null || "".equals(line)) {
			right = false;
			return;
		}
		String []ss = line.split(" ");
		if (ss.length != 2) {
			right = false;
			return;
		}
		name = ss[0];
		try {
			score = Integer.parseInt(ss[1]);
		} catch (Exception e) {
			score = 0;
		}
	}

	public Text getKey() {
		return new Text(name);
	}

	public IntWritable getValue() {
		return new IntWritable(score);
	}

	public boolean isRight() {
		return right;
	}
}

mapreduce程序:

 /**
 *@Description: 平均成绩 & Top N
 */
package com.mapreduce.topn;  

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgTop extends Configured implements Tool{

	/**
	 *@Description: map函数,输出的结果为 “学生姓名 成绩” eg "zs 90"
	 *@Author:lulei
	 */
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			Line line = new Line(value.toString());
			if (line.isRight()) {
				context.write(line.getKey(), line.getValue());
			}
		}

	}

	/**
	 *@Description: reduce函数,计算avg & top N
	 *@Author:lulei
	 */
	public static class Reduce extends Reducer<Text, IntWritable, Text, DoubleWritable> {
		private static double[] topN;
		private static int N = 1;
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			try {
				N = Integer.parseInt(context.getConfiguration().get("N"));
			} catch (Exception e){
				N = 1;
			}
			topN = new double[N];
		}

		/**
		 * @param avg
		 * @Author:lulei
		 * @Description: 将avg插入到topN中
		 */
		private void addTopN(double avg){
			if (avg > topN[N -1]) {
				int i = 0;
				for (i = 0; i < N && avg < topN[i]; i++);
				if (i < N) {
					for (int j = N-1; j > i; j--) {
						topN[j] = topN[j-1];
					}
					topN[i] = avg;
				}
			}
		}

		/**
		 * @Author:lulei
		 * @Description: 输出top N的数据
		 */
		private void print() {
			for (double n : topN){
				System.out.print(n);
				System.out.print("->");
			}
			System.out.println();
		}

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int count = 0;
			int sum = 0;
			for (IntWritable value : values) {
				count++;
				sum += value.get();
			}
			//计算平均值
			double avg = (sum * 1.0D)/ count;
			//加入top N
			addTopN(avg);
			context.write(key, new DoubleWritable(avg));
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			//输出topN
			print();
			super.cleanup(context);
		}

	}
	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = new Configuration();
		conf.set("N", arg0[2]);
		@SuppressWarnings("deprecation")
		Job job = new Job(conf);
		job.setJobName("avg&topn");
		job.setInputFormatClass(TextInputFormat.class);

		//将输出设置为TextOutputFormat
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);

		//Mapper Reducer
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		//输入 输出路径
		FileInputFormat.addInputPath(job, new Path(arg0[0]));
		FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

		job.waitForCompletion(true);

		return job.isSuccessful() ? 0 : 1;
	}

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		if (args.length != 3) {
			System.out.println("hadoop jar **.jar com.mapreduce.topn.AvgTop [input] [output] [N]");
			System.exit(-1);
		}
		try {
			int res = ToolRunner.run(new Configuration(), new AvgTop(), args);
			System.exit(res);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

上传运行:

打包jar上传服务器,执行命令

hadoop jar avgtop.jar com.mapreduce.topn.AvgTop /root/avgtop/ /out/1 3

top N输出结果为:90.0->67.5->56.0->

输出结果如下:

时间: 2024-10-25 15:04:50

使用hadoop实现平均数~并输出top N的相关文章

Hadoop项目没有日志输出

问题描述:在eclipse下运行hadoop项目,没有输出日志,警告如下,意为没有找到log4j文件. log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#

Hadoop:输入,输出,key,value格式

map: (K1, V1) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3) (K1, V1): jobConf.setInputKeyClass(K1. class ); jobConf.setInputValueClass(V1. class ); list(K2, V2): job.setMapOutputKeyClass(K2.class); job.setMapOutputValueClass(V2.class); list(K3

hadoop编程小技巧(7)---自己定义输出文件格式以及输出到不同文件夹

代码測试环境:Hadoop2.4 应用场景:当须要定制输出数据格式时能够採用此技巧,包含定制输出数据的展现形式.输出路径.输出文件名称称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  经常使用的父类. 2)TextOutputFormat<K,V> 默认输出字符串输出格式. 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 能够把输出数据

hadoop streaming 多路输出 [转载]

转载 http://www.cnblogs.com/shapherd/archive/2012/12/21/2827860.html hadoop 支持reduce多路输出的功能,一个reduce可以输出到多个part-xxxxx-X文件中,其中X是A-Z的字母之一,程序在输出<key,value>对的时候,在value的后面追加"#X"后缀,比如#A,输出的文件就是part-00000-A,不同的后缀可以把key,value输出到不同的文件中,方便做输出类型分类, #X仅

hadoop编程小技巧(7)---自定义输出文件格式以及输出到不同目录

代码测试环境:Hadoop2.4 应用场景:当需要定制输出数据格式时可以采用此技巧,包括定制输出数据的展现形式,输出路径,输出文件名称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  常用的父类: 2)TextOutputFormat<K,V> 默认输出字符串输出格式: 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 可以把输出数据输送到

Hadoop 实现 TF-IDF 计算

学习Hadoop 实现TF-IDF 算法,使用的是CDH5.13.1 VM版本,Hadoop用的是2.6.0的jar包,Maven中增加如下即可 <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version> <scope>provided</

Hadoop快速入门

传说中的Hadoop,我终于来对着你唱"征服"了,好可爱的小象,!J 总的来说,hadoop的思路比较简单(map-reduce),就是将任务分开进行,最后汇总.但这个思路实现起来,比较复杂,但相对于几年前Intel等硬件公司提出的网格运算等方式,显得更加开放. 你难任你难,哥就是头铁! Tip:实践应用是核心,本文概念为主,有些部分可能会有些晦涩,直接跳过就好(不是特别重要). 本文代码实践在:https://github.com/wanliwang/cayman/tree/mast

Hadoop.2.x_MR-Shuffle过程

1.map到reduce中间的一个过程 洗牌,打乱(打乱我们传递的所有元素)(流程:input->map->reduce->output) 2.map()->shuffle->reduce() map()接收数据,以wc为例,其中数据可是为<key,value> 在map()中获取每一行文本内容使用String.split或其他分隔方法分隔文本内容,如<0,hadoop spark hdfs hadoop> 分隔之后:<hadoop,1>&

hadoop 常用配置项【转】

hadoop 常用配置项[转] core-site.xml  name value  Description   fs.default.name hdfs://hadoopmaster:9000 定义HadoopMaster的URI和端口  fs.checkpoint.dir /opt/data/hadoop1/hdfs/namesecondary1 定义hadoop的name备份的路径,官方文档说是读取这个,写入dfs.name.dir  fs.checkpoint.period 1800 定