hadoop1-TopK问题实现

1、对于排名,一般都是很热衷的,那么如何实现在数据量多的情况下,得到所需要的数据呢,选取前几名的实际应用中,也会有许多,形成统一的算法实现,比着参考就可以了。

2、数据文件a.txt:

2

4

6

7

9

6

4

3、输出数据为(例如取前三名,前面为数据,后面为名次,名次可通过输入参数配置):

9        1

7        2

6        3

4、设计思路:

Map端:

1)因为是topK问题,自然想到可以自己设计key,重写比较方法,按自己所想要的比较方法实现即可。

2)map端输出,仅需输出最后的n个最大数即可,所以需要在最后的那个cleanup方法里实现,通过一个成员变量list保存数据。

3)最后输出此list中的数据即可

    4)list为自定义的一个容器,在往里面添加元素的时候,会进行比较。

    5)在这个例子中,没有使用list,而是直接采用map的suffle对key进行的排序。一个map的输出量会很多。下一篇优化里面使用了list

Reduce端:

  1)  类似一个map,只针对key进行处理即可,输出前n的数值(此n个key已是经过suffle排过序的,取前n个即可,其它丢弃)。

5、程序实现:

  

package test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TopK {

	public static class Map extends Mapper<Object, Text, MyKey, NullWritable>{
		//此处可以进行优化,只需输出此map处理的最大的那几个值即可。
		protected void map(Object key, Text value, Context context)
				throws java.io.IOException ,InterruptedException {
			try {
				int num = Integer.parseInt(value.toString());
				context.write(new MyKey(num), NullWritable.get());
			} catch (Exception e) {
				// TODO: handle exception
				return ;
			}
		};
	}
	public static class Reduce extends Reducer<MyKey, NullWritable, Text, NullWritable>{
		private static Text k = new Text();
		//默认值,获取的个数
		private static int count = 5;
		//开始值
		private static int start = 1;

		//初始化时获取配置的参数
		protected void setup(Context context)
				throws IOException ,InterruptedException {
			//取得配置的参数,需要获取几个值
			count = Integer.parseInt(context.getConfiguration().get("top_num"));
		};

		protected void reduce(MyKey key, Iterable<NullWritable> values, Context context)
				throws IOException ,InterruptedException {
			//因为key是排序过来的,所以输出count个值就可以了
			if(start <= count){
				k.set(key.getNum()+"\t"+start);
				context.write(k, NullWritable.get());
				start++;
			}
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 3){
			System.err.println("Usage:TopK");
			System.exit(2);
		}
		//参数3 为要获取的最大个数
		conf.set("top_num", args[2]);
		Job job = new Job(conf, "TopK");
		job.setJarByClass(TopK.class);

		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		job.setMapOutputKeyClass(MyKey.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	private static class MyKey implements WritableComparable<MyKey>{

		private int num;

		public int getNum() {
			return num;
		}

		public MyKey() {
		}

		public MyKey(int num) {
			super();
			this.num = num;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			// TODO Auto-generated method stub
			num = in.readInt();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			// TODO Auto-generated method stub
			out.writeInt(num);
		}

		@Override
		public int compareTo(MyKey o) {
			// TODO Auto-generated method stub
			//反序输出
			return o.num - this.num;
		}
	}
}

计算结果:

9        1

7        2

6        3

hadoop1-TopK问题实现,布布扣,bubuko.com

时间: 2024-08-11 09:55:59

hadoop1-TopK问题实现的相关文章

topk记录

[email protected]:~/hadoop-1.0.1/bin$ ./hadoop dfs -rmr output Deleted hdfs://localhost:9000/user/lk/output [email protected]:~/hadoop-1.0.1/bin$ ./hadoop jar ~/mytopk.jar top.Top  input output ****hdfs://localhost:9000/user/lk/input 14/05/12 05:14:0

hadoop记录topk

[email protected]:~/hadoop-1.0.1/bin$ ./hadoop jar ~/hadoop-1.0.1/to.jar top.Top input output 14/05/12 03:44:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. ****hdfs://loc

hadoop-1.2.1运行过程中遇到的问题

在hadoop-1.2.1中运行所遇到的问题: 2014-11-14   22:43:42  在服务器上运行hadoop-1.2.1中的datanode,出现了内存占用过大,导致ssh登陆出现如下问题: 1.# There is insufficient memory for the Java Runtime Environment to continue. 2./etc/bashrc: fork: retry: Resource temporarily unavailable 出现上述问题的原

Hadoop1.X 与 Hadoop2.X区别及改进

一:Haddop版本介绍 0.20.x版本最后演化成了现在的1.0.x版本 0.23.x版本最后演化成了现在的2.x版本 hadoop 1.0 指的是1.x(0.20.x),0.21,0.22 hadoop 2.0 指的是2.x,0.23.x CDH3,CDH4分别对应了hadoop1.0 hadoop2.0 二.Hadoop1.X与Hadoop2.X区别 1.HDFS的改进 1.1 Hadoop1.x时代的HDFS架构 在Hadoop1.x中的NameNode只可能有一个,虽然可以通过Seco

大话Hadoop1.0、Hadoop2.0与Yarn平台

2016年12月14日21:37:29 Author:张明阳 博文链接:http://blog.csdn.net/a2011480169/article/details/53647012 近来这几天一直在忙于Hbase的实验,也没有太静下心来沉淀自己,今天打算写一篇关于Hadoop1.0.Hadoop2.0与Yarn的博文,从整体上把握三者之间的联系,博客内容如有问题,欢迎留言指正!OK,进入本文正题-- 在开始接触Hadoop的时候,也许大家对于Hadoop是下面的一个概念:Hadoop由两部

Hadoop1.2.1插件编译

本文介绍了Hadoop 1.2.1插件的编译步骤及编译后插件的配置. 1. 将Haoop 1.2.1源文件hadoop-1.2.1.tar.gz解压到D:\hadoop-1.2.1 2. 将项目导入Eclipse 3. 将build-contrib.xml拷贝到项目下. 4. 修改build.xml 1). 指定build-contrib.xml路径 <import file="./build-contrib.xml"/> 2). 添加Hadoop相关包到classpath

