在MRv1中,对象之间的作用关系是基于函数调用实现的,当一个对象向另外一个对象传递消息时,会直接采用函数调用的方式,并且这个过程是串行的。比如,当TaskTracker需要执行一个Task的时候,将首先下载Task依赖的文件(JAR包,二进制文件等,字典文件等),然后执行Task。在整个过程中,下载依赖文件是阻塞式的,也就是说,前一个任务未完成文件下载之前,后一个新任务将一直处于等待状态,只有在下载完成之后,才会启动一个独立进程运行该任务。基于函数调用式的编程模型是低效的,它隐含着整个过程是串行,同步进行的。
相比之下,MRv2引入的时间驱动变成模型则是一种更加高效的方式。在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间通过事件相互关联。每种事件处理一种类型的事件,同时根据需要出发另外一种事件。相比于基于函数调用的编程模型,这种编程方式具有异步并发等特点,更加高效,更加适合大型分布式系统。
下面,我们以负责控制MapReduce作业的AppllicationMaster,也就是MRAppMaster为例,看一下整个事件驱动模型与状态机。
首先,根据源代码,我们可以看到在MRAppMaster中有一个最重要的中央异步消息调度器 AsyncDispatcher,它负责整个MRAppMaster模块的消息调度。除此之外,还有多个其他的消息调度器,比如 JobEventDispatcher, TaskEventDispatcher,TaskAttemptEventDispatcher,SpeculatorEventDispatcher 等等。在MRAppMaster的初始时,在serviceInit() 函数中有以下关键代码
this.jobEventDispatcher = new JobEventDispatcher(); //register the event dispatchers dispatcher.register(JobEventType.class, jobEventDispatcher); dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(CommitterEventType.class, committerEventHandler);
这里的意思是,当中央消息调度器dispatcher如果收到JobEventType的消息后,将会把这个消息转发给名为jobEventDispatcher的JobEventDispatcher的消息调度器。这个JobEventDispatcher是专门用来处理JobEventType事件的消息处理器。底层实现其实就是将一个pair<JobEventType.class, JobEventDispatcher>这个KV放入中央消息调度器dispatcher的HashMap中。剩下的几个消息调度器一样,注册相应的消息类型和其相对应的消息调度器。
之后dispatcher启动一个处理消息线程,可以在下面代码看到中央消息处理器处理消息。
Runnable createThread() { return new Runnable() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { Event event; try { event = eventQueue.take(); // 从中央消息处理器dispatcher的消息队列中获得消息 } catch(InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } if (event != null) { dispatch(event); // 分发该消息 } } } }; }
protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString()); } Class<? extends Enum> type = event.getType().getDeclaringClass(); //获取event的类型 try{ EventHandler handler = eventDispatchers.get(type); // 根据event类型从HashMap中获取对应的消息处理器 if(handler != null) { handler.handle(event); // 使用获得的消息处理器处理这个event,跳转至相应的处理器实现 } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) { LOG.info("Exiting, bbye.."); System.exit(-1); } } }
当Client提交一个Job之后,MRAppMaster创建一个job,并发送一个JOB_INIT事件给中央消息处理器dispatcher。下面我们以JOB_INIT事件为例,看看发生了什么。
首先dispatcher收到JOB_INIT事件之后,根据JOB_INIT的事件类型(JobEventType.class)获得事件的处理器应该是jobEventDispatcher,便将其发送给jobEventDispatcher。
private class JobEventDispatcher implements EventHandler<JobEvent> { @SuppressWarnings("unchecked") @Override public void handle(JobEvent event) { ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event); // 由这个job处理这个event } }
JobEventDispatcher处理这个JOB_INIT消息。之后进入JobImpl 的消息处理函数
public void handle(JobEvent event) { if (LOG.isDebugEnabled()) { LOG.debug("Processing " + event.getJobId() + " of type " + event.getType()); } try { writeLock.lock(); JobStateInternal oldState = getInternalState(); try { getStateMachine().doTransition(event.getType(), event); // 获取job的当前的状态机,根据当前消息event进行状态变换。 } catch (InvalidStateTransitonException e) { // 会跳转至 StateMachine 实现的 doTransition()函数。 LOG.error("Can't handle this event at current state", e); addDiagnostic("Invalid event " + event.getType() + " on Job " + this.jobId); eventHandler.handle(new JobEvent(this.jobId, JobEventType.INTERNAL_ERROR)); } //notify the eventhandler of state change if (oldState != getInternalState()) { LOG.info(jobId + "Job Transitioned from " + oldState + " to " + getInternalState()); rememberLastNonFinalState(oldState); } } finally { writeLock.unlock(); } }
MRAppMaster消息分发过程到此结束,这之后就进入了Job状态机的操作,当然根据任务的不同,也许会进入Task状态机,TaskAttempt状态机等等。总之,状态机以及状态机中的各种hook进行对应Job/Task/TaskAttempt的控制。
我们知道一个job对应了一个状态机,我们也应该知道在YARN的实现中一个状态机由以下三个部分组成: 1. 状态(节点) 2. 事件(弧) 3. Hook(触发事件后的处理)。
在JobImpl.java文件中,我们可以看到构建job状态机的过程:
protected static final StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> stateMachineFactory = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> (JobStateInternal.NEW) // 构造JobImpl状态机,初始状态是 NEW 状态 // Transitions from NEW state .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, // 添加一个状态变化 JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, // 添加一个状态变化 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition // 添加一个状态变化 (JobStateInternal.NEW, EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), JobEventType.JOB_INIT, new InitTransition()) .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED, JobEventType.JOB_KILL, new KillNewJobTransition()) .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT, JobEventType.JOB_AM_REBOOT, INTERNAL_REBOOT_TRANSITION) ... ...
后面还有很多,job状态机是比较一个复杂的状态机,涉及到很多状态与事件,可以通过 对YARN状态机可视化深入了解,在此不做更多讨论。
调用addTransition()函数可以添加一个状态变化。addTransition()函数中有四个参数,分别为: 1.preState 2.postState 3.eventType 4.hook。 它们表示 1.事件发生前的状态,2.事件发生后的状态, 3.事件, 4.事件发生时触发的hook
代码中 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()) 这一个状态变化表示,事件发生前是 NEW状态,事件发生后是 INITED状态 或者 FAILED状态,事件是JOB_INIT, hook是InitTransition对象。也就是如果Job状态机当前状态是NEW,这是发生了一个JOB_INIT事件,那么状态机会触发InitTransition对象中的doTransition()函数,如果transition()函数返回
INITED状态,那么状态机最新状态就是 INITED, 如果doTransition()函数返回 FAILED状态,那么状态机最新状态就是FAILED。
那么我们来看一下状态机的状态变换的过程。
private class InternalStateMachine implements StateMachine<STATE, EVENTTYPE, EVENT> { private final OPERAND operand; private STATE currentState; InternalStateMachine(OPERAND operand, STATE initialState) { this.operand = operand; this.currentState = initialState; if (!optimized) { maybeMakeStateMachineTable(); } } @Override public synchronized STATE getCurrentState() { return currentState; } @Override public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event) throws InvalidStateTransitonException { currentState = StateMachineFactory.this.doTransition // 发生状态机变化 (operand, currentState, eventType, event); return currentState; } }
private STATE doTransition (OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event) throws InvalidStateTransitonException { // We can assume that stateMachineTable is non-null because we call // maybeMakeStateMachineTable() when we build an InnerStateMachine , // and this code only gets called from inside a working InnerStateMachine . Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap = stateMachineTable.get(oldState); // 根据当前状态获取所有该状态有可能发生的事件 if (transitionMap != null) { Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition = transitionMap.get(eventType); // 根据事件获取对应的hook,为一个Transition对象(Transition是一个接口) if (transition != null) { return transition.doTransition(operand, oldState, event, eventType); // 调用该对象的doTransition()函数 } } throw new InvalidStateTransitonException(oldState, eventType); }
private class SingleInternalArc implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> { private STATE postState; private SingleArcTransition<OPERAND, EVENT> hook; // transition hook SingleInternalArc(STATE postState, SingleArcTransition<OPERAND, EVENT> hook) { this.postState = postState; this.hook = hook; } @Override public STATE doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType) { if (hook != null) { hook.transition(operand, event); // 调用 hook 的 transition()函数 } return postState; } }
这个 stateMachineTable 是一个 Map<state, Map<eventType, Transition>> 数据结构。根据当前状态可以获取这个状态的所有<eventType, Transition>对应关系。之后根据event类型可以获得 Transition对象(也就是一个hook)。那么,状态机就可以由当前状态,事件调用hook函数了。本质上就是 StateMachine.hook(preState,
event) 的形式,因为OO的思想发生了变形。之后就是Transition对象的具体实现了,由于在一开始构造状态机的时候,JobImpl 构造了一个状态变化 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()),这里的hook是 InitTransition 对象。那么hook被调用的时候,就会调用
InitTransition 对象的 transition() 函数。InitTransition 对象的 transition() 函数中包括了很多初始化一个job所做的操作,比如创建MapTask,创建ReduceTask,创建MapTask所需要的输入文件Split 等等,在这里就不做过多叙述,可以自行阅读相关文章。
这个就是YARN的状态机工作的基本流程以及如何与 MRAppMaster的消息分发机制相关联的过程。由于YARN相对于MRv1的诸多改进,现在MRAppMaster取代了过去MRv1中的JobTracker对Job与Task进行控制,结构更加清晰,模块化更加明显。总体而言,MRAppMaster是一个相对比较复杂的模块,需要进行更多的更加仔细分析。
如需转载请表明 转载自 http://blog.csdn.net/gjt19910817/article/details/43441801