MapReduce过程详解

Hadoop越来越火,围绕Hadoop的子项目更是增长迅速,光Apache官网上列出来的就十几个,但是万变不离其宗,大部分项目都是基于Hadoop Common。

MapReduce更是核心中的核心。那么到底什么是MapReduce,它具体是怎么工作的呢?

关于它的原理,说简单也简单,随便画个图喷一下Map和Reduce两个阶段似乎就完了。但其实这里面还包含了很多的子阶段,尤其是Shuffle,很多资料里都把它称为MapReduce的“心脏”,和所谓“奇迹发生的地方”。真正能说清楚其中关系的人就没那么多了。可是了解这些流程对我们理解和掌握MapReduce并对其进行调优是非常有用的。

首先我们看一幅图,包含了从头到尾的整个过程,后面对所有步骤的解释都以此图作为参考(此图100%原创)

这张图简单来说,就是说在我们常见的Map和Reduce之间还有一系列的过程,其中包括partition、Sort、Combine、copy、merge等,而这些过程往往被统称为“Shuffle”也就是“混洗”,而Shuffle的目的就是对数据进行梳理,排序,以便科学的方式分发给每个Reducer,以便高效的进行计算和处理(难怪人家说这事奇迹发生的地方,原理这里面有这么多花花,能没奇迹嘛?)

如果您是Hadoop的大牛,看了这幅图可能马上就要跳出来了,不对!还有一个spill过程云云。。。

且慢,关于spill,我认为只是一个实现细节,其实就是MapReduce利用内存缓冲的方式提高效率,整个的过程和原理并没有受影响,所以在此忽略掉spill过程,以便更好了解。

光看原理图还是有点费解是吧?没错!我一直认为,没有例子的文章就是耍流氓,所以我们就用大家耳熟能详的WordCount作为例子,开始我们的讨论。

先创建两个文本文件,作为我们例子的输入:

file1 内容为:

[html] view plain copy

  1. My name is Tony
  2. My company is pivotal

file2 内容为:

[html] view plain copy

  1. My name is Lisa
  2. My company is EMC

第一步:对输入分片进行map()处理

首先我们的输入就是两个文件,默认情况下是两个split,对应前面图中的split0,split1。两个split默认会分给两个Mapper来处理,WordCount例子相当地暴力,这一步里面就是把文件内容分解为单词和1,其中的单词就是我们的key,后面的数字就是对应的值,也就是value【在此假设各位都对WordCount程序烂熟于心】。

那么对应两个Mapper的输出就是:

split0被处理后的数据为:

[html] view plain copy

  1. My       1
  2. name    1
  3. is         1
  4. Tony     1
  5. My          1
  6. company     1
  7. is       1
  8. Pivotal   1

split1被处理后的数据为:

[html] view plain copy

  1. My       1
  2. name    1
  3. is       1
  4. Lisa     1
  5. My       1
  6. company  1
  7. is       1
  8. EMC     1

第二步:对map()的输出结果进行Partition

partition是什么?partition就是分区。

为什么要分区?因为有时候会有多个Reducer,partition就是提前对输入进行处理,根据将来的Reducer进行分区,到时候Reducer处理的时候,只需要处理分给自己的数据就可以了。

如何分区?主要的分区方法就是按照key不同,把数据分开,其中很重要的一点就是要保证key的唯一性,因为将来做Reduce的时候很可能是在不同的节点上做的,如果一个key同时存在两个节点上,Reduce的结果就会出问题,所以很常见的partition方法就是哈希。

结合我们的例子,我们这里假设有两个Reducer,前面两个Split做完Partition的结果就会如下:

split0的数据经过map()后再进行分区

partition 1:

[html] view plain copy

  1. company 1
  2. is      1
  3. is    1

Partition 2:

[html] view plain copy

  1. My     1
  2. My    1
  3. name  1
  4. Pivotal   1
  5. Tony    1

注:按Key进行hash并且对reducer数量(这里设置为2)取模,所以结果只能是两个。


split1的数据经过map()后再进行分区(同split0):

partition 1:

[html] view plain copy

  1. company 1
  2. is    1
  3. is      1
  4. EMC   1

Partition 2:

[html] view plain copy

  1. My     1
  2. My       1
  3. name   1
  4. Lisa     1

注:其中partition1是给Reducer1处理的,partition2是给Reducer2处理的。这里我们可以看到,partition只是把所有的条目按照key分了一个区,没有其他任何处理,每个区里面的key都不会出现在另外一个区里面。

第三步:sort

