Hadoop Combiner

转自:http://blog.csdn.net/jokes000/article/details/7072963

众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。

在上述过程中,我们看到至少两个性能瓶颈:

  1. 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  2. 使用专利中的国家一项来阐述数据倾斜这 个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的 键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网

络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。

如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:

Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。

由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。

代码如下:

[java] view plaincopy

  1. package com;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.DoubleWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. import org.apache.hadoop.util.Tool;
  17. import org.apache.hadoop.util.ToolRunner;
  18. public class AveragingWithCombiner extends Configured implements Tool {
  19. public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
  20. static enum ClaimsCounters { MISSING, QUOTED };
  21. // Map Method
  22. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  23. String fields[] = value.toString().split(",", -20);
  24. String country = fields[4];
  25. String numClaims = fields[8];
  26. if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
  27. context.write(new Text(country), new Text(numClaims + ",1"));
  28. }
  29. }
  30. }
  31. public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {
  32. // Reduce Method
  33. public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  34. double sum = 0;
  35. int count = 0;
  36. for (Text value : values) {
  37. String fields[] = value.toString().split(",");
  38. sum += Double.parseDouble(fields[0]);
  39. count += Integer.parseInt(fields[1]);
  40. }
  41. context.write(key, new DoubleWritable(sum/count));
  42. }
  43. }
  44. public static class Combine extends Reducer<Text,Text,Text,Text> {
  45. // Reduce Method
  46. public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  47. double sum = 0;
  48. int count = 0;
  49. for (Text value : values) {
  50. String fields[] = value.toString().split(",");
  51. sum += Double.parseDouble(fields[0]);
  52. count += Integer.parseInt(fields[1]);
  53. }
  54. context.write(key, new Text(sum+","+count));
  55. }
  56. }
  57. // run Method
  58. public int run(String[] args) throws Exception {
  59. // Create and Run the Job
  60. Job job = new Job();
  61. job.setJarByClass(AveragingWithCombiner.class);
  62. FileInputFormat.addInputPath(job, new Path(args[0]));
  63. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  64. job.setJobName("AveragingWithCombiner");
  65. job.setMapperClass(MapClass.class);
  66. job.setCombinerClass(Combine.class);
  67. job.setReducerClass(Reduce.class);
  68. job.setInputFormatClass(TextInputFormat.class);
  69. job.setOutputFormatClass(TextOutputFormat.class);
  70. job.setOutputKeyClass(Text.class);
  71. job.setOutputValueClass(Text.class);
  72. System.exit(job.waitForCompletion(true) ? 0 : 1);
  73. return 0;
  74. }
  75. public static void main(String[] args) throws Exception {
  76. int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
  77. System.exit(res);
  78. }
  79. }
时间: 2024-10-10 18:32:44

Hadoop Combiner的相关文章

Hadoop Combiner组件

一:背景 在MapReduce模型中,reduce的功能大多是统计分类类型的总量.求最大值最小值等,对于这些操作可以考虑在Map输出后进行Combiner操作,这样可以减少网络传输负载,同时减轻reduce任务的负担.Combiner操作是运行在每个节点上的,只会影响本地Map的输出结果,Combiner的输入为本地map的输出结果,很多时候Combiner的逻辑和reduce的逻辑是相同的,因此两者可以共用reducer体. 二:什么时候运行Combiner (1):当job设置了Combin

Hadoop学习笔记—8.Combiner与自定义Combiner

一.Combiner的出现背景 1.1 回顾Map阶段五大步凑 在第四篇博文<初始MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步凑,如下图所示: 其中,step1.5是一个可选步凑,它就是我们今天需要了解的 Map规约 阶段.现在,我们再来看看前一篇博文<计数器与自定义计数器>中的第一张关于计数器的图: 我们可以发现,其中有两个计数器:Combine output records和Combine input records,他们的计数都是

Hadoop读书笔记(十)MapReduce中的从计数器理解combiner归约

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.combiner 问:什么是combiner: 答:Combiner发生在Mapper端,对数据进行归约处理,使传到reducer端的数据变小了,传输时间变端,作业时间变短,Combiner不能夸Mapper执行,(只有reduce可以接受多个Mapper的任务). 并不是所有的算法都适合归约处理,例如求平均数 2.代码实现 WordCount.j

Hadoop中Combiner的使用

文章转载于:http://blog.csdn.net/ipolaris/article/details/8723782 Hadoop中Combiner的使用 在MapReduce中,当map生成的数据过大时,带宽就成了瓶颈,怎样精简压缩传给Reduce的数据,有不影响最终的结果呢.有一种方法就是使用Combiner,Combiner号称本地的Reduce,Reduce最终的输入,是Combiner的输出.下面以<Hadoop in action>中的专利数据为例.我们打算统计每个国家的专利数目

Hadoop初学指南(8)--MapReduce中的Combiner操作

本文主要介绍了MapReduce中的Combiner操作. 在MapReduce的执行步骤中,我们一共分了8步,其中Map中的最后一步规约操作就是今天要讲的Combiner. 首先看一下前文中的计数器: 我们可以发现,其中有两个计数器:Combine output records和Combine input records,他们的计数都是0,这是因为我们在代码中没有进行规约操作. 现在我们加入规约操作. 在前文代码(参看http://xlows.blog.51cto.com/5380484/14

辛星笔记之Hadoop权威指南第三篇combiner

集群上的可用宽带限制了MapReduce作业的数量,因此最重要的一点是尽量避免map任务和reduce任务之间的数据传输.Hadoop允许用户针对map任务的输出指定一个合并函数,有时候我们也称作combiner,它就像mapper和reducer一样. 合并函数的输出作为reduce函数的输入,由于合并函数是一个优化方案,所以Hadoop无法确定针对map任务输出中任一条记录需要调用多少次合并函数.不管我们调用多少次合并函数,reducer的输出结果都应该一致.合并函数的规则限定了可以使用的函

hadoop学习;Streaming,aggregate;combiner

hadoop streaming允许我们使用任何可执行脚本来处理按行组织的数据流,数据取自UNIX的标准输入STDIN,并输出到STDOUT 通过设定mapper为'RandomSample.py 10',我们按十分之一的采样率,没有设定特殊的reducer,一般默认使用IdentityReducer(把输入直接转向输出) 通过HDFS的命令getMerge(输出合并)或其他文件操作,可以获得正确数目的输出文件 随机采样脚本用Python实现,但是只要基于STDIN和STDOUT的脚本语言都可以

Hadoop之——Combiner编程

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46135857 一.Mapper类的实现 /** * KEYIN 即k1 表示行的偏移量 * VALUEIN 即v1 表示行文本内容 * KEYOUT 即k2 表示行中出现的单词 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 */ static class MyMapper extends Mapper<LongWritable, Text, Text, Lon

Hadoop 高级程序设计(三)---自定义Partition和Combiner

Hadoop提供了缺省的Partition来完成map的输出向reduce分发处理.有时也需要自定义partition来将相同key值的数据分发到同一个reduce处理,为了减少map过程输出的中间结果键值对的数量,降低网络数据通信开销,用户也可以自定制combiner过程. 自定制Partition过程: 在mapreduce中,partition用于决定Map节点输出将被分到哪个Reduce节点,MapReduce提供的缺省Partition是HashPartition,他根据每条数据的主键