MapReduce之Combiner组件

简述

Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入;

在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载

并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。

什么时候运行Combiner?

1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行;

2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行;

3、Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。

实例代码:

package MyCombiner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombinerExp {
    private final static String INPUT_PATH = "hdfs://master:8020/input";
    private final static String OUTPUT_PATH = "hdfs://master:8020/output.txt";
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private IntWritable one = new IntWritable(1);//1
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] str = value.toString().split("\\s+");

            for (String string : str) {
                System.out.println(string);
                word.set(string);
                context.write(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum+=val.get();
            }
            result.set(sum);
            context.write(key,result);
        }
    }

    public static void main(String[] args) throws Exception {
        //1、配置
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }
        Job job = Job.getInstance(conf, "word count"); 

        //2、打包运行必须执行的方法
        job.setJarByClass(CombinerExp.class);

        //3、输入路径
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        //4、Map
        job.setMapperClass(MyMapper.class);

        //5、Combiner
        job.setCombinerClass(MyReducer.class);

        //6、Reducer
        //job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(0);//reduce个数默认是1

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[[email protected] liguodong]# hdfs dfs -ls -R /input/
-rw-r--r--   1 root supergroup         27 2015-06-13 22:15 /input/input1
-rw-r--r--   1 root supergroup         38 2015-06-13 22:15 /input/input2

当我们只有map和combine而没有reduce时,combine并不会执行。
而输出的结果并没有被求和。
[[email protected] liguodong]# hdfs dfs -ls -R /output/
-rw-r--r--   3 liguodong supergroup          0 2015-06-13 22:17 /output/_SUCCESS
-rw-r--r--   3 liguodong supergroup         50 2015-06-13 22:17 /output/part-m-00000
-rw-r--r--   3 liguodong supergroup         39 2015-06-13 22:17 /output/part-m-00001

[[email protected] liguodong]# hdfs dfs -cat /output/part-m-00000
hello   1
you     1
hello   1
everyone        1
hello   1
hadoop  1
[[email protected] liguodong]# hdfs dfs -cat /output/part-m-00001
hello   1
you     1
hello   1
me      1
hi      1
baby    1

当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 32
    File System Counters
        ......
    Map-Reduce Framework
        Map input records=6
        Map output records=12
        ......
        Input split bytes=192
        Combine input records=12
        Combine output records=9
        ......
        Reduce input records=9
        Reduce output records=7
        Spilled Records=18
        ......
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=457912320
    File Input Format Counters
        Bytes Read=65
    File Output Format Counters
        Bytes Written=51

[[email protected] hadoop]# hdfs dfs -ls -R /output/
-rw-r--r--   3 liguodong supergroup          0 2015-06-13 22:41 /output/_SUCCESS
-rw-r--r--   3 liguodong supergroup         51 2015-06-13 22:41 /output/part-r-00000

[[email protected] hadoop]# hdfs dfs -cat /output/pa*
baby    1
everyone        1
hadoop  1
hello   5
hi      1
me      1
you     2
时间: 2025-01-14 14:53:06

MapReduce之Combiner组件的相关文章

[MapReduce_5] MapReduce 中的 Combiner 组件应用

0. 说明 Combiner 介绍 &&  在 MapReduce 中的应用 1. 介绍 Combiner: Map 端的 Reduce,有自己的使用场景 在相同 Key 过多的情况下,在 Map 端进行的预聚合,大大缓解了网络间的 K-V 全分发 Combiner 适用场景: 最大值 求和 最小值 Combiner 不适用平均值的计算 2. 结合 Combiner 实现 Word Count 在 [MapReduce_1] 运行 Word Count 示例程序 代码基础上在 WCApp.

Mapreduce的combiner

每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一.l combiner是MR程序中Mapper和Reducer之外的一种组件l combiner组件的父类就是Reducerl combiner和reducer的区别在于运行的位置:Combiner是在每一个maptask所在的节点运行Reducer是接收全局所有Mapper的输出结果:l co

MapReduce之Partitioner组件

简述 Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理: 你可以自定义key的一个分发规则,如数据文件包含不同的大学,而输出的要求是每个大学输出一个文件: Partitioner组件提供了一个默认的HashPartitioner. package org.apache.hadoop.mapreduce.lib.partition; public class HashPartitioner<K, V> extends Partit

MapReduce之RecordReader组件源码解析及实例

简述 无论我们以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类: 系统默认的RecordReader是LineRecordReader,TextInputFormat: LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value: 而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader: 应用场景:自定义读取每一条记录的方式:自定义读入key的类型,如

Hadoop Combiner组件

一:背景 在MapReduce模型中,reduce的功能大多是统计分类类型的总量.求最大值最小值等,对于这些操作可以考虑在Map输出后进行Combiner操作,这样可以减少网络传输负载,同时减轻reduce任务的负担.Combiner操作是运行在每个节点上的,只会影响本地Map的输出结果,Combiner的输入为本地map的输出结果,很多时候Combiner的逻辑和reduce的逻辑是相同的,因此两者可以共用reducer体. 二:什么时候运行Combiner (1):当job设置了Combin

mapreduce(2)--combiner使用和mr流程解析

一.准备工作 1.需求 在wordcount程序中使用自定义combiner 解析mapreduce的流程 2.环境配置 (1)hadoop为本地模式 (2)pom文件代码如下 <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</v

MapReduce中Combiner规约的作用以及不能作为MR标配的原因

作用:在Mapper端对数据进行Combine归约处理,Combine业务逻辑与Reducer端做的完全相同.处理后的数据再传送到Reducer端,再做一次归约.这样的好处是减少了网络传输的数量.在Mapper进行归约后,数据量变小了,这样再通过网络传输时,传输时间就变短了,减少了整个作业的运行时间.(注意:Mapper端的数据仅仅是本节点处理的数据,而Reducer端处理的数据是来自于多个Mapper任务的输出.因此在Mapper不能归约的数据,在Reducer端有可能归约处理). 不能作为标

大数据-Hadoop生态(19)-MapReduce框架原理-Combiner合并

1. Combiner概述 2. 自定义Combiner实现步骤 1). 定义一个Combiner继承Reducer,重写reduce方法 public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context)

第2节 mapreduce深入学习:7、MapReduce的规约过程combiner

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一. ?   combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件 ?   combiner 组件的父类就是 Reducer ?   combiner 和 reducer 的区别在于运行的位置: Combiner 是在每一个 maptask