Hadoop服务库与事件库的使用及其工作流程

Hadoop服务库与事件库的使用及其工作流程

?

Hadoop服务库:

YARN采用了基于服务的对象管理模型,主要特点有:

  • 被服务化的对象分4个状态:NOTINITED,INITED,STARTED,STOPED
  • 任何服务状态变化都可以触发另外一些动作
  • 可通过组合方式对任意服务进行组合,统一管理

具体类请参见 org.apache.hadoop.service包下.核心接口是Service,抽象实现是AbstractService

????YARN中,ResourceManager和NodeManager属于组合服务,内部包含多个单一和组合服务.以实现对内部多种服务的统一管理.

?

Hadoop事件库:

YARN采用事件驱动并发模型, 把各种逻辑抽象成事件,进入事件队列,然后由中央异步调度器负责传递给相应的事件调度器处理,或者调度器之间再传递,直至完成任务.

具体参见org.apache.hadoop.yarn.event.主要类和接口是:Event, AsyncDispatcher,EventHandler

?

?

按照惯例, 先给出一个Demo,然后顺着Demo研究代码实现.

示例我是直接抄<hadoop技术内幕>:

例子涉及如下几个模块:

  1. Task

  2. TaskType

  3. Job

  4. JobType

  5. Dispatcher

package
com.demo1;

?

import org.apache.hadoop.conf.Configuration;

import
org.apache.hadoop.service.CompositeService;

import
org.apache.hadoop.service.Service;

import
org.apache.hadoop.yarn.event.AsyncDispatcher;

import
org.apache.hadoop.yarn.event.Dispatcher;

import
org.apache.hadoop.yarn.event.EventHandler;

?

/**

* Created by yang on 2014/8/25.

*/

public
class
SimpleService
extends
CompositeService {

?

private
Dispatcher
dispatcher;

private String jobID;

private
int
taskNum;

private String[] taskIDs;

?

public SimpleService(String name, String jobID, int
taskNum) {

super(name);

this.jobID = jobID;

this.taskNum = taskNum;

this.taskIDs = new String[taskNum];

?

for (int
i = 0; i < taskNum; i++) {

taskIDs[i] = new String(jobID + "_task_" + i);

}

}

?

public
Dispatcher getDispatcher() {

return
dispatcher;

}

?

public
void serviceInit(Configuration conf) throws Exception {

dispatcher = new
AsyncDispatcher();

dispatcher.register(JobEventType.class, new JobEventDIspatcher());

dispatcher.register(TaskEventType.class, new TaskEventDIspatcher());

addService((Service)dispatcher);

super.serviceInit(conf);

}

?

private
class JobEventDIspatcher implements
EventHandler<JobEvent> {

?

@Override

public
void handle(JobEvent
jobEvent) {

if (jobEvent.getType() == JobEventType.JOB_KILL) {

System.out.println("JOB KILL EVENT");

for (int
i = 0; i < taskNum; i++) {

dispatcher.getEventHandler().handle(new
TaskEvent(taskIDs[i], TaskEventType.T_KILL));

}

} else
if (jobEvent.getType() == JobEventType.JOB_INIT) {

System.out.println("JOB INIT EVENT");

for (int
i = 0; i < taskNum; i++) {

dispatcher.getEventHandler().handle(new
TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));

}

}

}

}

?

private
class TaskEventDIspatcher implements
EventHandler<TaskEvent> {

?

@Override

public
void handle(TaskEvent
taskEvent) {

if (taskEvent.getType() == TaskEventType.T_KILL) {

System.out.println("TASK KILL EVENT" + taskEvent.getTaskID());

} else
if (taskEvent.getType() == TaskEventType.T_SCHEDULE) {

System.out.println("TASK INIT EVENT" + taskEvent.getTaskID());

}

}

}

}

?

  1. 测试程序

package
com.demo1;

?

import org.apache.hadoop.conf.Configuration;

import
org.apache.hadoop.yarn.conf.YarnConfiguration;

?

/**

* Created by yang on 2014/8/25.

*/

public
class
Test {

public
static
void main(String[] args) throws Exception {

String jobID="job_1";

SimpleService
ss = new
SimpleService("test",jobID,5);

YarnConfiguration
config = new
YarnConfiguration(new Configuration());

?

ss.serviceInit(config);

ss.init(config);

ss.start();

?

ss.getDispatcher().getEventHandler().handle(new
JobEvent(jobID,JobEventType.JOB_KILL));

ss.getDispatcher().getEventHandler().handle(new
JobEvent(jobID,JobEventType.JOB_KILL));

}

}

?

?

不出意外的话,运行结果应该类似:


14/08/25 16:02:20 INFO event.AsyncDispatcher: Registering class com.yws.demo1.JobEventType for class com.yws.demo1.SimpleService$JobEventDIspatcher

14/08/25 16:02:42 INFO event.AsyncDispatcher: Registering class com.yws.demo1.TaskEventType for class com.yws.demo1.SimpleService$TaskEventDIspatcher

14/08/25 16:02:54 INFO event.AsyncDispatcher: Registering class com.yws.demo1.JobEventType for class com.yws.demo1.SimpleService$JobEventDIspatcher

14/08/25 16:03:03 INFO event.AsyncDispatcher: Registering class com.yws.demo1.TaskEventType for class com.yws.demo1.SimpleService$TaskEventDIspatcher

JOB KILL EVENT

JOB KILL EVENT

TASK KILL EVENTjob_1_task_0

TASK KILL EVENTjob_1_task_1

TASK KILL EVENTjob_1_task_2

TASK KILL EVENTjob_1_task_3

TASK KILL EVENTjob_1_task_4

TASK KILL EVENTjob_1_task_0

TASK KILL EVENTjob_1_task_1

TASK KILL EVENTjob_1_task_2

TASK KILL EVENTjob_1_task_3

TASK KILL EVENTjob_1_task_4

?

?

我们开始分析:

所谓的Task,Job,其实是按业务逻辑划分的, 他们都继承AbstractEvent类.

SimpleService是一个组合服务,里面放了EventHandler和Dispatcher

?

从Test开始,看看Service是如何创建的

构造函数比较简单,就是将一个job拆分成taskNum个Task

ss.serviceInit(config);做了什么呢:

创建一个中央事件调度器: AsyncDispatcher(具体实现我们在后文分析)

并把Job和Task的Event及2者对应的EventHandler注册到调度器中.

这里就是初始化和启动服务了.最后2行就是模拟2个事件的JOB_KILL事件.

?

我们进到ss.getDispatcher().getEventHandler(),发现他其实是创建一个GenericEventHandler

?

这个handler干什么是呢?

就是把

塞到BlockingQueue<Event> eventQueue; 中.

不知道你发现没有, 这个方法仅仅是一个入队操作啊. 那具体调用JobEventDIspatcher.handler是在什么地方呢?

这时联想到之前不是有个中央调度器嘛, AsyncDispatcher, Line 80行, 他创建了一个线程,并不断的从之前说的EventQueue中不断的取Event,然后执行,这里的执行也就是调用了具体的handler了

就这样一个基于事件驱动的程序这么完成了.

?

按照hadoop 早起版本中, 业务逻辑之间是通过函数调用方式实现的,也就是串行的. 现在基于事件驱动后,大大提高了并发性.很值得我们学习.

?

来张全家福:

?

HandlerThread就是前文说的那个隐藏线程. EventHandler会产生一些新的Event,然后又重新进入队列.循环.

时间: 2024-08-04 15:51:42

Hadoop服务库与事件库的使用及其工作流程的相关文章

Yarn的服务库和事件库使用方法

事件类型定义: package org.apache.hadoop.event; public enum JobEventType { JOB_KILL, JOB_INIT, JOB_START } package org.apache.hadoop.event; public enum TaskEventType { T_KILL, T_SCHEDULE } 事件定义: package org.apache.hadoop.event; import org.apache.hadoop.yarn

