Hadoop - MapReduce MRAppMaster-剖析

一 概述

MRv1主要由编程模型(MapReduce API)、资源管理与作业控制块(由JobTracker和TaskTracker组成)和数据处理引擎(由MapTask和ReduceTask组成)三部分组成。而YARN出现之后,资源管理模块则交由YARN实现,这样为了让MapReduce框架运行在YARN上,仅需要一个ApplicationMaster组件完成作业控制模块功能即可,其它部分,包括编程模型和数据处理引擎等,可直接采用MRv1原有的部分。

二 MRAppMaster组成

MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce应用程序可以直接运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括作业管理、资源申请与再分配、Container启动与释放、作业恢复等。

MRAppMaster 主要由已下几种组件/服务组成:

ConainterAllocator

与RM通信,为MapReduce作业申请资源。作业的每个任务资源需求可描述为5元组:

<Priority,hostname,capacity,containers,relax_locality>,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存和CPU两种资源)、Container数据是否松弛本地化

ClientService

ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,客户端可以通过该协议获取作业的执行状态(不必通过RM)和控制作业(比如杀死作业、改变作业优先级等)。

Job

表示一个MapReduce作业,与MRv1中的JobInProgress功能是一样的,负责监控作业的运行状态。它维护了一个作业的状态机,以实现异步执行各种作业相关的操作。

Task

表示一个MapReduce作业的某个任务,与MRv1中的TaskInProgress功能类似,负责监控一个任务的运行状态。它维护了一个任务状态机,以实现异步执行各种任务相关的操作。

TaskAttempt

表示一个任务运行实例,它的执行逻辑与MRV1中的MapTask和ReduceTask运行实例完全一致,实际上,它直接使用了MRv1中的数据处理引擎,但经过了一些优化。

TaskCleaner

负责清理失败任务或被杀死任务使用的目录和产生的临时结果(统称为垃圾数据),它维护了一个线程池和一个共享队列,异步删除任务产生的垃圾数据。

Speculator

完成推测执行功能,当同一个作业的某个任务运行速度明显慢于其他任务时,会为该任务启动一个备份任务。

ContainerLauncher

负责与NM通信,以启动一个Container.当RM为作业分配资源后,ContainerLauncher会将任务执行相关信息填充到Container中,包括任务运行所需资源、任务运行命令、任务运行环境、任务依赖的外部文件等,然后与对应的NodeManager通信,要求它启动Container.

TaskAttemptListener

负责管理各个任务的心跳信息,如果一个任务一段时间内未汇报心跳,则认为它死掉了,会将其从系统中移除。

JobHistoryEventHandler

负责对作业的各个事件记录日志。当MRApMaster出现故障时,YARN会将其重新调度到另一个节点上。未了避免重新计算,MRAppMaster首先从HDFS上读取上次运行产生的日志,以恢复已经完成的任务,进而能够只运行尚未运行完成的任务。

三 MapReduce客户端

MapReduce客户端是MapReduce用户与YARN进行通信的唯一途径,通过该客户端,用户可以向YARN提交作业,获取作业的运行状态和控制作业(比如杀死作业、杀死任务等).MapReduce客户端涉及两个RPC通信协议:

1.ApplicationClientProtol

在YARN中,RM实现了ApplicationClientProtocol协议,任何客户端需要使用该协议完成提交作业、杀死作业、改变作业的优先级等操作。

2.MRClientProtocol

当作业的ApplicationMaster成功启动后,它会启动MRClientService服务,该服务实现了MRClientProtoclo协议,从而允许客户端直接通过该协议与ApplicationMater通信以控制作业和查询作业运行状态,以减轻ResourceManager负载。

四 MRAppMaster工作流程

按照作业的大小不同,MRAppMaster提供了三种作业运行模式:

本地模式(通常用于作业调试,同MRv1一样,不再赘述)、Uber模式和Non-Uber模式。

对于小作业为了降低延迟,可采用Uber模式,在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在的Container)中顺次执行;对于大作业,则采用Non-Uber模式,在该模式下,MRAppMaster先为Map Task申请资源,当Ma Task运行完成数目达到一定比例之后再为Reduce Task申请资源。

对于Map Task而言,它的生命周期为Scheduled->assigned->completed;

而对于Reduce Task而言,它的生命周期为pending->scheduled->assigned->completed.

在YARN之上运行MapReduce作业需要解决两个关键问题:如何确定Reduce Task启动时机以及如何完成Shuffle功能。

为了避免Reduce Task过早启动造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态,以便能够根据Map Task运行情况决定是否对其进行调度。

MRAppMaster在MRv1原有策略基础之上添加了更为严格的资源控制策略和抢占策略。在YARN中,NodeManager作为一种组合服务模式,允许动态加载应用程序临时需要的附属服务,利用这一特性,YARN将Shuffle HTTP Sever组成一种服务,以便让各个NodeManager启动时加载它。

