Mapreduce TopK

思想比较简单,就是每个通过map来获取当前的数据块中的的topk个数据,然后将他们以相同的key值放到reduce中,最后通过reduce来对这n*k个数据排序并获得topk个数据。具体的就是建立一个k个大小的数组,一开始初始化为都是100(假定这里的100是最大的数),然后往里面插数据小的数据即可。

PS:有几个小细节以及当时写代码的时候出错的地方。

1 map和reduce都是在每个键值对来的时候会被调用。当时觉得应该把这k的数组放在哪,以及怎么初始化。如果放在map方法里面,那每次都会被初始化,岂不是白搞了。如果把这数组当作局部变量,那肯定是不行的,因为当作局部变量就无法实现存放k个数据了。只能存放当前的数据。后来查了资料发现,有个setup这个函数,就是用于mapper中的某些数据的初始化,这样就可以把数组作为mapper的属性,然后在setup中进行初始化了。

2 当我全部遍历完这个数据分片的数据后,并且已经获得了当前mapper中的topk了,我如何把数据传到reducer呢,最理想的就是在遍历完后才把数据发送过去,但是以前都是处理一个键值对就发送一个,然后查了下,发现有个cleanup函数,就是用于mapper或者reducer结束后用的,那么就可以通过这个函数来发送键值对了。

3 这是个逻辑上的问题,我这里的topk是选最小的几个,然后当时写的是,先将数组排序,然后从前往后查询,如果发现value<list[i]那么就将该数组中数据替换,但是这个有问题,例如有这样的

45
21
75
94
1
34
56
7
67

按照我一开始的逻辑是,

45,100,100

21,100,100显然这一步就错了,应该是21,45,100所以应该是从后往前的查询,每次查询能替换的最大的数据,而不是从前往后的查询替换最小的数据

具体代码:

Map

public void setup(Context context){

Configuration conf=context.getConfiguration();

int k=Integer.parseInt(conf.get( "k" ));

list =new int[k];

for (int i=0;i<k;i++){

list [i]=100;

}

}

public void cleanup(Context context) throws IOException, InterruptedException{

for (int i=0;i< list. length ;i++){

context.write( new IntWritable(0), new IntWritable( list[i]));

System. out .println("                                 ");

System. out .println("map is " + list[i]);

System. out .println("                                 ");

}

}

public void map(LongWritable ikey, Text ivalue, Context context)

throws IOException, InterruptedException {

Configuration conf=context.getConfiguration();

int k=Integer.parseInt(conf.get( "k" ));

int value=Integer.parseInt(ivalue.toString());

Arrays. sort( list);

System. out .println("                                 ");

System. out .println("n is " + n);

System. out .println("                                 ");

for (int j=k-1;j>=0;j--){

if (value<list [j]){

list [j]=value;

break ;

}

}

}

}

Reducer

public void setup(Context context){

Configuration conf=context.getConfiguration();

int k=Integer.parseInt(conf.get( "k" ));

list =new int[k];

for (int i=0;i<k;i++){

list [i]=100;

}

}

public void cleanup(Context context) throws IOException, InterruptedException{

Arrays. sort( list);

for (int i=0;i< list. length ;i++){

context.write( new IntWritable(i), new IntWritable( list[i]));

}

}

public void reduce(IntWritable _key, Iterable<IntWritable> values, Context context)

throws IOException, InterruptedException {

// process values

Configuration conf=context.getConfiguration();

int k=Integer.parseInt(conf.get( "k" ));

for (IntWritable val : values) {

/*

System.out.println("                                 ");

System.out.println("value is "+val.get());

System.out.println("                                 ");

*/

Arrays. sort( list);

for (int j=k-1;j>=0;j--){

if (val.get()<list [j]){

list [j]=val.get();

break ;

}

}

}

}

时间: 2024-11-07 08:02:46

Mapreduce TopK的相关文章

MapReduce TopK问题实际应用

一:背景 TopK问题应该是海量数据处理中应用最广泛的了,比如在海量日志数据处理中,对数据清洗完成之后统计某日访问网站次数最多的前K个IP.这个问题的实现方式并不难,我们完全可以利用MapReduce的Shuffle过程实现排序,然后在Reduce端进行简单的个数判断输出即可.这里还涉及到二次排序,不懂的同学可以参考我之前的文章. 二:技术实现 #我们先来看看一条Ngnix服务器的日志: [java] view plain copy 181.133.250.74 - - [06/Jan/2015

MapReduce TopK统计加排序

Hadoop技术内幕中指出Top K算法有两步,一是统计词频,二是找出词频最高的前K个词.在网上找了很多MapReduce的Top K案例,这些案例都只有排序功能,所以自己写了个案例. 这个案例分两个步骤,第一个是就是wordCount案例,二就是排序功能. 一,统计词频 1 package TopK; 2 import java.io.IOException; 3 import java.util.StringTokenizer; 4 5 import org.apache.hadoop.co

MapReduce TopK 文件

问题描述:对于每日访问google 的ip做个记录 对应计算出当天前K个访问次数最多的ip地址. 对应此问题 先自定制一个ip格式的数据类型 继承WritableComparable接口. package reverseIndex; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.

MapReduceTopK TreeMap

MapReduce TopK统计加排序中介绍的TopK在mapreduce的实现. 本案例省略的上面案例中的Sort步骤,改用TreeMap来实现获取前K个词 package TopK1; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apa

MapReduce实现TopK的示例

由于开始学习MapReduce编程已经有一段时间了,作为一个从编程中寻找自信和乐趣以及热爱编程的孩子来讲,手开始变得很“痒”了,很想小试一下身手.于是自己编写了TopK的代码.TopK的意思就是从原文件中找出词频排名前K的所有单词.首先分析该问题,从中我们可以得到启发:要想知道词频排名前K的所有单词,那么是不是要对所有的单词进行词频的统计啊?于是我们就联想到了一个比较经典的例子:WordCount的例子.是的,没错.就是它,统计原文件中每个单词的个数就靠它. 但是,我们词频统计出来了,接下来需要

mapreduce求前k个最大值(topk 问题)

需要先统计词频,再进行排序 ----------词频统计--------- package TopK; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apach

Hadoop读书笔记(十四)MapReduce中TopK算法(Top100算法)

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 (系列文章会逐步修整完成,添加数据文件格式预计相关注释) 1.说明: 从给定的文件中的找到最大的100个值,给定的数据文件格式如下: 533 16565 17800 2929 11374 9826 6852 20679 18224 21222 8227 5336 912 29525 3382 2100 10673 12284 31634 27405 1

mahout中map-reduce版的itembased推荐算法思想

最近想写一个map-reduce版的userbased,于是先研究mahout中已实现的itembased算法.itembased看起来简单,但是深入到实现细节还是有点复杂的,用map-reduce实现就更复杂了. itembased的本质: 预测某用户user对某物品item的打分, 看看该用户对其他item的打分,如果其他item跟该item越相似,则权重越高. 最后加权平均. itembased核心步骤: 1 计算item相似度矩阵(利用两个矩阵相乘) 2 user打分矩阵 乘以 item

Pig、Hive、MapReduce 解决分组 Top K 问题(转)

问题: 有如下数据文件 city.txt (id, city, value) cat city.txt 1 wh 5002 bj 6003 wh 1004 sh 4005 wh 2006 bj 1007 sh 2008 bj 3009 sh 900需要按 city 分组聚合,然后从每组数据中取出前两条value最大的记录. 1.这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决: 1 a = load '/data/city.txt'  using PigSto