sort就是排序咯,其实这个过程在我看来并不是必须的,完全可以交给客户端自己的程序来处理。那为什么还要排序呢?可能是写MapReduce的大牛们想,“大部分reduce程序应该都希望输入的是已经按key排序号的数据,如果是这样,那么我们就干脆顺手帮你做掉啦!”

那么我们假设对前面的数据再进行排序,结果如下:

split0 的partition 1中的数据排序后如下:

[html] view plain copy

  1. company 1
  2. is      1
  3. is    1

split0的partition 2中的数据排序后如下:

[html] view plain copy

  1. My     1
  2. My    1
  3. name  1
  4. Pivotal   1
  5. Tony    1

split1的partition 1中的数据排序后如下:

[html] view plain copy

  1. company 1
  2. EMC   1
  3. is    1
  4. is      1

split1的partition 2中的数据排序后如下:

[html] view plain copy

  1. Lisa   1
  2. My     1
  3. My       1
  4. name   1

注:这里可以看到,每个partition里面的条目都按照key的顺序做了排序。

第四步:Combine

什么是Combine呢?combine其实可以理解为一个mini Reduce过程,它发生在前面Map的输出结果之后,目的就是在结果送到Reducer之前先对其进行一次计算,以减少文件的大小,方便后面的传输。但这一步不是必须的。

按照前面的输出,执行Combine:

split0的partition 1的数据为:

[html] view plain copy

  1. company 1
  2. is      2

split0的partition 2的数据为:

[html] view plain copy

  1. My     2
  2. name  1
  3. Pivotal   1
  4. Tony    1

split1的partition1 的数据为:

[html] view plain copy

  1. Partition 1:
  2. company 1
  3. EMC   1
  4. is    2

split1的partition 2的数据为:

[html] view plain copy

  1. Lisa   1
  2. My     2
  3. name   1

注:针对前面的输出结果,我们已经局部地统计了is和My的出现频率,减少了输出文件的大小。

第五步:copy

下面就要准备把输出结果传送给Reducer了。这个阶段被称为Copy,但事实上我认为叫它Download更为合适,因为实现的时候,是通过http的方式,由Reducer节点向各个Mapper节点下载属于自己分区的数据。

那么根据前面的partition,下载完的结果如下:

Reducer 节点1的共包含两个文件(split0的partition1和split1的partition1):

[html] view plain copy

  1. Partition 1:
  2. company 1
  3. is      2
  4. Partition 1:
  5. company  1
  6. EMC    1
  7. is    2

Reducer 节点2也是两个文件(split0的partition1和split1的partition2):

[html] view plain copy

  1. Partition 2:
  2. My     2
  3. name  1
  4. Pivotal   1
  5. Tony    1
  6. Partition 2:
  7. Lisa   1
  8. My     2
  9. name   1

注:通过Copy,相同Partition的数据落到了同一个节点上。

第六步:Merge

如上一步所示,此时Reducer得到的文件是从不同的Mapper那里下载到的,需要对他们进行合并为一个文件,所以下面这一步就是Merge,结果如下:

Reducer 节点1的数据如下:

[html] view plain copy

  1. company 1
  2. company  1
  3. EMC    1
  4. is      2
  5. is    2

Reducer节点2的数据如下:

[html] view plain copy

  1. Lisa  1
  2. My     2
  3. My    2
  4. name  1
  5. name  1
  6. Pivotal   1
  7. Tony    1

注:Map端也有merge的过程,发生在环形缓冲区部分。


第七步:reduce处理

终于可以进行最后的Reduce啦,这步相当简单咯,根据每个文件中的内容最后做一次统计,结果如下:

Reducer节点1的数据:

[html] view plain copy

  1. company 2
  2. EMC    1
  3. is      4

Reducer节点2的数据:

[html] view plain copy

  1. Lisa  1
  2. My     4
  3. name  2
  4. Pivotal   1
  5. Tony    1

至此大功告成!我们成功统计出两个文件里面每个单词的数目,同时把他们存入到两个输出文件中,这两个输出文件也就是传说中的part-r-00000和part-r-00001,看看两个文件的内容,再回头想想最开始的Partition,应该是清楚了其中的奥秘吧。

如果你在你自己的环境中运行的WordCount只有part-r-00000一个文件的话,那应该是因为你使用的是默认设置,默认一个Job只有一个Reducer。

如果你想设置两个,你可以:

1.在源代码中加入 job.setNumReduceTasks(2),设置这个Job的Reducer为两个。

或者

2.mapred-site.xml中设置下面参数并重启服务