当用户向YARN提交一个MapReduce应用程序后,YARN 将分两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MARppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。

步骤1 用户向YARN中(RM)提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

步骤2 ResourceManager为该应用程序分配第一个Container,ResouceManage与某个NodeManager通信,启动应用程序ApplicationMaster,NodeManager接到命令后,首先从HDFS上下载文件(缓存),然后启动ApplicationMaser。

当ApplicationMaster启动后,它与ResouceManager通信,以请求和获取资源。ApplicationMaster获取到资源后,与对应的NodeManager通信以启动任务。

注:1.如果该应用程序第一次在给节点上启动任务,则NodeManager首先从HDFS上下载文件缓存到本地,这个是由分布式缓存实现的,然后启动该任务。

2. 分布式缓存并不是将文件缓存到集群中各个结点的内存中,而是将文件换到各个结点的磁盘上,以便执行任务时候直接从本地磁盘上读取文件。

步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤4~7。

步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

步骤5 一旦ApplicationMaster申请到资源后,ApplicationMaster就会将启动命令交给NodeManager,要求它启动任务。启动命令里包含了一些信息使得Container可以与Application Master进行通信。

步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务(Container)。

步骤7   在Container内执行用户提交的代码,各个Container通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。

步骤8  在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。

步骤9 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己

五 MRAppMaster 生命周期

MRAppMaster根据InputFormat组件的具体实现(通常是根据数据量切分数据),将作业分解成若干个Map Task和Reduce Task,其中每个Map Task 负责处理一片Inputsplit数据,而每个Reduce Task则进一步处理Map Task产生的中间结果。每个Map/Reduce Task只是一个具体计算任务的描述,真正的任务计算工作则是由运行实例TaskAttempt完成的,每个Map/Reduce
Task可能顺次启动多个运行实例,比如第一个运行实例失败了,则另起一个新的实例重新计算,直到这一份数据处理完成或者尝试次数达到上限。

Job状态机

Job状态机维护了一个MapReduce应用程序的生命周期,即从提交到运行结束的整个过程。一个Job由多个Map Task和Reduce Task构成,而Job状态机负责管理这些任务。Job状态机由类JobImpl实现。

Task状态机

Task维护了一个任务的生命周期,即从创建到运行结束整个过程。一个任务可能存在多次运行尝试,每次运行尝试被称为一个“运行实例”,Task状态机负责管理这些运行实例。Task状态机由TaskImpl实现。

注意:1.MRAppMaster为任务申请到资源后,与对应的NodeManager通信成功启动Container。需要注意的是,在某一个时刻,一个任务可能有多个运行实例,且可能存在运行失败的实例,但是只要有一个实例运行成功,则意味着该任务运行完成。

2. 每个任务的运行实例数目都有一定上限,一旦超过该上限,才认为该任务运行失败,其中Map Task运行实例数目上限默认位4,Reduce Task运行实例默认也是 4.一个任务的失败并不一定导致整个作业运行失败,这取决于作业的错误容错率。

TaskAttempt状态机

TaskAttempt状态机维护了 一个任务运行实例的生命周期,即从创建到运行结束整个过程。它由TaskAttempImpl类实现。

在YARN 中,任务实例是运行在Container中的,因此,Container状态的变化往往伴随任务实例的状态变化,比如任务实例运行完成后,会清理Container占用的空间,而Container空间的清理实际上就是任务实例空间的清理。任务实例运行完后,需向MRAppMaster请求提交最终结果,一旦提交完成后,该任务的其它实例就将被杀死。

总结一个作业的执行过程大致如下:

创建实例=》MRApMaster向ResourceManager申请资源=》获得Container=》启动Container(运行实例)=》提交运行结果=》清理结果

当一个Container运行结束后,MRAppMaster可直接从ResourceManager上获知。各个任务运行实例需定期向MRAppMaster汇报进度和状态,否则MRAppMaster认为该任务处于僵死状态,会将它杀死,每次汇报均会触发一个TA_UPDATE事件。

注:1.MRAppMaster可以由两条路径来得知Conainer的当前运行状态:

a. 通过ResourceManager(MRAppMaster与ResouceManager中维护一个心跳信息)

b. 另一个是直接通过Task Attempt(每个Task Attempt与MRAppMaster之间有专用的协议)

2. 这两条路径是独立的,没有先后顺序之分,如果MRAppMaster直接从ResouceManager获取Container运行完成信息,则任务实例直接从Running转化为SUCCESS_CONTAINER_CLEANUP状态,如果首先从TaskAttempt中获知任务完成信息。则将首先转化为COMMIT_PENDING状态,然后再转化为SUCCESS_CONTAINER_CLEANUP状态。

当任务执行失败或者被杀死时,需清理它占用的磁盘空间和产生的结果。当Reduce Task远程复制一个已经运行完成的Map Task输出数据时,可能因为磁盘或者网络等原因,导致数据损坏或者数据丢失,这是会触发一个TA_TOO_MANY_FETCH_FAILURE事件,从而触发MRAppMaster重新调度执行该Map Task.

六  资源申请和再分配

ContainerAllocator是MRAppMaster中负责资源申请和分配的模块。用户提交的作业被分解成Map Task和Reduce Task后,这些Task所需的资源统一由ContainerAllocator模块负责从ResourceManager中申请,而一旦ContainAllocator得到资源后,需采用一定的策略进一步分配给作业的各个任务。

在YARN中,作业的资源描述可以被描述为五元组:priority,hostname,capabiity,containers,relax_locality分别表示 作业优先级    期望资源所在的host  资源量(当前支持内存与CPU两种资源) 、Containers数目  是否松弛本地化。例如:

<10,"node1","memeory:1G,CPU:1",3,true)//  优先级是一个正整数,优先级值越小,优先级越高

ContainerAllocator周期性的通过心跳与ResourceManager通信,以获取已经分配的Contaienr列表,完成的Container列表、最近更新的节点*+列表等信息,而ContanerAllocator根据这些信息完成相应的操作。

当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列的Map Task和TaskReduce  Task任务,由于Reduce Task依赖于Map Task之间的结果,所以Reduce Task会延后调度。

任务状态描述

Map:  scheduled->assigned->completed

Task:  pending-> scheduled->assigned->completed

pending 表示等待ContainerAllocator发送资源请求的集合

scheduled 标识已经发送了资源申请给RM,但还没收到分配的资源的任务集合

assignd 已经受到RM分配的资源的任务集合

complet 表示已完成的任务集合

三种作业状态:Failed Map Task ,Map Task,Reduce Task分别赋予它们优先级5 20 10也就是说,当三种任务同时有资源请求的时候,会优先分配给Failed Map Task,然后是Reduce Task,最后是Map Task.

如果一个任务运行失败,则会重新为该任务申请资源

如果一个任务运行速度过慢,则会为其额外申请资源已启动备份任务(如果启动了推测执行过程)

如果一个节点的失败任务数目过多,则会撤销对该节点的所有资源的申请请求。

注:在大多数数的情况下,RMAppMaster与RM的心跳信息都是空的,即心跳信息不包含新的资源请求信息,这种心跳信息有一下几个作用:

1. 周期性发送心跳,告诉RM自己还活着

2. 周期性询问RM,以获取新分配的资源和各个Container运行状况。

资源再分配

一旦MRAppMaster收到新分配的Container后,会将这些Container进一步分配给各个任务,Container分配过程如下:

1.判断新收到的Container包含的资源是否满足,如果不满足,则通过下次心跳通知ResourceManager释放该Container.

2.判断收到的Container所在的节点是否被加入到黑名单中,如果是,则寻找一个与该Container匹配的任务,并重新为该任务申请资源,同时通过下次心跳通知ResourceManager释放该Container.

3.根据Container的优先级,将它分配给对应类型的任务。

七 Contianer启动和释放

当ContainerAllocator为某个任务申请到资源后,会将运行该任务相关的所有信息封装到Container中,并要求对应的节点启动该Container。需要注意的是,Container中运行的任务对应的数据处理引擎与MRv1中完全一致,仍为Map
Task和 Reduce Task。正因为如此,MRv1的程序与YARN中的MapReduce程序完全兼容。

ContainerLaunche负责与各个NodeManager通信,已启动或者释放Container。在YARN中,运行的Task所需的全部信息被封装到Container中,包括所需的资源、依赖的外部文件、JAR包、运行时环境变量、运行命令等。ContainerLauncher通过RPC协议ContainerManager与NodeManager通信,以控制Container的启动和释放,进而控制任务的执行(比如启动任务、杀死任务等)。

有多种可能触发停止/杀死一个Container,常见的有:

1.推测执行时一个任务运行完成,需杀死另一个相同输入数据的任务。

2.用户发送一个杀死任务请求。

3.任意一个任务运行结束时,YARN会触发一个杀死任务的命令,以释放对应的Container占用的资源。

八 推测执行机制

为了防止执行速度慢的任务拖慢整体的执行进度,使用推测执行机制,Hadoop会为该任务启动一个备份任务,让该备份任务与原始任务同时处理同一份数据,谁先运行完,则将谁的结果作为最终结果。

注:1.每个任务最多只能有一个备份任务实例

     2. 启动备份的时候,必须保证已经有足够多的Map任务已经完成,根据这些完成的任务来估算是否来启动备份任务。

