Combiners编程
1.每一个map会产生大量的输出,combiner的作用就是在map端对输出,先做一次合并,以减少传输到reducer的数据量.
2.combiner最基本是实现本地key的归并,具有类似本地reduce功能如果不用combiner,那么所有的结果都是reduce完成,效率会相对 降低
3.使用combiner,先完成的map会在本地聚合,提升速度.
combiner阶段是可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。
代码实现:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, IntWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { long count = 0; // 迭代所有的value,进行累加 for (IntWritable value : values) { count += value.get(); } // 再将这个key及其汇总value-count输出 context.write(key, new LongWritable(count)); } }
在mian方法中加入代码实现:
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 创建一个job对象来组装我们这个wordcount程序 Job wcjob = Job.getInstance(conf); /** * 加入一个combiner组件 */ // wcjob.setCombinerClass(WordcountCombiner.class); wcjob.setCombinerClass(WordCountReducer.class); } }
第11行和12行意思是一样的只不过在CombinerClass调用的Reduce是在maptaks 本地运行 Combiner继承了reduce所以两者怎么写都行
时间: 2024-10-29 19:11:15