Combiner

Combiner编程(1.5可选步骤,视情况而定!)

  • 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
  • combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。 如果不用combiner,那么,所有的结果

  都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

  • 注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该

  用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

  1 package combine;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Reducer;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15
 16 /**
 17  * 问:为什么使用Combiner?
 18  * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
 19  *
 20  * 问:为什么Combiner不作为MR运行的标配,而是可选步骤呢?
 21  * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
 22  *
 23  * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作呢?
 24  * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
 25  *
 26  */
 27 public class WordCountApp {
 28     static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
 29     static final String OUT_PATH = "hdfs://chaoren:9000/out";
 30
 31     public static void main(String[] args) throws Exception {
 32         Configuration conf = new Configuration();
 33         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 34         final Path outPath = new Path(OUT_PATH);
 35         if(fileSystem.exists(outPath)){
 36             fileSystem.delete(outPath, true);
 37         }
 38
 39         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
 40         //1.1指定读取的文件位于哪里
 41         FileInputFormat.setInputPaths(job, INPUT_PATH);
 42         //指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
 43         //job.setInputFormatClass(TextInputFormat.class);
 44
 45         //1.2 指定自定义的map类
 46         job.setMapperClass(MyMapper.class);
 47         //map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
 48         //job.setMapOutputKeyClass(Text.class);
 49         //job.setMapOutputValueClass(LongWritable.class);
 50
 51         //1.3 分区
 52         //job.setPartitionerClass(HashPartitioner.class);
 53         //有一个reduce任务运行
 54         //job.setNumReduceTasks(1);
 55
 56         //1.4 TODO 排序、分组
 57
 58         //1.5 规约
 59         job.setCombinerClass(MyCombiner.class);
 60
 61         //2.2 指定自定义reduce类
 62         job.setReducerClass(MyReducer.class);
 63         //指定reduce的输出类型
 64         job.setOutputKeyClass(Text.class);
 65         job.setOutputValueClass(LongWritable.class);
 66
 67         //2.3 指定写出到哪里
 68         FileOutputFormat.setOutputPath(job, outPath);
 69         //指定输出文件的格式化类
 70         //job.setOutputFormatClass(TextOutputFormat.class);
 71
 72         //把job提交给JobTracker运行
 73         job.waitForCompletion(true);
 74     }
 75
 76     /**
 77      * KEYIN    即k1        表示行的偏移量
 78      * VALUEIN    即v1        表示行文本内容
 79      * KEYOUT    即k2        表示行中出现的单词
 80      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
 81      */
 82     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 83         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 84             final String[] splited = v1.toString().split("\t");
 85             for (String word : splited) {
 86                 context.write(new Text(word), new LongWritable(1));
 87                 System.out.println("Mapper输出<"+word+","+1+">");
 88             }
 89         };
 90     }
 91
 92     /**
 93      * KEYIN    即k2        表示行中出现的单词
 94      * VALUEIN    即v2        表示行中出现的单词的次数
 95      * KEYOUT    即k3        表示文本中出现的不同单词
 96      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
 97      *
 98      */
 99     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
100         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
101             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
102             System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
103             long times = 0L;
104             for (LongWritable count : v2s) {
105                 times += count.get();
106                 //显示次数表示输入的k2,v2的键值对数量
107                 System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">");
108             }
109             ctx.write(k2, new LongWritable(times));
110         };
111     }
112
113
114     static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
115         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
116             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
117             System.out.println("Combiner输入分组<"+k2.toString()+",...>");
118             long times = 0L;
119             for (LongWritable count : v2s) {
120                 times += count.get();
121                 //显示次数表示输入的k2,v2的键值对数量
122                 System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">");
123             }
124
125             ctx.write(k2, new LongWritable(times));
126             //显示次数表示输出的k2,v2的键值对数量
127             System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
128         };
129     }
130 }

时间: 2024-10-22 00:20:49

Combiner的相关文章

Partitioner和Combiner两个阶段

Partitioner编程 将有一些共同特性的数据,写入到同一个文件里. 排序和分组 在map和reduce阶段进行排序时,比较的是k2.v2是不参与排序比较的. 如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2, 才能参与比较.如果想自定义排序规则,被排序的对象要实现 WritableComparable接口,在compareTo方法中实现排序规则, 然后将这个对象当做k2,即可完成排序分组时也是按照k2进行比较的. Combiners编程 1.每一个map会产生大量的输出,c

MapReduce之Combiner组件

简述 Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入: 在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载: 并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了

hadoop1中partition和combiner作用

---恢复内容开始--- 1.解析Partiton 把map任务的输出的中间结果按照key的范围进行划分成r份,r代表reduce任务的个数.hadoop默认有个类HashPartition实现分区,通过key对reduce的个数取模(key%r),这样可以保证一段范围内的key交由一个reduce处理.以此来实现reduce的负载均衡.不至于使有些reduce处理的任务压力过大,有些reduce空闲. 如果我们对hadoop本身的分区算法不满意,或者我们因为我们的业务需求,我们可以自定义一个类

Hadoop Combiner组件

一:背景 在MapReduce模型中,reduce的功能大多是统计分类类型的总量.求最大值最小值等,对于这些操作可以考虑在Map输出后进行Combiner操作,这样可以减少网络传输负载,同时减轻reduce任务的负担.Combiner操作是运行在每个节点上的,只会影响本地Map的输出结果,Combiner的输入为本地map的输出结果,很多时候Combiner的逻辑和reduce的逻辑是相同的,因此两者可以共用reducer体. 二:什么时候运行Combiner (1):当job设置了Combin

Hadoop学习笔记—8.Combiner与自定义Combiner

一.Combiner的出现背景 1.1 回顾Map阶段五大步凑 在第四篇博文<初始MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步凑,如下图所示: 其中,step1.5是一个可选步凑,它就是我们今天需要了解的 Map规约 阶段.现在,我们再来看看前一篇博文<计数器与自定义计数器>中的第一张关于计数器的图: 我们可以发现,其中有两个计数器:Combine output records和Combine input records,他们的计数都是

Hadoop读书笔记(十)MapReduce中的从计数器理解combiner归约

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.combiner 问:什么是combiner: 答:Combiner发生在Mapper端,对数据进行归约处理,使传到reducer端的数据变小了,传输时间变端,作业时间变短,Combiner不能夸Mapper执行,(只有reduce可以接受多个Mapper的任务). 并不是所有的算法都适合归约处理,例如求平均数 2.代码实现 WordCount.j

mapreduce中的combiner、partitioner、Shuffle

一.combiner combiner不是mapreduce的一个必备过程,是由开发者选择是否使用的,是mapreduce的一种优化手段. combiner的作用:combiner是为了解决mapreduce过程中的两个性能瓶颈,1.网络宽带严重被占降低程序效率,2.单一节点承载过重降低程序效率.所以性能有以下两个作用: 1.combiner实现本地key的聚合,对map输出的key排序value进行迭代 2.combiner还有本地reduce功能(其本质上就是一个reduce). 什么时候运

MapReduce的规约--&gt;自定义Combiner

wordCount例子 输入处理文件 hello me hello you 没有加入Combiner之前 设置combiner //加入Combiner //map产生的输出在这个Combiner运行 运行完成交给myreduce job.setCombinerClass(MyReducer.class); Combiner 位于map的reduce中间,会处理下数据 Combiner 位于map段的后面 ================流程==================== 原始 hel

Hadoop2.4.1 MapReduce通过Map端shuffle(Combiner)完成数据去重

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWrita