YARN中MRAppMaster的事件驱动模型与状态机处理消息过程的分析

在MRv1中,对象之间的作用关系是基于函数调用实现的,当一个对象向另外一个对象传递消息时,会直接采用函数调用的方式,并且这个过程是串行的。比如,当TaskTracker需要执行一个Task的时候,将首先下载Task依赖的文件(JAR包,二进制文件等,字典文件等),然后执行Task。在整个过程中,下载依赖文件是阻塞式的,也就是说,前一个任务未完成文件下载之前,后一个新任务将一直处于等待状态,只有在下载完成之后,才会启动一个独立进程运行该任务。基于函数调用式的编程模型是低效的,它隐含着整个过程是串行,同步进行的。

相比之下,MRv2引入的时间驱动变成模型则是一种更加高效的方式。在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间通过事件相互关联。每种事件处理一种类型的事件,同时根据需要出发另外一种事件。相比于基于函数调用的编程模型,这种编程方式具有异步并发等特点,更加高效,更加适合大型分布式系统。

下面,我们以负责控制MapReduce作业的AppllicationMaster,也就是MRAppMaster为例,看一下整个事件驱动模型与状态机。

首先,根据源代码,我们可以看到在MRAppMaster中有一个最重要的中央异步消息调度器 AsyncDispatcher,它负责整个MRAppMaster模块的消息调度。除此之外,还有多个其他的消息调度器,比如 JobEventDispatcher, TaskEventDispatcher,TaskAttemptEventDispatcher,SpeculatorEventDispatcher 等等。在MRAppMaster的初始时,在serviceInit() 函数中有以下关键代码

this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
dispatcher.register(CommitterEventType.class, committerEventHandler);

这里的意思是,当中央消息调度器dispatcher如果收到JobEventType的消息后,将会把这个消息转发给名为jobEventDispatcher的JobEventDispatcher的消息调度器。这个JobEventDispatcher是专门用来处理JobEventType事件的消息处理器。底层实现其实就是将一个pair<JobEventType.class, JobEventDispatcher>这个KV放入中央消息调度器dispatcher的HashMap中。剩下的几个消息调度器一样,注册相应的消息类型和其相对应的消息调度器。

之后dispatcher启动一个处理消息线程,可以在下面代码看到中央消息处理器处理消息。

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();   // 从中央消息处理器dispatcher的消息队列中获得消息
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);   // 分发该消息
          }
        }
      }
    };
  }
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();  //获取event的类型

    try{
      EventHandler handler = eventDispatchers.get(type);  // 根据event类型从HashMap中获取对应的消息处理器
      if(handler != null) {
        handler.handle(event);  // 使用获得的消息处理器处理这个event,跳转至相应的处理器实现
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

当Client提交一个Job之后,MRAppMaster创建一个job,并发送一个JOB_INIT事件给中央消息处理器dispatcher。下面我们以JOB_INIT事件为例,看看发生了什么。

首先dispatcher收到JOB_INIT事件之后,根据JOB_INIT的事件类型(JobEventType.class)获得事件的处理器应该是jobEventDispatcher,便将其发送给jobEventDispatcher。

private class JobEventDispatcher implements EventHandler<JobEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(JobEvent event) {
    ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);  // 由这个job处理这个event
  }
}

JobEventDispatcher处理这个JOB_INIT消息。之后进入JobImpl 的消息处理函数

public void handle(JobEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing " + event.getJobId() + " of type "
        + event.getType());
  }
  try {
    writeLock.lock();
    JobStateInternal oldState = getInternalState();
    try {
       getStateMachine().doTransition(event.getType(), event);   // 获取job的当前的状态机,根据当前消息event进行状态变换。
    } catch (InvalidStateTransitonException e) {                 // 会跳转至 StateMachine 实现的 doTransition()函数。
      LOG.error("Can't handle this event at current state", e);
      addDiagnostic("Invalid event " + event.getType() +
          " on Job " + this.jobId);
      eventHandler.handle(new JobEvent(this.jobId,
          JobEventType.INTERNAL_ERROR));
    }
    //notify the eventhandler of state change
    if (oldState != getInternalState()) {
      LOG.info(jobId + "Job Transitioned from " + oldState + " to "
               + getInternalState());
      rememberLastNonFinalState(oldState);
    }
  }

  finally {
    writeLock.unlock();
  }
}

MRAppMaster消息分发过程到此结束,这之后就进入了Job状态机的操作,当然根据任务的不同,也许会进入Task状态机,TaskAttempt状态机等等。总之,状态机以及状态机中的各种hook进行对应Job/Task/TaskAttempt的控制。

我们知道一个job对应了一个状态机,我们也应该知道在YARN的实现中一个状态机由以下三个部分组成: 1. 状态(节点)  2. 事件(弧)  3. Hook(触发事件后的处理)。

在JobImpl.java文件中,我们可以看到构建job状态机的过程:

protected static final
  StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
     stateMachineFactory
   = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
            (JobStateInternal.NEW)       // 构造JobImpl状态机,初始状态是 NEW 状态

        // Transitions from NEW state
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,          // 添加一个状态变化
            JobEventType.JOB_DIAGNOSTIC_UPDATE,
            DIAGNOSTIC_UPDATE_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,          // 添加一个状态变化
            JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
        .addTransition                                                      // 添加一个状态变化
            (JobStateInternal.NEW,
            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
            JobEventType.JOB_INIT,
            new InitTransition())
        .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
            JobEventType.JOB_KILL,
            new KillNewJobTransition())
        .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
            JobEventType.INTERNAL_ERROR,
            INTERNAL_ERROR_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
            JobEventType.JOB_AM_REBOOT,
            INTERNAL_REBOOT_TRANSITION)
            ...
            ...   

后面还有很多,job状态机是比较一个复杂的状态机,涉及到很多状态与事件,可以通过 对YARN状态机可视化深入了解,在此不做更多讨论。

调用addTransition()函数可以添加一个状态变化。addTransition()函数中有四个参数,分别为: 1.preState  2.postState  3.eventType 4.hook。  它们表示 1.事件发生前的状态,2.事件发生后的状态, 3.事件, 4.事件发生时触发的hook

代码中  addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()) 这一个状态变化表示,事件发生前是 NEW状态,事件发生后是 INITED状态 或者 FAILED状态,事件是JOB_INIT, hook是InitTransition对象。也就是如果Job状态机当前状态是NEW,这是发生了一个JOB_INIT事件,那么状态机会触发InitTransition对象中的doTransition()函数,如果transition()函数返回
INITED状态,那么状态机最新状态就是 INITED, 如果doTransition()函数返回 FAILED状态,那么状态机最新状态就是FAILED。

那么我们来看一下状态机的状态变换的过程。

private class InternalStateMachine
      implements StateMachine<STATE, EVENTTYPE, EVENT> {
  private final OPERAND operand;
  private STATE currentState;

  InternalStateMachine(OPERAND operand, STATE initialState) {
    this.operand = operand;
    this.currentState = initialState;
    if (!optimized) {
      maybeMakeStateMachineTable();
    }
  }

  @Override
  public synchronized STATE getCurrentState() {
    return currentState;
  }

  @Override
  public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
       throws InvalidStateTransitonException  {
    currentState = StateMachineFactory.this.doTransition                       // 发生状态机变化
        (operand, currentState, eventType, event);
    return currentState;
  }
}
private STATE doTransition
         (OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitonException {
  // We can assume that stateMachineTable is non-null because we call
  //  maybeMakeStateMachineTable() when we build an InnerStateMachine ,
  //  and this code only gets called from inside a working InnerStateMachine .
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
    = stateMachineTable.get(oldState);                                // 根据当前状态获取所有该状态有可能发生的事件
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
        = transitionMap.get(eventType);                               // 根据事件获取对应的hook,为一个Transition对象(Transition是一个接口)
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);  // 调用该对象的doTransition()函数
    }
  }
  throw new InvalidStateTransitonException(oldState, eventType);
}
private class SingleInternalArc
                  implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState,
      SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState,
                            EVENT event, EVENTTYPE eventType) {
    if (hook != null) {
      hook.transition(operand, event);          // 调用 hook 的 transition()函数
    }
    return postState;
  }
}

这个 stateMachineTable 是一个 Map<state, Map<eventType, Transition>> 数据结构。根据当前状态可以获取这个状态的所有<eventType, Transition>对应关系。之后根据event类型可以获得 Transition对象(也就是一个hook)。那么,状态机就可以由当前状态,事件调用hook函数了。本质上就是  StateMachine.hook(preState,
event) 的形式,因为OO的思想发生了变形。之后就是Transition对象的具体实现了,由于在一开始构造状态机的时候,JobImpl 构造了一个状态变化 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()),这里的hook是 InitTransition 对象。那么hook被调用的时候,就会调用
InitTransition 对象的 transition() 函数。InitTransition 对象的 transition() 函数中包括了很多初始化一个job所做的操作,比如创建MapTask,创建ReduceTask,创建MapTask所需要的输入文件Split 等等,在这里就不做过多叙述,可以自行阅读相关文章。

这个就是YARN的状态机工作的基本流程以及如何与 MRAppMaster的消息分发机制相关联的过程。由于YARN相对于MRv1的诸多改进,现在MRAppMaster取代了过去MRv1中的JobTracker对Job与Task进行控制,结构更加清晰,模块化更加明显。总体而言,MRAppMaster是一个相对比较复杂的模块,需要进行更多的更加仔细分析。

如需转载请表明 转载自 http://blog.csdn.net/gjt19910817/article/details/43441801

时间: 2024-10-04 06:03:27

YARN中MRAppMaster的事件驱动模型与状态机处理消息过程的分析的相关文章

Yarn中的几种状态机

1 概述 为了增大并发性,Yarn采用事件驱动的并发模型,将各种处理逻辑抽象成事件和调度器,将事件的处理过程用状态机表示.什么是状态机? 如果一个对象,其构成为若干个状态,以及触发这些状态发生相互转移的事件,那么此对象称之为状态机. 处理请求作为某种事件发送到系统中,由一个中央调度器传递给对应的事件调度器,进而对事件进行处理,处理完成之后再次发送给中央调度器,再进行处理,直至处理完成. Yarn的资源管理模块ResourceManager,其核心构成就是四类这样的状态机(基于2.4版本),分别是

【事件驱动模型】应用消息队列和状态机改进程序流程

前言 我相信有很多像我一样的小菜朋友在纠结,写程序就像记一本流水账,偶尔用点基础数据结构改进一下程序效率,这完全看不到技术的存在,看不到成长,在下不才,愿做一个敢出头的小菜,分享一下我的体悟,欢迎各路大神来指点.敲打. 正文:我观象山多妩媚 象山本无奇,多情观之现妩媚. 对我们的程序也是这样的,同样的功能要求,大牛看来万种风情,随手拿下:小菜看来欲拒还迎, 直看得心花怒放,却总不得美人心. 比喻不是很恰当,但感觉能说明一些问题:拿到功能需求以后的建模和体系结构的认识决定了小菜无力感,这里和大家讨

spring事件驱动模型--观察者模式在spring中的应用

spring中的事件驱动模型也叫作发布订阅模式,是观察者模式的一个典型的应用,关于观察者模式在之前的博文中总结过,http://www.cnblogs.com/fingerboy/p/5468994.html  这里主要讲一下Spring中的观察者模式的应用. spring事件驱动模型的结构. 首先明确几个spring提供的类的概念 1.ApplicationEvent public abstract class ApplicationEvent extends EventObject { pr

Yarn中几个专用名称

1. ResourceManager(RM)  RM是一个全局的资源管理器,负责整个系统的资源管理和分配.它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM). 调度器  调 度器根据容量.队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序.需要注意的是,该 调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负

11.python并发入门(part13 了解事件驱动模型))

一.事件驱动模型的引入. 在引入事件驱动模型之前,首先来回顾一下传统的流水线式编程. 开始--->代码块A--->代码块B--->代码块C--->代码块D--->......--->结束 每一个代码块里是完成各种各样事情的代码,但编程者知道代码块A,B,C,D...的执行顺序,唯一能够改变这个流程的是数据.输入不同的数据,根据条件语句判断,流程或许就改为A--->C--->E...--->结束.每一次程序运行顺序或许都不同,但它的控制流程是由输入数据和

事件驱动模型Libev(二)

Libev设计思路 理清了Libev的代码结构和主要的数据结构,就可以跟着示例中接口进入到Libev中,跟着代码了解其设计的思路.这里我们管struct ev_loop称作为事件循环驱动器而将各种watcher称为事件监控器. 1.分析例子中的IO事件 这里在前面的例子中我们先把定时器和信号事件的使用注释掉,只看IO事件监控器,从而了解Libev最基本的逻辑.可以结合Gdb设断点一步一步的跟看看代码的逻辑是怎样的. 我们从main开始一步步走.首先执行 struct ev_loop *main_

由Node.js事件驱动模型引发的思考

引言 近段时间听说了Node.js,很多文章表述这个事件驱动模型多么多么优秀,应用在服务器开发中有很大的优势,本身对此十分感性去,决定深入了解一下,由此也引发了一些对程序设计的思考,记录下来. 什么是Node.js Node.js在官网上是这样定义的:"一个搭建在Chrome JavaScript运行时上的平台,用于构建高速.可伸缩的网络程序.Node.js采用的事件驱动.非阻塞I/O模型使它既轻量又高效,是构建运行在分布式设备上的数据密集型实时程序的完美选择." Node.js的事件

Spring基于事件驱动模型的订阅发布模式代码实例详解

代码下载地址:http://www.zuidaima.com/share/1791499571923968.htm 原文:Spring基于事件驱动模型的订阅发布模式代码实例详解 事件驱动模型简介 事件驱动模型也就是我们常说的观察者,或者发布-订阅模型:理解它的几个关键点: 首先是一种对象间的一对多的关系:最简单的如交通信号灯,信号灯是目标(一方),行人注视着信号灯(多方): 当目标发送改变(发布),观察者(订阅者)就可以接收到改变: 观察者如何处理(如行人如何走,是快走/慢走/不走,目标不会管的

Guava ---- EventBus事件驱动模型

在软件开发过程中, 难免有信息的共享或者对象间的协作. 怎样让对象间信息共享高效, 而且耦合性低. 这是一个难题. 而耦合性高将带来编码改动牵一发而动全身的连锁效应. Spring的风靡正是由于攻克了高耦合问题. 本篇介绍的EventBus中也用到了Spring中的依赖注入. 来进行对象和对象间的解耦(如@Subscribe). Guava解决高耦合採用的是事件驱动模型的思路. 对象能够订阅(subscribe)特定的事件或者公布(publish)特定的事件去被消费. 从以下的代码能够看出, E