MapReduce的reduce函数里的key用的是同一个引用

最近写MapReduce程序,出现了这么一个问题,程序代码如下:



 1 package demo;
 2
 3 import java.io.IOException;
 4 import java.util.HashMap;
 5 import java.util.Map;
 6 import java.util.Map.Entry;
 7
 8 import org.apache.hadoop.fs.FSDataOutputStream;
 9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.IntWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapreduce.Reducer;
14
15 public class ReducerDemo extends Reducer<Text, IntWritable, Text, IntWritable>{
16
17     private FileSystem fs = null;
18     private FSDataOutputStream outs = null;
19     public Map<Text, Integer> wordNumMap = new HashMap<Text, Integer>();
20
21
22
23     @Override
24     protected void setup(Context context)
25             throws IOException, InterruptedException {
26         String logFile = context.getConfiguration().get(HdpDemo.LOG_FILE);
27         fs = FileSystem.get(context.getConfiguration());
28         if(null != logFile){
29             int taskId = context.getTaskAttemptID().getTaskID().getId();
30             logFile += ("_"+taskId);
31             outs = fs.create(new Path(logFile));
32         }
33     }
34
35 /*    public void reduce(Text key, IntWritable value, Context context){
36
37     }*/
38
39     public void reduce(Text key, Iterable<IntWritable> numberIter, Context context)
40             throws IOException, InterruptedException {
41         Text word = key;
42         Integer currNum = wordNumMap.get(word);
43         if(null == currNum){
44             currNum = 0;
45         }
46         for(IntWritable num:numberIter){
47             currNum += num.get();
48         }
49         wordNumMap.put(word, currNum);
50
51     }
52
53     @Override
54     protected void cleanup(Context context)
55             throws IOException, InterruptedException {
56         for(Entry<Text, Integer> entry : wordNumMap.entrySet()){
57             IntWritable num = new IntWritable(entry.getValue());
58             context.write(entry.getKey(), num);
59         }
60         outs.close();
61     }
62
63     private void log(String content) throws IOException{
64         if(null != outs){
65             outs.write(content.getBytes());
66         }
67     }
68
69 }

 

这是个单词统计的reducer类,按理说打印出来的结果应该是如下结果:

world   2
ccc     2
of      1
best    1
the     1
is      1
bbb     2
james   2
ddd     2
hello   2
aaa     1

而实际上的打印结果却为:

world:2
world:2
world:1
world:1
world:1
world:1
world:2
world:2
world:2
world:2
world:1

原因分析如下:

Hadoop的MapReduce框架每次调用reducer的reduce函数,代码中的第39行,每次传入的key都是对同一个地址的引用,导致了插入wordNumMap中的那些key都被修改了。

而如果把第41行的

Text word = key;

改为

Text word = new Text();
word.set(key);

这样结果就正确了,也印证了我的猜测。

时间: 2024-10-20 06:17:27

MapReduce的reduce函数里的key用的是同一个引用的相关文章

使用ES6的reduce函数,根据key去重

最近很着迷于ES6的函数,让代码变得更优雅.ES6里的reduce函数,平时用的不是特别多,真正用起来发现还是挺好用的. 想要实现的效果为: 原数组: let rawArr = [{id:'123'},{id:'456'},{id:'789'},{id:'123'}]; 根据id去重后的结果为 let rawArr = [{id:'123'},{id:'456'},{id:'789'}]; reduce函数介绍 在说如何去重之前,先来介绍一下reduce函数: array.reduce(call

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

Entity Framework 6 Recipes 2nd Edition(11-4)译 -&gt; 在”模型定义”函数里调用另一个”模型定义”函数

11-4.在”模型定义”函数里调用另一个”模型定义”函数 问题 想要用一个”模型定义”函数去实现另一个”模型定义”函数 解决方案 假设我们已有一个公司合伙人关系连同它们的结构模型,如Figure 11-4所示: Figure 11-4. A model representing the associate types in a company together with the reporting association 在我们的虚拟的公司里, , team members被一个team lea

map函数或reduce函数中如何调用第三方jar包

一般我们在mapreduce程序中调用第三方jar包时会出现找不到jar包的问题,检查发现jar包就在相应路径,mapreduce任务就是找不到.仔细想想会发现,这个jar包是放在执行mapreduce主程序机器上的内存中,一般为客户端机器.而我们在map或者reduce函数中调用该jar包时是在集群的机器上的内存中调用,这样怎么可以调用.可以使用以下方法: 1 把jar包提前放在集群每天机器上. 2 和集群调用mysql驱动程序一样,先将jar包放入hdfs,然后通过mysql的distrib

map和reduce函数的使用

map和reduce函数是定义在Array中的两个方法,可用于提高处理数组的性能和简化代码. 关于它们的概念,参考Google这篇大名鼎鼎的论文"MapReduce: Simplified Data Processing on Large Clusters". map map函数是一个高阶函数,它会调用传入的函数来隐式的处理Array中的每一个元素,因此当我们确定好需要对数组中的每个元素进行的操作时,可先定义好处理的方法, 然后在通过map函数将定义好的函数作为参数传进去,这样浏览器就

Python3版本中的filter函数,map函数和reduce函数

一.filter函数: filter()为已知的序列的每个元素调用给定的布尔函数,调用中,返回值为非零的元素将被添加至一个列表中 1 def f1(x): 2 if x>20: 3 return True 4 else: 5 return False 6 7 l1 = [ 1, 2, 3, 42, 67, 16 ] 8 print(filter(f1, l1)) 9 #输出如下: 10 #<filter object at 0x000000000117B898> 11 l2 = filt

python reduce 函数

reduce 函数,是对一个列表里的元素做累计计算的一个函数.接收两个参数(函数,序列)例如 1 def num(x,y) 2 return x+y 3 4 reduce(num,[1,2,3,4,5,6]) 5 6 返回21 就是对一个序列做累计操作

第三百零五节,Django框架,Views(视图函数),也就是逻辑处理函数里的各种方法与属性

Django框架,Views(视图函数),也就是逻辑处理函数里的各种方法与属性 Views(视图函数)逻辑处理,最终是围绕着两个对象实现的 http请求中产生两个核心对象: http请求:HttpRequest对象 http响应:HttpResponse对象 所在位置:django.http 之前我们用到的参数request就是HttpRequest     HttpRequest对象 逻辑处理函数的第一个形式参数,接收到的就是HttpRequest对象,这个对象里封装着用户的各种请求信息,通过

弄明白python reduce 函数

作者:Panda Fang 出处:http://www.cnblogs.com/lonkiss/p/understanding-python-reduce-function.html 原创文章,转载请注明作者和出处,未经允许不可用于商业营利活动 reduce() 函数在 python 2 是内置函数, 从python 3 开始移到了 functools 模块. 官方文档是这样介绍的 reduce(...) reduce(function, sequence[, initial]) -> valu