Yarn的服务库和事件库

对于生命周期较长的对象,YARN采用了基于服务对象管理模型对其进行管理. 该模型有一下特点: 每个被服务化的对象都分为4个状态 任何服务状态变化都可以触发另外一些动作 可以通过组合方式对任意服务进行组合,以便统一管理. YARN中服务模型的类图(位于包:org.apahce.hadoop.service中) 在YARN中,resourceManager 和 nodeManager属于组合服务,他们的内部包含多个单一服务和组合服务,以实现对内部多种服务的统一管理. 所有的核心服务实际上都是一个中央

muduo库中TcpServer一次完整的工作流程

模拟单线程情况下muduo库的工作情况 muduo的源代码对于一个初学者来说还是有一些复杂的,其中有很多的回调函数以及交叉的组件,下面我将追踪一次TCP连接过程中发生的事情,不会出现用户态的源码,都是库内部的运行机制.下文笔者将描述一次连接发生的过程,将Channel到加入到loop循环为止. 监听套接字加入loop循环的完整过程 首先创建一个TcpServer对象,在的创建过程中,首先new出来自己的核心组件(Acceptor,loop,connectionMap,threadPool)之后T

Redis源码-事件库

网上看了很多Redis事件库的解读,自己也研究了好几遍,还是记录下来,虽然水平有限,但是进步总会是有的 网络事件库封装了Epoll的操作(当然是指Linux下的多路复用了),并且实现一个定时器,定时器也是服务端程序的基石,很多问题都需要靠定时器解决 (一)数据结构+算法构成一个完整的程序,要一窥Redis网络库,需要先从数据结构开始学习 1.整个事件循环是用一个全局的数据结构描述的,aeEventLoop /* State of an event based program */ typedef

利用epoll写一个&quot;迷你&quot;的网络事件库

epoll是linux下高性能的IO复用技术,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率.另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了.epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存I

部门经理的风范---记一版本库迁移事件

工作中少不了团队间的合作,而有了合作,也必将有摩擦.如何应对摩擦和项目行进中的人为阻碍因素,能体现出一位团队领导者的能力. 我们部门经理(简称B)在11月21日的进取性做法让我一个小组员获益良多. 事件: 有个VSS迁移到SVN的项目----迁移某软件中心下属4个软件开发部门的4个VSS库到到SVN环境.  我们部门分出3个组员和小组长(简称Z)合共4人连续7个工作日,做好了环境搭建.数据迁移.数据校对的工作.于20日晚上,加班完成了4个库的整体迁移. 然而21日上午,整个部门的工作QQ和邮箱被

js学习总结----移动端事件基础及常用的事件库

一.事件基础 PC:click.mouseover.mouseout.mouseenter.mouseleave.mousemove.mousedown.mouseup.mousewheel.keydown.keyup.load.scroll.blur.focus.change... 移动端:click(单击).load.scroll.blur.focus.change.input(代替keyup.keydown)...TOUCH事件模型(处理单手指操作).GESTURE事件模型(处理多手指操作

手势识别与事件库 Touch.js若干问题及解决方法

Touch.js是移动设备上的手势识别与事件库, 由百度云Clouda团队维护,也是在百度内部广泛使用的开发工具. Touch.js的代码已托管于github并开源,希望能帮助国内更多的开发者学习和开发出优秀的App产品. Touch.js手势库专为移动设备设计, 请在Webkit内核浏览器中使用. 极速CDN <script src="http://code.baidu.com/touch-0.2.14.min.js"></script> Examples /

ktouch移动端事件库

最近闲来无事,写了个移动端的事件库,代码贴在下面,大家勿拍. 1 /** 2 @version 1.0.0 3 @author gangli 4 @deprecated 移动端触摸事件库 5 */ 6 (function () { 7 "use strict"; 8 var util = { 9 $: function (selector) { 10 return document.querySelector(selector); 11 }, 12 getEventInfo: func