Hadoop1.x目录结构及Eclipse导入Hadoop源码项目

这是解压hadoop后,hadoop-1.2.1目录 各目录结构及说明: Eclipse导入Hadoop源码项目: 注意:如果没有ant的包可以去网上下,不是hadoop里面的. 然后如果通过以上还报错的话,可以右键点击项目,然后如下图(配置**/*):

Hadoop1.2.1 配置文件详解

首先我们先回顾一下Hadoop的一些概念: Apache Hdoop 1.x 组成 NameNode(元数据服务器) Secondary NameNode(辅助元数据服务器) JobTracker(任务调度员) DataNodes(块存储) TaskTrackers(任务执行) HDFS文件系统 NameNoode:属于管理层,用于管理数据存储 SecondaryNameNode:也属于管理层,辅助NameNode进行管理 DataNode:属于应用层,用户进行数据的存储,被NameNode进行

Hadoop-1.x安装与配置

1.在安装Hadoop之前,需要先安装JDK和SSH. Hadoop采用Java语言开发,MapReduce的运行和Hadoop的编译都依赖于JDK.因此必须先安装JDK1.6或更高版本(在实际生产环境下一般采用JDK1.6,因为Hadoop的部分组件不支持JDK1.7及以上版本).Hadoop利用SSH来启动Slave机器上的守护进程,对于在单机上运行的伪分布式,Hadoop采用了与集群相同的处理方式.所以SSH也是必须安装的. JDK1.6的安装配置步骤: (1)从网上下载JDK1.6的安装

搭建hadoop1

1.安装JDK su grid sudo -i  passwd root sudo dpkg -l  sudo dpkg -l | grep wget cd /usr mkdir java tar -zxvf eclipse-SDK-4.2-linux-gtk.tar.gz cp ~/Desktop/jdk-6u24-linux-i586.bin /usr/java ./jdk-6u24-linux-i586.bin ln -s /usr/java/jdk-xxx /usr/jdk vi /et