Mapreduce中maptask过程详解

一、Maptask并行度与决定机制

  1.一个job任务的map阶段的并行度默认是由该任务的大小决定的;

  2.一个split切分分配一个maprask来并行处理;

  3.默认情况下,split切分的大小等于blocksize大小;

  4.切片不是mapper类中对单词的切片,而是对每一个处理文件的单独切片。

  eg.  默认情况下,一个maptask处理的文件大小为128M,比如一个400M的数据文件,就需要4个maptask并行来处理,而500M的数据文件也是需要4个maptask。

二、Maptask运行机制

  1.读数据文件:执行类Driver通过InputFormat类读取文件中的数据;

  2.mapper阶段:通过文件的大小决定了maptask的数量,然后mapper进行逻辑运行(读数据、切分、封装);

  3.OutputCollector阶段:mapper方法通过OutputCollector接口将KV对写入到环形缓冲区中(这个过程不需要我们处理我们);

  4.溢写阶段:环形缓冲区默认的大小为100M,当环形缓冲区中数据量到达阈值的80%的时候发生溢写,溢写的过程中会保证数据KV对使用默认的分区和排序(HashPartitioner分区、字典排序,而环形缓冲区大小和阈值的大小都是可以通过配置来修改的);

  5.归并排序:将溢写的数据进行合并排序。

三、MR的小文件优化案例

  当许多个小文件上传到HDFS集群上时,每个小文件都将会占用一个blocksize的大小(128M),而且在对它们进行MR计算时,一个文件就会开启一个maptask,这样会浪费很多的资源,下面有两种解决方案:

  1.在文件上传到HDFS集群前,先将文件进行合并成一个大的文件,再上传到HDFS集群进行存储和计算;

  2.若文件已经上传到HDFS集群,需要直接进行计算时,

  可以再Driver类中设置输入流之前设置InputFormatClass属性为CombinerTextInputFormat(它的默认为TextInputFormat),

  原理是:CombineTextInputFormat类可以将多个小文件交给一个split切片,然后交给一个maptask来处理,即再Driver类中设置输入流FileInputFormat前加入代码:

job.setInputFormatClass(CombinerTextInputFormat.class);
CombinerTextInputFormat.setMaxInputSplitSize(job,4194304);    //设置切片最大值为4M
CombinerTextInputFormat.setMinInputSplitSize(job,3145725);    //设置切片最大值为3M

  表示大小在3M~4M的文件会被方法一个切片中,那么如果有无数的小文件,一个maptask中大概会有28~42个小文件一起处理。

四、自定义分区Partitioner

  在MR程序中,默认分区为HashPartitioner,以下为源码:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}

  HashPartitioner继承了父类Partitioner,其中getPartition方法返回int值0(注释:分区数量决定了reducetask的数量,不分区reducetask值为1,所以一直返回int值0,也就只会产生一个结果文件!!!)

  而如果我们想要进行自定义分区,就要重新定义一个分区类继承Partitioner类:

public class FlowPartitioner extends Partitioner<Text,FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int i) {
        //获取用来分区的电话号码前三位
        String phoneNum = key.toString().substring(0, 3);
        //设置分区逻辑
        int partitionNum = 4;
        if ("135".equals(phoneNum)){
            return 0;
        }else if ("137".equals(phoneNum)){
            return 1;
        }else if ("138".equals(phoneNum)){
            return 2;
        }else if ("139".equals(phoneNum)){
            return 3;
        }
        return partitionNum;
    }
}

  我在流量统计案例中也写了该分区类,然后再Driver类中的InputFormat类之前加入设置的自定义分区代码:

job.setPartitionClass(PhoneNumPartitioner.class);
job.setNumReduceTasks(5);		(注意:输出文件数量要大于partitioner分区的数量)

  总结:MR程序运算过程中,决定maptask个数的有块大小(blocksize)、数据文件大小、文件输入方式(小文件优化);而决定reducetask个数的是分区(无分区时reducetask个数为1,生成一个结果文件)。

  

  

  

原文地址:https://www.cnblogs.com/HelloBigTable/p/10591105.html

时间: 2024-10-16 15:09:27

Mapreduce中maptask过程详解的相关文章

【转】Hadoop在MapReduce中使用压缩详解

