MapReduce中shuffle过程

shuffle是MapReduce的核心,map和reduce的中间过程。

Map负责过滤分发,reduce归并整理,从map输出到reduce输入就是shuffle过程。

实现的功能

分区

决定当前key交给哪个reduce处理

默认:按照key的hash值对reduce的个数取余进行分区

分组

将相同key的value合并

排序

按照key对每一个keyvalue进行排序,字典排序

过程

map端shuffle

spill阶段:溢写

每一个map task处理的结果会进入环形缓冲区(内存100M)

分区

对每一条key进行分区(标上交给哪个reduce)

hadoop      1       reduce0
hive        1       reduce0
spark       1       reduce1
hadoop      1       reduce0
hbase       1       reduce1
排序

按照key排序,将相同分区的数据进行分区内排序

hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hbase       1       reduce1
spark       1       reduce1
溢写

当整个缓冲区达到阈值80%,开始进行溢写

将当前分区排序后的数据写入磁盘变成一个文件file1最终生成多个spill小文件

可以在mapred-site.xml中设置内存的大小和溢写的阈值

在mapred-site.xml中设置内存的大小
?
    <property>
?
      <name>mapreduce.task.io.sort.mb</name>
?
      <value>100</value>
?
    </property>
?
在mapred-site.xml中设置内存溢写的阈值  
?
    <property>
?
      <name>mapreduce.task.io.sort.spill.percent</name>
?
      <value>0.8</value>
?
    </property>

merge:合并

将spill生成的多个小文件进行合并

排序:将相同分区的数据进行分区内排序,实现comparator比较器进行比较。最终形成一个文件。



file1
hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hbase       1       reduce1
spark       1       reduce1
?
file2
hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hbase       1       reduce1
spark       1       reduce1
?
end_file:
hadoop      1       reduce0
hadoop      1       reduce0
hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hive        1       reduce0
hbase       1       reduce1
hbase       1       reduce1
spark       1       reduce1
spark       1       reduce1

map task 结束,通知app master,app master通知reduce拉取数据

reduce端shuffle

map task1
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hive        1       reduce0
        hive        1       reduce0
        hbase       1       reduce1
        hbase       1       reduce1
        spark       1       reduce1
        spark       1       reduce1
map task2
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hive        1       reduce0
        hive        1       reduce0
        hbase       1       reduce1
        hbase       1       reduce1
        spark       1       reduce1
        spark       1       reduce1

reduce启动多个线程通过http到每台机器上拉取属于自己分区的数据

reduce0:
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hive        1       reduce0
    hive        1       reduce0
    hive        1       reduce0
    hive        1       reduce0

merge:合并,将每个map task的结果中属于自己的分区数据进行合并

排序:对整体属于我分区的数据进行排序

分组:对相同key的value进行合并,使用comparable完成比较。

hadoop,list<1,1,1,1,1,1,1,1>
hive,list<1,1,1,1>

  

优化

combine

在map阶段提前进行一次合并。一般等同于提前执行reduce



job.setCombinerClass(WCReduce.class);

compress

压缩中间结果集,减少磁盘IO以及网络IO

压缩配置方式

1.default:所有hadoop中默认的配置项
2.site:用于自定义配置文件,如果修改以后必须重启生效
3.conf对象配置每个程序的自定义配置
4.运行时通过参数实现用户自定义配置
bin/yarn jar xx.jar -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec main_class input_path ouput_path

查看本地库支持哪些压缩

bin/hadoop checknative

通过conf配置对象配置压缩

public static void main(String[] args) {
        Configuration configuration = new Configuration();
        //配置map中间结果集压缩
        configuration.set("mapreduce.map.output.compress","true");
        configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
        //配置reduce结果集压缩
        configuration.set("mapreduce.output.fileoutputformat.compress","true");
        configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
}

  

原文地址:https://www.cnblogs.com/whcwkw1314/p/8970985.html

时间: 2024-10-10 23:12:34

MapReduce中shuffle过程的相关文章

MapReduce中Shuffle过程整理