[html] view plain copy

  1. <property>
  2. <name>mapred.reduce.tasks</name>
  3. <value>2</value>
  4. </property>

如果在配置文件中设置,整个集群都会默认使用两个Reducer了。

结束语:

本文大致描述了一下MapReduce的整个过程以及每个阶段所做的事情,并没有涉及具体的Job,resource的管理和控制,因为那个是第一代MapReduce框架和Yarn框架的主要区别。而两代框架中上述MapReduce的原理是差不多的。

注:文章来自 这里。另外这篇文章并没有说清楚Map端环形缓冲区spill的过程,详情请参考:MapReduce Shuffle详解

时间: 2024-10-17 05:53:44

MapReduce过程详解的相关文章

MapReduce 过程详解 (用WordCount作为例子)

本文转自 http://www.cnblogs.com/npumenglei/ .... 先创建两个文本文件, 作为我们例子的输入: File 1 内容: My name is Tony My company is pivotal File 2 内容: My name is Lisa My company is EMC 1. 第一步, Map 顾名思义, Map 就是拆解. 首先我们的输入就是两个文件, 默认情况下就是两个split, 对应前面图中的split 0, split 1 两个spli

Hadoop MapReduce执行过程详解(带hadoop例子)

https://my.oschina.net/itblog/blog/275294 摘要: 本文通过一个例子,详细介绍Hadoop 的 MapReduce过程. 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任

Hadoop学习之MapReduce执行过程详解

转自:http://my.oschina.net/itblog/blog/275294 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任务是一个java进程,它会读取HDFS中的文件,解析成很多的键值对,经过我

MapReduce阶段源码分析以及shuffle过程详解

MapReducer工作流程图: 1. MapReduce阶段源码分析 1)客户端提交源码分析 解释:   - 判断是否打印日志   - 判断是否使用新的API,检查连接   - 在检查连接时,检查输入输出路径,计算切片,将jar.配置文件复制到HDFS   - 计算切片时,计算最小切片数(默认为1,可自定义)和最大切片数(默认是long的最大值,可以自定义)   - 查看给定的是否是文件,如果是否目录计算目录下所有文件的切片   - 通过block大小和最小切片数.最大切片数计算出切片大小  

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

使用HeartBeat实现高可用HA的配置过程详解

使用HeartBeat实现高可用HA的配置过程详解 一.写在前面 HA即(high available)高可用,又被叫做双机热备,用于关键性业务.简单理解就是,有2台机器 A 和 B,正常是 A 提供服务,B 待命闲置,当 A 宕机或服务宕掉,会切换至B机器继续提供服务.常见的实现高可用的开源软件有 heartbeat 和 keepalived. 这样,一台 web 服务器一天24小时提供web服务,难免会存在 web 服务挂掉或服务器宕机宕机的情况,那么用户就访问不了服务了,这当然不是我们期望

Nginx实现集群的负载均衡配置过程详解

Nginx实现集群的负载均衡配置过程详解 Nginx 的负载均衡功能,其实实际上和 nginx 的代理是同一个功能,只是把代理一台机器改为多台机器而已. Nginx 的负载均衡和 lvs 相比,nginx属于更高级的应用层,不牵扯到 ip 和内核的修改,它只是单纯地把用户的请求转发到后面的机器上.这就意味着,后端的 RS 不需要配置公网. 一.实验环境 Nginx 调度器 (public 172.16.254.200 privite 192.168.0.48)RS1只有内网IP (192.168

加密 解密过程详解及openssl自建CA &nbsp;

            加密 解密过程详解及openssl自建CA 为了数据信息能够安全的传输要求数据要有一定的安全性那么数据的安全性包含哪些方面的特性呢?    NIST(美国信息安全署)做了如下的定义:    保密性:       1,数据的保密性 指的是数据或隐私不向非授权者泄漏                   2,隐私性  信息不被随意的收集    完整性:       1,数据的的完整性:信息或程序只能被指定或授权的方式改变不能被随意的             修改        

SVN中基于Maven的Web项目更新到本地过程详解

环境 MyEclipse:10.7 Maven:3.1.1 概述 最近在做项目的时候,MyEclipse下载SVN上面基于Maven的Web项目总是出现很多问题,有时候搞了很半天,Maven项目还是出现叉号,最后总结了方法步骤,终于可以将出现的问题解决,在此,将重现从SVN上将基于Maven的Web项目变成本地MyEclipse中项目的过程,问题也在其中进行解决. 问题补充 在使用Myeclipse的部署Web项目的时候,在点击部署按钮的时候,没有任何反应,在此提供两种解决方法,问题如图1所示: