YARN源码分析(一)-----ApplicationMaster

前言

在之前两周主要学了HDFS中的一些模块知识,其中的许多都或多或少有我们借鉴学习的地方,现在将目光转向另外一个块,被誉为MRv2,就是yarn,在Yarn中,解决了MR中JobTracker单点的问题,将此拆分成了ResourceManager和NodeManager这样的结构,在每个节点上,还会有ApplicationMaster来管理应用程序的整个生命周期,的确在Yarn中,多了许多优秀的设计,而今天,我主要分享的就是这个ApplicationMaster相关的一整套服务,他是隶属于ResoureManager的内部服务中的.了解了AM的启动机制,你将会更进一步了解Yarn的任务启动过程.

ApplicationMaster管理涉及类

ApplicationMaster管理涉及到了4大类,ApplicationMasterLauncher,AMLivelinessMonitor,ApplicationMasterService,以及ApplicationMaster自身类.下面介绍一下这些类的用途,在Yarn中,每个类都会有自己明确的功能模块的区分.

1.ApplicationMasterLauncher--姑且叫做AM启动关闭事件处理器,他既是一个服务也是一个处理器,在这个类中,只处理2类事件,launch和cleanup事件.分别对应启动应用和关闭应用的情形.

2.AMLivelinessMonitor--这个类从名字上可以看出他是监控类,监控的对象是AM存活状态的监控类,检测的方法与之前的HDFS一样,都是采用heartbeat的方式,如果有节点过期了,将会触发一次过期事件.

3.ApplicationMasterService--AM请求服务处理类.AMS存在于ResourceManager,中,服务的对象是各个节点上的ApplicationMaster,负责接收各个AM的注册请求,更新心跳包信息等.

4.ApplicationMaster--节点应用管理类,简单的说,ApplicationMaster负责管理整个应用的生命周期.

简答的描述完AM管理的相关类,下面从源码级别分析一下几个流程.

AM启动

要想让AM启动,启动的背景当然是有用户提交了新的Application的时候,之后ApplicationMasterLauncher会生成Launch事件,与对应的nodemanager通信,让其准备启动的新的AM的Container.在这里,就用到了ApplicationMasterLauncher这个类,之前在上文中已经提到,此类就处理2类事件,Launch启动和Cleanup清洗事件,先来看看这个类的基本变量设置

//Application应用事件处理器
public class ApplicationMasterLauncher extends AbstractService implements
    EventHandler<AMLauncherEvent> {
  private static final Log LOG = LogFactory.getLog(
      ApplicationMasterLauncher.class);
  private final ThreadPoolExecutor launcherPool;
  private LauncherThread launcherHandlingThread;

  //事件队列
  private final BlockingQueue<Runnable> masterEvents
    = new LinkedBlockingQueue<Runnable>();
  //资源管理器上下文
  protected final RMContext context;

  public ApplicationMasterLauncher(RMContext context) {
    super(ApplicationMasterLauncher.class.getName());
    this.context = context;
    //初始化线程池
    this.launcherPool = new ThreadPoolExecutor(10, 10, 1,
        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
    //新建处理线程
    this.launcherHandlingThread = new LauncherThread();
  }

还算比较简单,有一个masterEvents事件队列,还有执行线程以及所需的线程池执行环境。在RM相关的服务中,基本都是继承自AbstractService这个抽象服务类的。ApplicationMasterLauncher中主要处理2类事件,就是下面的展示的

@Override
  public synchronized void  handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    //处理来自ApplicationMaster获取到的请求,分为启动事件和清洗事件2种
    switch (event) {
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
    default:
      break;
    }
  }

然后调用具体的实现方法,以启动事件launch事件为例

//添加应用启动事件
  private void launch(RMAppAttempt application) {
    Runnable launcher = createRunnableLauncher(application,
        AMLauncherEventType.LAUNCH);
    //将启动事件加入事件队列中
    masterEvents.add(launcher);
  }

这些事件被加入到事件队列之后,是如何被处理的呢,通过消息队列的形式,在一个独立的线程中逐一被执行

//执行线程实现
  private class LauncherThread extends Thread {

    public LauncherThread() {
      super("ApplicationMaster Launcher");
    }

    @Override
    public void run() {
      while (!this.isInterrupted()) {
        Runnable toLaunch;
        try {
          //执行方法为从事件队列中逐一取出事件
          toLaunch = masterEvents.take();
          //放入线程池池中进行执行
          launcherPool.execute(toLaunch);
        } catch (InterruptedException e) {
          LOG.warn(this.getClass().getName() + " interrupted. Returning.");
          return;
        }
      }
    }
  }

如果论到事件的具体执行方式,就要看具体AMLauch是如何执行的,AMLauch本身就是一个runnable实例。

/**
 * The launch of the AM itself.
 * Application事件执行器
 */
public class AMLauncher implements Runnable {

  private static final Log LOG = LogFactory.getLog(AMLauncher.class);

  private ContainerManagementProtocol containerMgrProxy;

  private final RMAppAttempt application;
  private final Configuration conf;
  private final AMLauncherEventType eventType;
  private final RMContext rmContext;
  private final Container masterContainer;

在里面主要的run方法如下,就是按照事件类型进行区分操作

@SuppressWarnings("unchecked")
  public void run() {
  	//AMLauncher分2中事件分别处理
    switch (eventType) {
    case LAUNCH:
      try {
        LOG.info("Launching master" + application.getAppAttemptId());
        //调用启动方法
        launch();
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));
      ...
      break;
    case CLEANUP:
      try {
        LOG.info("Cleaning master " + application.getAppAttemptId());
        //调用作业清洗方法
        cleanup();
      ...
      break;
    default:
      LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
      break;
    }
  }

后面的launch操作会调用RPC函数与远程的NodeManager通信来启动Container。然后到了ApplicationMaster的run()启动方法,在启动方法中,会进行应用注册的方法,

@SuppressWarnings({ "unchecked" })
  public boolean run() throws YarnException, IOException {
    LOG.info("Starting ApplicationMaster");

    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    // Now remove the AM->RM token so that containers cannot access it.
    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    //与ResourceManager通信,周期性发送心跳信息,包含了应用的最新信息
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(conf);
    amRMClient.start();
    .....

    // Register self with ResourceManager
    // This will start heartbeating to the RM
    //启动之后进行AM的注册
    appMasterHostname = NetUtils.getHostname();
    RegisterApplicationMasterResponse response = amRMClient
        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
            appMasterTrackingUrl);
    // Dump out information about cluster capability as seen by the
    // resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

在这个操作中,会将自己注册到AMLivelinessMonitor中,此刻开始启动心跳监控。

AMLiveLinessMonitor监控

在这里把重心从ApplicationMaster转移到AMLivelinessMonitor上,首先这是一个激活状态的监控线程,此类线程都有一个共同的父类

//应用存活状态监控线程
public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {

在AbstractlinessMonitor中定义监控类线程的一类特征和方法

//进程存活状态监控类
public abstract class AbstractLivelinessMonitor<O> extends AbstractService {

  private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);

  //thread which runs periodically to see the last time since a heartbeat is
  //received.
  //检查线程
  private Thread checkerThread;
  private volatile boolean stopped;
  //默认超时时间5分钟
  public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
  //超时时间
  private int expireInterval = DEFAULT_EXPIRE;
  //监控间隔检测时间,为超时时间的1/3
  private int monitorInterval = expireInterval/3;

  private final Clock clock;

  //保存了心跳检验的结果记录
  private Map<O, Long> running = new HashMap<O, Long>();

心跳检测本身非常的简单,做一次通信记录检查,然后更新一下,记录时间,当一个新的节点加入监控或解除监控操作

//新的节点注册心跳监控
  public synchronized void register(O ob) {
    running.put(ob, clock.getTime());
  }

  //节点移除心跳监控
  public synchronized void unregister(O ob) {
    running.remove(ob);
  }

每次做心跳周期检测的时候,调用下述方法

//更新心跳监控检测最新时间
  public synchronized void receivedPing(O ob) {
    //only put for the registered objects
    if (running.containsKey(ob)) {
      running.put(ob, clock.getTime());
    }
  }

非常简单的更新方法,O ob对象在这里因场景而异,在AM监控中,为ApplicationID应用ID。在后面的AMS和AM的交互中会看到。新的应用加入AMLivelinessMonitor监控中后,后面的主要操作就是AMS与AM之间的交互操作了。

AM与AMS

在ApplicationMaster运行之后,会周期性的向ApplicationMasterService发送心跳信息,心跳信息包含有许多资源描述信息。

//ApplicationMaster心跳信息更新
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    ApplicationAttemptId appAttemptId = authorizeRequest();
    //进行心跳信息时间的更新
    this.amLivelinessMonitor.receivedPing(appAttemptId);
    ....

每次心跳信息一来,就会更新最新监控时间。在AMS也有对应的注册应用的方法

  //ApplicationMaster在ApplicationMasterService上服务上进行应用注册
  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {

    ApplicationAttemptId applicationAttemptId = authorizeRequest();

    ApplicationId appID = applicationAttemptId.getApplicationId();
    .....

      //在存活监控线程上进行心跳记录,更新检测时间,key为应用ID
      this.amLivelinessMonitor.receivedPing(applicationAttemptId);
      RMApp app = this.rmContext.getRMApps().get(appID);

      // Setting the response id to 0 to identify if the
      // application master is register for the respective attemptid
      lastResponse.setResponseId(0);
      responseMap.put(applicationAttemptId, lastResponse);
      LOG.info("AM registration " + applicationAttemptId);
      this.rmContext

如果在心跳监控中出现过期的现象,就会触发一个expire事件,在AMLiveLinessMonitor中,这部分的工作是交给CheckThread执行的

//进程存活状态监控类
public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
  ...
  //thread which runs periodically to see the last time since a heartbeat is
  //received.
  //检查线程
  private Thread checkerThread;
  ....
  //默认超时时间5分钟
  public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
  //超时时间
  private int expireInterval = DEFAULT_EXPIRE;
  //监控间隔检测时间,为超时时间的1/3
  private int monitorInterval = expireInterval/3;
  ....
  //保存了心跳检验的结果记录
  private Map<O, Long> running = new HashMap<O, Long>();
  ...

  private class PingChecker implements Runnable {

    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        synchronized (AbstractLivelinessMonitor.this) {
          Iterator<Map.Entry<O, Long>> iterator =
            running.entrySet().iterator();

          //avoid calculating current time everytime in loop
          long currentTime = clock.getTime();

          while (iterator.hasNext()) {
            Map.Entry<O, Long> entry = iterator.next();
            //进行超时检测
            if (currentTime > entry.getValue() + expireInterval) {
              iterator.remove();
              //调用超时处理方法,将处理事件交由调度器处理
              expire(entry.getKey());
              LOG.info("Expired:" + entry.getKey().toString() +
                      " Timed out after " + expireInterval/1000 + " secs");
            }
          }
        }

check线程主要做的事件就是遍历每个节点的最新心跳更新时间,通过计算差值进行判断是否过期,过期调用expire方法。此方法由其子类实现

//应用存活状态监控线程
public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
  //中央调度处理器
  private EventHandler dispatcher;
  ...

  @Override
  protected void expire(ApplicationAttemptId id) {
  	 //一旦应用过期,处理器处理过期事件处理
    dispatcher.handle(
        new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
  }
}

产生应用超期事件,然后发给中央调度器去处理。之所以采用的这样的方式,是因为在RM中,所有的模块设计是以事件驱动的形式工作,最大程度的保证了各个模块间的解耦。不同模块通过不同的事件转变为不同的状态,可以理解为状态机的改变。最后用一张书中的截图简单的展示AM模块相关的调用过程。

全部代码的分析请点击链接https://github.com/linyiqun/hadoop-yarn,后续将会继续更新YARN其他方面的代码分析。

参考文献

《Hadoop技术内部–HDFS结构设计与实现原理》.蔡斌等

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-12 12:45:27

YARN源码分析(一)-----ApplicationMaster的相关文章

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点. 通过MRAppMaster类的定义我们就能看出

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括: 1.调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo: 2.确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks: 3.确定

YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复

前言 任何系统即使做的再大,都会有可能出现各种各样的突发状况.尽管你可以说我在软件层面上已经做到所有情况的意外处理了,但是万一硬件出问题了或者说物理层面上出了问题,恐怕就不是多写几行代码能够立刻解决的吧,说了这么多,无非就是想强调HA,系统高可用性的重要性.在YARN中,NameNode的HA方式估计很多人都已经了解了,那本篇文章就来为大家梳理梳理RM资源管理器HA方面的知识,并不是指简单的RM的HA配置,确切的说是RM的应用状态存储于恢复. RM应用状态存储使用 RM应用状态存储是什么意思呢,

YARN源码分析(四)-----Journalnode

前言 最近在排查公司Hadoop集群性能问题时,发现Hadoop集群整体处理速度非常缓慢,平时只需要跑几十分钟的任务时间一下子上张到了个把小时,起初怀疑是网络原因,后来证明的确是有一部分这块的原因,但是过了没几天,问题又重现了,这次就比较难定位问题了,后来分析hdfs请求日志和Ganglia的各项监控指标,发现namenode的挤压请求数持续比较大,说明namenode处理速度异常,然后进而分析出是因为写journalnode的editlog速度慢问题导致的,后来发现的确是journalnode

Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)

v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了四件事: 1.通过设置作业Job的成员变量setupProgress为1,标记作业setup已完成: 2.调度作业Job的Map Task: 3.调度作业的JobReduce Task: 4.如果没有task了,则生成J

Yarn源码分析之事件异步分发器AsyncDispatcher

AsyncDispatcher是Yarn中事件异步分发器,它是ResourceManager中的一个基于阻塞队列的分发或者调度事件的组件,其在一个特定的单线程中分派事件,交给AsyncDispatcher中之前注册的针对该事件所属事件类型的事件处理器EventHandler来处理.每个事件类型类可能会有多个处理渠道,即多个事件处理器,可以使用一个线程池调度事件.在Yarn的主节点ResourceManager中,就有一个Dispatcher类型的成员变量rmDispatcher,定义如下: pr

Hadoop2源码分析-YARN RPC 示例介绍

1.概述 之前在<Hadoop2源码分析-RPC探索实战>一文当中介绍了Hadoop的RPC机制,今天给大家分享关于YARN的RPC的机制.下面是今天的分享目录: YARN的RPC介绍 YARN的RPC示例 截图预览 下面开始今天的内容分享. 2.YARN的RPC介绍 我们知道在Hadoop的RPC当中,其主要由RPC,Client及Server这三个大类组成,分别实现对外提供编程接口.客户端实现及服务端实现.如下图所示: 图中是Hadoop的RPC的一个类的关系图,大家可以到<Hado

MapReduce源码分析之JobSubmitter(一)

JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter. 首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 private FileSystem

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.