这种算法的优点是可最大化备份任务的有效率,其中有效率指有效备份任务数与所有备份任务数的比值,有效任务是指完成时间早于原始任务完成时间的备份任务(即带来实际收益的备份任务)。备份任务的有效率越高,推测执行算法越优秀,带来的收益也就越大。

推测执行机制实际上采用了经典的算法优化方法,以空间换时间,它同时启动多个相同的任务处理相同的数据,并让这些任务竞争以缩短数据的处理时间。

八 作业恢复

从作业恢复粒度角度来看,当前存在三种不同级别的恢复机制,级别由低到高依次是作业级别、任务级别和记录级别,其中级别越低实现越简单,但造成的资源浪费也越严重。当前MRAppMaster采用了任务级别的恢复机制,即以任务为基本单位进行恢复,这种机制是基于事务型日志完成作业恢复的,它只关注两种任务:运行完成的任务和未完成的任务。作业执行过程中,MRAppMaster会以日志的形式将作业以及状态记录下来,一旦MRAppMaster重启,则可从日志中恢复作业的运行状态。

当前MRAppMaster的作业恢复机制仅能做到恢复上一次已经运行完成的任务,对于正在运行的任务,则在前一次MRAppMaster运行实例退出时由ResourceManager强制将其杀死并回收资源。

MRAppMaster采用了开源数据序列化工具Apache Avro记录这些事件。Avro是一个数据序列化系统,通常用于支持大批数据交换和跨语言RPC的应用。

九 MRv1与MRv2简单对比

MRAppMaster仍采用了MRv1中的数据处理引擎,分别由数据处理引擎MapTask和ReduceTask完成Map任务和Reduce任务的处理。

MRv1与MRv2的比较

MRv2中在Map端 用Netty代替Jetty. Reduce端采用批拷贝、shuffle和排序插件化

应用程序编程接口                新旧API                                                      新旧API

运行时环境              由JobTracker与TaskTracker组成    YARN (由RM和NM组成)和MRAppMaster

数据处理引擎                   MapTask/Reduce Task                                   MapTask/Reduce Task

需要注意的是,YARN并不会改变MapReduce编程模型,它只是应用开发人员使用的API。YARN提供了一种新的资源管理模型和实现,用来 执行MapReduce任务。因此,在最简单的情况下,现有的MapReduce应用仍然能照原样运行(需要重新编译),YARN只不过能让开发人员更精
确地指定执行参数。

十 小结

MapRecuce On YARN的运行时环境由YARN与ApplicationMaster构成,这种新颖的运行时环境使得MapReduce可以与其他计算框架运行在一个集群中,从而达到共享集群资源、提高资源利用率的目的。随着YARN的程序与完善,MRv1的独立运行模式将被MapRedcue
On YARN取代。

Hadoop - MapReduce MRAppMaster-剖析

时间: 2024-10-27 01:27:37

Hadoop - MapReduce MRAppMaster-剖析的相关文章

hadoop MapReduce Yarn运行机制

原 Hadoop MapReduce 框架的问题 原hadoop的MapReduce框架图 从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路: 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败.重启等操作. TaskTracker 是 Ma

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

Hadoop MapReduce编程 API入门系列之处理Excel通话记录(二十)

不多说,直接上代码. 与家庭成员之间的通话记录一份,存储在Excel文件中,如下面的数据集所示.我们需要基于这份数据,统计每个月每个家庭成员给自己打电话的次数,并按月份输出到不同文件夹. 2016-12-12 20:04:10,203 INFO [zhouls.bigdata.myMapReduce.ExcelContactCount.ExcelContactCount$ExcelMapper] - Map processing finished2016-12-12 20:04:10,203 I

Hadoop MapReduce编程 API入门系列之FOF(Fund of Fund)(二十三)

不多说,直接上代码. 代码 package zhouls.bigdata.myMapReduce.friend; import org.apache.hadoop.io.Text; public class Fof extends Text{//自定义Fof,表示f1和f2关系 public Fof(){//无参构造 super(); } public Fof(String a,String b){//有参构造 super(getFof(a, b)); } public static Strin

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

Hadoop MapReduce Next Generation - Setting up a Single Node Cluster

Hadoop MapReduce Next Generation - Setting up a Single Node Cluster. Purpose This document describes how to set up and configure a single-node Hadoop installation so that you can quickly perform simple operations using Hadoop MapReduce and the Hadoop

使用Python实现Hadoop MapReduce程序

转自:使用Python实现Hadoop MapReduce程序 英文原文:Writing an Hadoop MapReduce Program in Python 根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapReduce程序. 尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C+

Hadoop MapReduce编程 API入门系列之网页流量版本1(二十二)

不多说,直接上代码. 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件. 代码 package zhouls.bigdata.myMapReduce.flowsum; import java.io.DataInput;import java.io.DataOutput;import java.io.IOException; import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableCompa