MapReduce规约

深入了解Combiners编程(相当于Map端的Reduce)

  • 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
  • combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
  • 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
  • 注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。
  • 所以,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

在程序中仅需要在主函数中添加如下代码:

//规约的例子,足以
job.setCombinerClass(MyReducer.class);  

其中:MyReducer为自定的Reducer任务。

以单词技术为例:

(1)在没有Combine情况下

输入2个键值对<0,hello you><10,hello me>

map端产生4个键值对<k2,v2>,<hello,1><you,1><hello,1><me,1>

3个分组,<hello,{1,1}><me,{1}><you,{1}>

传输到reduce有3个键值对<hello,{2}><me,{1}><you,{1}>,3个分组

-----------------

hello you

hello  me

-----------------

(2)有Combine的情况

输入2个键值对<0,hello you><10,hello me>

map端产生4个键值对<k2,v2>,<hello,1><you,1><hello,1><me,1>

Combine输入键值对有4个 <hello,1><you,1><hello,1><me,1>

Combine输出键值对有三个<hello,{1,1}><me,{1}><you,{1}>

3个分组,<hello,{1,1}><me,{1}><you,{1}>

传输到reduce有3个键值对<hello,{2}><me,{1}><you,{1}>,3个分组

测试代码:

package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 规约:Combiner
 *
 */
public class CombinerTest {
    public static void main(String[] args) throws Exception {
        //必须要传递的是自定的mapper和reducer的类,输入输出的路径必须指定,输出的类型<k3,v3>必须指定
        //2将自定义的MyMapper和MyReducer组装在一起
        Configuration conf=new Configuration();
        String jobName=CombinerTest.class.getSimpleName();
        //1首先寫job,知道需要conf和jobname在去創建即可
        Job job = Job.getInstance(conf, jobName);

        //*13最后,如果要打包运行改程序,则需要调用如下行
        job.setJarByClass(CombinerTest.class);

        //3读取HDFS內容:FileInputFormat在mapreduce.lib包下
        FileInputFormat.setInputPaths(job, new Path("hdfs://neusoft-master:9000/data/hellodemo"));
        //4指定解析<k1,v1>的类(谁来解析键值对)
        //*指定解析的类可以省略不写,因为设置解析类默认的就是TextInputFormat.class
        job.setInputFormatClass(TextInputFormat.class);
        //5指定自定义mapper类
        job.setMapperClass(MyMapper.class);
        //6指定map输出的key2的类型和value2的类型  <k2,v2>
        //*下面两步可以省略,当<k3,v3>和<k2,v2>类型一致的时候,<k2,v2>类型可以不指定
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //7分区(默认1个),排序,分组,规约 采用 默认

        //规约的例子
        job.setCombinerClass(MyReducer.class);
        //接下来采用reduce步骤
        //8指定自定义的reduce类
        job.setReducerClass(MyReducer.class);
        //9指定输出的<k3,v3>类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //10指定输出<K3,V3>的类
        //*下面这一步可以省
        job.setOutputFormatClass(TextOutputFormat.class);
        //11指定输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://neusoft-master:9000/out1"));

        //12写的mapreduce程序要交给resource manager运行
        job.waitForCompletion(true);
    }
    private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();
        @Override
        protected void map(LongWritable key, Text value,//三个参数
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] splited = line.split("\t");//因为split方法属于string字符的方法,首先应该转化为string类型在使用
            for (String word : splited) {
                //word表示每一行中每个单词
                //对K2和V2赋值
                k2.set(word);
                v2.set(1L);
                context.write(k2, v2);
            }
        }
    }
    private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable v3 = new LongWritable();
        @Override //k2表示单词,v2s表示不同单词出现的次数,需要对v2s进行迭代
        protected void reduce(Text k2, Iterable<LongWritable> v2s,  //三个参数
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum =0;
            for (LongWritable v2 : v2s) {
                //LongWritable本身是hadoop类型,sum是java类型
                //首先将LongWritable转化为字符串,利用get方法
                sum+=v2.get();
            }
            v3.set(sum);
            //将k2,v3写出去
            context.write(k2, v3);
        }
    }
}

Combine测试代码

[[email protected] filecontent]# hadoop dfs -rm -R /out1

[[email protected] filecontent]# hadoop jar CombineTest.jar

Map-Reduce Framework

Map input records=2
Map output records=4
Map output bytes=51
Map output materialized bytes=49
Input split bytes=106
Combine input records=4
Combine output records=3
Reduce input groups=3
Reduce shuffle bytes=49
Reduce input records=3
Reduce output records=3
Spilled Records=6
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=53
CPU time spent (ms)=1760
Physical memory (bytes) snapshot=455901184
Virtual memory (bytes) snapshot=3118538752

使用Combiner有什么好处?

在map端执行reduce操作,可以减少map最终的数据量,减少传输到reduce的数据量,减少网络带宽。

为什么Combiner不是默认配置?

因为有的算法不适合Combiner

什么算法不适合Combiner?

不符合幂等性的算法,比如在网络传输时候出现故障,多次执行程序结果是不同的

如:求平均值的算法。

2 2 2  这三个数在一个文件中

1 1 1 1   这四个数在一个文件中

两个文件产生2个inputsplit,每一个inputsplit对应一个mao任务,产生2个mapper任务,如果求平均数,真实值(2+2+2+1+1+1+1)/7=1.4

如果使用Combiner,map端要做一次Reduce,第一个文件平均数为2,第二个文件的平均数为1,之后再reduce再求平均值得到1.5,值不正确。

为什么在map端执行了reduce操作,还需要在reduce端再次执行哪?
      答:因为map端执行的是局部reduce操作,在reduce端执行全局reduce操作。(上述例子中,map端仅仅指定的是单个文件的合并,reduce端执行的是两个文件的合并)

时间: 2024-10-23 10:02:43

MapReduce规约的相关文章

MapReduce的规约--&gt;自定义Combiner

wordCount例子 输入处理文件 hello me hello you 没有加入Combiner之前 设置combiner //加入Combiner //map产生的输出在这个Combiner运行 运行完成交给myreduce job.setCombinerClass(MyReducer.class); Combiner 位于map的reduce中间,会处理下数据 Combiner 位于map段的后面 ================流程==================== 原始 hel

MapReduce程序之combiner规约

[toc] MapReduce程序之combiner规约 前言 前面的wordcount程序,shuffle阶段的数据都是<hello, [1, 1, 1]>这种类型的(可以查看程序的输出),也就是说,交给reduce处理时就是这种类型的数据,这会带来一个问题,什么问题呢?就是网络传输问题,对于[1, 1, 1]这种数据,完全可以在本地就先完成规约,即将相当于在本地做一次reduce,从代码的角度去分析,其实也是一次reduce的操作,只是这个过程是在shuffle的时候就完成的. 程序代码

第2节 mapreduce深入学习:7、MapReduce的规约过程combiner

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一. ?   combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件 ?   combiner 组件的父类就是 Reducer ?   combiner 和 reducer 的区别在于运行的位置: Combiner 是在每一个 maptask

MapReduce中Combiner规约的作用以及不能作为MR标配的原因

作用:在Mapper端对数据进行Combine归约处理,Combine业务逻辑与Reducer端做的完全相同.处理后的数据再传送到Reducer端,再做一次归约.这样的好处是减少了网络传输的数量.在Mapper进行归约后,数据量变小了,这样再通过网络传输时,传输时间就变短了,减少了整个作业的运行时间.(注意:Mapper端的数据仅仅是本节点处理的数据,而Reducer端处理的数据是来自于多个Mapper任务的输出.因此在Mapper不能归约的数据,在Reducer端有可能归约处理). 不能作为标

【Big Data - Hadoop - MapReduce】通过腾讯shuffle部署对shuffle过程进行详解

摘要: 通过腾讯shuffle部署对shuffle过程进行详解 摘要:腾讯分布式数据仓库基于开源软件Hadoop和Hive进行构建,TDW计算引擎包括两部分:MapReduce和Spark,两者内部都包含了一个重要的过程—Shuffle.本文对Shuffle过程进行解析,并对两个计算引擎的Shuffle过程进行比较. 腾讯分布式数据仓库(Tencent distributed Data Warehouse, 简称TDW)基于开源软件Hadoop和Hive进行构建,并且根据公司数据量大.计算复杂等

MapReduce (MRV1)设计理念与基本架构

MapReduce 是一个分布式计算框架,主要由两部分组成:编程模型和运行时环境. 其中,编程模型为用户提供了非常易用的编程接口,用户只需要像编写串行程序一样实现几个简单的函数即可实现一个分布式程序,而其他比较复杂的工作,如节点间的通信.节点失效.数据切分等,全部由MapReduce 运行时环境完成,用户无须关心这些细节. 编程模型: ????它的基本编程模型是将问题抽象成Map 和Reduce 两个阶段.其中,Map 阶段将输入数据解析成key/value,迭代调用map() 函数处理后,再以

文本挖掘之文本聚类(MapReduce)

刘 勇  Email:[email protected] 简介 针对大数量的文本数据,采用单线程处理时,一方面消耗较长处理时间,另一方面对大量数据的I/O操作也会消耗较长处理时间,同时对内存空间的消耗也是很大,因此,本文引入MapReduce计算模型,以分布式方式处理文本数据,以期提高数据处理速率.本文结合Kmeans和DBSCAN算法,对上述算法进行改进,其中借鉴Kmeans聚类方法(类别个数的确定性)以及DBSCAN聚类方法(基于密度),并在数据处理过程中引入多个Reducer对数据进行归并

Hadoop读书笔记(五)MapReduce统计单词demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop初学指南(5)--MapReduce入门

本文将介绍Hadoop中的重点MapReduce的入门知识. (1)MapReduce概述 MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,在Hadoop中用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单.这两个函数的形参是key.value对,表示函数的输入信息. (2)MR执行流程 客户端的代码会提交给JobTracker,也就是JobTracker接受由用户提交