原文链接 http://www.cnblogs.com/ggjucheng/archive/2012/04/22/2465580.html#top hadoop对于压缩文件的支持 hadoop对于压缩格式的是透明识别,我们的MapReduce任务的执行是透明的,hadoop能够自动为我们 将压缩的文件解压,而不用我们去关心. 如果我们压缩的文件有相应压缩格式的扩展名(比如lzo,gz,bzip2等),hadoop就会根据扩展名去选择解码器解压. hadoop对每个压缩格式的支持,详细见下表:  

Hadoop学习之路(二十三)MapReduce中的shuffle详解

概述 1.MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle 2.Shuffle: 数据混洗 ——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并 排序) 3.具体来说:就是将 MapTask 输出的处理结果数据,按照 Partitioner 组件制定的规则分发 给 ReduceTask,并在分发的过程中,对数据按 key 进行了分区和排序 MapReduce的Shuffle

MapReduce阶段源码分析以及shuffle过程详解

MapReducer工作流程图: 1. MapReduce阶段源码分析 1)客户端提交源码分析 解释:   - 判断是否打印日志   - 判断是否使用新的API,检查连接   - 在检查连接时,检查输入输出路径,计算切片,将jar.配置文件复制到HDFS   - 计算切片时,计算最小切片数(默认为1,可自定义)和最大切片数(默认是long的最大值,可以自定义)   - 查看给定的是否是文件,如果是否目录计算目录下所有文件的切片   - 通过block大小和最小切片数.最大切片数计算出切片大小  

Hadoop MapReduce执行过程详解(带hadoop例子)

https://my.oschina.net/itblog/blog/275294 摘要: 本文通过一个例子,详细介绍Hadoop 的 MapReduce过程. 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任

Hadoop学习之MapReduce执行过程详解

转自:http://my.oschina.net/itblog/blog/275294 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任务是一个java进程,它会读取HDFS中的文件,解析成很多的键值对,经过我

SVN中基于Maven的Web项目更新到本地过程详解

环境 MyEclipse:10.7 Maven:3.1.1 概述 最近在做项目的时候,MyEclipse下载SVN上面基于Maven的Web项目总是出现很多问题,有时候搞了很半天,Maven项目还是出现叉号,最后总结了方法步骤,终于可以将出现的问题解决,在此,将重现从SVN上将基于Maven的Web项目变成本地MyEclipse中项目的过程,问题也在其中进行解决. 问题补充 在使用Myeclipse的部署Web项目的时候,在点击部署按钮的时候,没有任何反应,在此提供两种解决方法,问题如图1所示:

Ossim 中漏洞扫描详解

Ossim 中漏洞扫描详解 Openvas是一套开源漏洞扫描系统,如果手动搭建需要复杂的过程,花费不少人力和时间成本,此文主要针对OSSIM平台下如何以图形化方式操作漏洞扫描的过程. 准备工作:首先确保没有运行的扫描进程和任务 扫描漏洞同时升级漏洞库会导致升级失败. 第一步:同步插件 #openvas-nvt-sync 同步数万个插件时间比较长,可以去喝杯咖啡啦,或者了解一下插件的组成. 表1 Openvas主要脚本分类及分布情况 规则名称 数量 备注 IIS_frontpage_DOS_2.n

使用HeartBeat实现高可用HA的配置过程详解

使用HeartBeat实现高可用HA的配置过程详解 一.写在前面 HA即(high available)高可用,又被叫做双机热备,用于关键性业务.简单理解就是,有2台机器 A 和 B,正常是 A 提供服务,B 待命闲置,当 A 宕机或服务宕掉,会切换至B机器继续提供服务.常见的实现高可用的开源软件有 heartbeat 和 keepalived. 这样,一台 web 服务器一天24小时提供web服务,难免会存在 web 服务挂掉或服务器宕机宕机的情况,那么用户就访问不了服务了,这当然不是我们期望

转:LoadRunner中参数化技术详解

LoadRunner中参数化技术详解 LoadRunner在录制脚本的时候,只是忠实的记录了所有从客户端发送到服务器的数据,而在进行性能测试的时候,为了更接近真实的模拟现实应用,对于某些信息需要每次提交不同的数据,或者使用多个不同的值进行循环输入.这时,在LoadRunner中就可以进行参数化设置,以使用多个不同的值提交应用请求. [参数化]使用指定数据源中的值来替换脚本录制生成的语句中的参数. [好处] l  减少脚本的大小 l  提供使用不同的值执行脚本的能力,更加真实的模拟现实应用. [参