MapReduce中的Shuffle过程分为Map端和Reduce端两个过程. Map端: 1.(Hash Partitioner)执行完Map函数后,根据key进行hash,并对该结果进行Reduce的数量取模(该键值对将会由某个reduce端处理)得到一个分区号. 2.(Sort Combiner)将该键值对和分区号序列化之后的字节写入到内存缓存区(大小为100M,装载因子为0.8)中,当内存缓冲区的大小超过100*0.8 = 80M的时候,将会spill(溢出):在溢出之前会在内存缓冲区中

MapReduce的Shuffle过程介绍

MapReduce的Shuffle过程介绍 Shuffle的本义是洗牌.混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好.MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据. 为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发:Reduce是规约,负责数据的计算归并.Reduce的数据来源于Map,Map的输出即是Reduce

【转】mapreduce的shuffle过程

转自http://langyu.iteye.com/blog/992916 Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混.前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在看相关资料而看不懂时很恼火,所以在这 里我尽最大的可能试着把S

MapReduce的shuffle过程

本文是学习时的自我总结,用于日后温习.如有错误还望谅解,不吝赐教. 此处附上一篇个人认为写的比较好的博客,转自枝叶飞扬的博文:http://blog.sina.com.cn/s/blog_605f5b4f010188lp.html### 将Map的输出作为Reduce的输入的过程就是Shuffle了,这个是MapReduce优化的重点地方 Shuffle 过程 ①   Map在内存中开启一个默认大小100MB的环形内存缓冲区用于输出 ②   当缓冲区内存达到默认阈值 80% 时,Map 会启动守

Hadoop Mapreduce中shuffle 详解

MapReduce 里面的shuffle:描述者数据从map task 输出到reduce task 输入的这段过程 Shuffle 过程: 首先,map 输出的<key,value > 会放在内存中,内存有一定的大小,超过之后,会将内存里的东西溢写(spill) 到磁盘(disk)中 .在从内存溢写到磁盘的过程中,会有两个操作:分区(parttition),排序(sort).map结束之后,磁盘中会有很多文件 . 有很多小文件,需要将文件进行文件的合并,并且排序.map 中的一些map任务可

Mapreduce中maptask过程详解

一.Maptask并行度与决定机制 1.一个job任务的map阶段的并行度默认是由该任务的大小决定的: 2.一个split切分分配一个maprask来并行处理: 3.默认情况下,split切分的大小等于blocksize大小: 4.切片不是mapper类中对单词的切片,而是对每一个处理文件的单独切片. eg.  默认情况下,一个maptask处理的文件大小为128M,比如一个400M的数据文件,就需要4个maptask并行来处理,而500M的数据文件也是需要4个maptask. 二.Maptas

MapReduce中的shuffle过程

MapReduce的shuffle过程介绍 Shuffle的语义是洗牌.混洗,即把一组有一定规则的数据尽量转换成一组无规则的数据,随机性越高越好. MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据. 为什么MapReduce计算模型需要Shuffle过程? MapReduce计算模型一般包括两个重要的阶段: Map是映射,负责数据的过滤分发: Reduce是规约,负责数据的计算归并. Reduce的数据来源于Map,Map的输出即Redu

【Big Data - Hadoop - MapReduce】通过腾讯shuffle部署对shuffle过程进行详解

摘要: 通过腾讯shuffle部署对shuffle过程进行详解 摘要:腾讯分布式数据仓库基于开源软件Hadoop和Hive进行构建,TDW计算引擎包括两部分:MapReduce和Spark,两者内部都包含了一个重要的过程—Shuffle.本文对Shuffle过程进行解析,并对两个计算引擎的Shuffle过程进行比较. 腾讯分布式数据仓库(Tencent distributed Data Warehouse, 简称TDW)基于开源软件Hadoop和Hive进行构建,并且根据公司数据量大.计算复杂等

Hadoop学习之路(二十三)MapReduce中的shuffle详解

概述 1.MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle 2.Shuffle: 数据混洗 ——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并 排序) 3.具体来说:就是将 MapTask 输出的处理结果数据,按照 Partitioner 组件制定的规则分发 给 ReduceTask,并在分发的过程中,对数据按 key 进行了分区和排序 MapReduce的Shuffle