Hadoop读书笔记(十四)MapReduce中TopK算法(Top100算法)

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 (系列文章会逐步修整完成,添加数据文件格式预计相关注释)

1.说明:

从给定的文件中的找到最大的100个值,给定的数据文件格式如下:

533
16565
17800
2929
11374
9826
6852
20679
18224
21222
8227
5336
912
29525
3382
2100
10673
12284
31634
27405
18015
...

2.下文代码中使用到TreeMap类,所以先写一个demo

TreeMapDemo.java

package suanfa;

import java.util.Map.Entry;
import java.util.TreeMap;

public class TreeMapDemo {
	public static void main(String[] args) {
		TreeMap<Long, Long> tree = new TreeMap<Long, Long>();
		tree.put(1333333L, 1333333L);
		tree.put(1222222L, 1222222L);
		tree.put(1555555L, 1555555L);
		tree.put(1444444L, 1444444L);
		for (Entry<Long, Long> entry : tree.entrySet()) {
			System.out.println(entry.getKey()+":"+entry.getValue());
		}
		System.out.println(tree.firstEntry().getValue()); //最小值
		System.out.println(tree.lastEntry().getValue()); //最大值
		System.out.println(tree.navigableKeySet());	//从小到大的正序key集合
		System.out.println(tree.descendingKeySet());//从大到小的倒序key集合
	}
}

3.MapReduce代码

TopKAapp.java

package suanfa;

import java.io.IOException;
import java.net.URI;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 *
 * <p>
 * Title: TopKAapp.java Package suanfa
 * </p>
 * <p>
 * Description: 从算1000w个数据中找到最大的100个数
 * <p>
 *
 * @author Tom.Cai
 * @created 2014-12-10 下午10:56:44
 * @version V1.0
 *
 */
public class TopKAapp {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/topk_input";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/topk_out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf, TopKAapp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(MyMapper.class);
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(LongWritable.class);
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);
		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> {
		public static final int K = 100;
		private TreeMap<Long, Long> tree = new TreeMap<Long, Long>();

		public void map(LongWritable key, Text text, Context context) throws IOException, InterruptedException {
			long temp = Long.parseLong(text.toString());
			tree.put(temp, temp);
			if (tree.size() > K)
				tree.remove(tree.firstKey());
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			for (Long text : tree.values()) {
				context.write(NullWritable.get(), new LongWritable(text));
			}
		}
	}

	static class MyReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
		public static final int K = 100;
		private TreeMap<Long, Long> tree = new TreeMap<Long, Long>();

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			for (Long val : tree.descendingKeySet()) {
				context.write(NullWritable.get(), new LongWritable(val));
			}
		}

		@Override
		protected void reduce(NullWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
			for (LongWritable value : values) {
				tree.put(value.get(), value.get());
				if (tree.size() > K)
					tree.remove(tree.firstKey());
			}
		}
	}
}

欢迎大家一起讨论学习!有用的自己收!

记录与分享,让你我共成长!

欢迎查看我的其他博客;

我的个人博客:http://blog.caicongyang.com ;

我的CSDN博客地址: http://blog.csdn.net/caicongyang ;

时间: 2024-12-21 19:01:37

Hadoop读书笔记(十四)MapReduce中TopK算法(Top100算法)的相关文章

Hadoop读书笔记(四)HDFS体系结构

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Java读书笔记十四(java中的抽象类)

前言 在java中,有时候需要对类的行为进行提取,因此也就有了抽象类和接口的概念,这篇博客,小编来剖一下java中的抽象类和接口.  抽象方法和抽象类 抽象方法和抽象类的规则如下: 1.抽象类必须使用abstract修饰符来修饰,抽象方法也必须使用abstract修饰符来修饰,抽线方法不能有方法体 2.抽象类不能被实例化,无法使用new关键字来调用抽象类的构造器创建抽象类的实例.即使抽象类里不可包含抽象方法,这个抽象类也不能被创建实例. 3.抽象类可以包含成员变量.方法(普通方法和抽象方法都可以

[hadoop读书笔记] 第四章 Hadoop I/O操作

P92 压缩 P102 序列化 序列化:将结构化对象转为字节流便于在网上传输或写到磁盘进行永久性存储的过程 用于进程之间的通信或者数据的永久存储 反序列化:将字节流转为结构化对象的逆过程 Hadoop中的序列化:在Hadoop中,系统中多个节点上进程间的通信是通过远程过程传输RPC来实现的. RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化成原始信息. Avro:一个独立于编程语言,并基于 IDL的序列化框架,非常适合用于Hadoop的大规模数据处理

How tomcat works 读书笔记十四 服务器组件和服务组件

之前的项目还是有些问题的,例如 1 只能有一个连接器,只能处理http请求,无法添加另外一个连接器用来处理https. 2 对容器的关闭只能是粗暴的关闭Bootstrap. 服务器组件 org.apache.catalina.Server接口的实例用来表示Catalina的整个servlet引擎. 我们使用Server就是因为,它用一种优雅的方式来启动/关闭整个系统. 下面是启动和停止机制是如何工作的.当服务器启动的时候,它启动它内部的所有组件.然后无限期的等待关闭命令,如果你想要关闭系统,发送

Hadoop读书笔记(六)MapReduce自定义数据类型demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(五)MapReduce统计单词demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(八)MapReduce 打成jar包demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(七)MapReduce 0.x版本API使用demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(十)MapReduce中的从计数器理解combiner归约

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.combiner 问:什么是combiner: 答:Combiner发生在Mapper端,对数据进行归约处理,使传到reducer端的数据变小了,传输时间变端,作业时间变短,Combiner不能夸Mapper执行,(只有reduce可以接受多个Mapper的任务). 并不是所有的算法都适合归约处理,例如求平均数 2.代码实现 WordCount.j