JStorm之NimbusServer启动流程

NimbusServer相当于hadoop里的JobTracker或yarn里的ResourceManager,在集群中属于首脑地位,负责分发任务,监控集群状态,与supervisor的通信主要通过Zookeeper。nimbus在启动过程中会做以下工作,以保证集群稳定运行:

1、清理无效topology

2、建立zk连接并创建相应znode

3、启动监控线程

4、启动httpserver

启动主函数如下:

public static void main(String[] args) throws Exception {
		// 读取配置文件,如果我们在JVM启动参数中包含了storm.conf.file系统属性
		//则会直接读取参数指定的文件,否则会找strom.yaml,这个一个yaml格式的文件
		//读取出的配置信息最终放入Map中
		@SuppressWarnings("rawtypes")
		Map config = Utils.readStormConfig();
		NimbusServer instance = new NimbusServer();
		//处理资源分配的工具
		INimbus iNimbus = new DefaultInimbus();
		//主要启动流程包含此函数中
		instance.launchServer(config, iNimbus);
	}

nimbus启动的主方法如下:

private void launchServer(final Map conf, INimbus inimbus) {
		LOG.info("Begin to start nimbus with conf " + conf);

		try {
			// 检测集群配置是本地模式还是集群模式
			StormConfig.validate_distributed_mode(conf);
			//清理pid目录,由参数storm.local.dir指定,该过程包含如下操作
			//1、创建本地进程的pid文件
			//2、删除历史pid文件,比如上次进程直接被kill
			createPid(conf);
			//初始化关闭钩子线程:里面包含各种清理操作,可参考cleanup方法
			initShutdownHook();
			//初始化ininumbus目录
			inimbus.prepare(conf, StormConfig.masterInimbus(conf));
			//创建数据容器,存放各种统计信息例如提交此俗、集群状态、启动时间,参考下面NimbusData类成员
			data = createNimbusData(conf, inimbus);
			//启动fllower线程不断更新zk状态,如果发现自己不是leader则会关闭进程
			initFollowerThread(conf);
			//启动http服务器
			int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
			hs = new Httpserver(port, conf);
			hs.start();
			//如果不是运行在yarn类似的容器中,不用关心此方法
			initContainerHBThread(conf);

			while (!data.isLeader())
				Utils.sleep(5000);

			//初始换监控线程
			initUploadMetricThread(data);
			//进行初始化操作,该方法会清理zk中残留的topology信息,初始化TopologyAssign线程负责分配任务
			init(conf);
		} catch (Throwable e) {
			LOG.error("Fail to run nimbus ", e);
		} finally {
			cleanup();
		}

		LOG.info("Quit nimbus");
	}

NimbusServer的init方法如下:

private void init(Map conf) throws Exception {

		NimbusUtils.cleanupCorruptTopologies(data);
		//初始化TopologyAssign线程
		initTopologyAssign();

		initTopologyStatus();
		//jar文件清理线程,默认每10分钟会清理一次,清理目录:$LOCAL-DIR/nimbus/inbox
		initCleaner(conf);

		serviceHandler = new ServiceHandler(data);

		if (!data.isLocalMode()) {
			//监控各task运行状态,发现dead状态的则会清理掉释放资源
			initMonitor(conf);
			//启动Thrift server
			initThrift(conf);

		}
	}

最后附上NimbusData的数据结构:

	public class NimbusData {

		private Map<Object, Object> conf;

		private StormClusterState stormClusterState;

		// Map<topologyId, Map<taskid, TkHbCacheTime>>
		private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache;

		// TODO two kind of value:Channel/BufferFileInputStream
		private TimeCacheMap<Object, Object> downloaders;
		private TimeCacheMap<Object, Object> uploaders;

		private int startTime;

		private final ScheduledExecutorService scheduExec;

		private AtomicInteger submittedCount;

		private Object submitLock = new Object();

		private StatusTransition statusTransition;

		private static final int SCHEDULE_THREAD_NUM = 8;

		private final INimbus inimubs;

		private Map<String, Map<String, Map<ThriftResourceType, Integer>>> groupToTopology;

		private Map<String, Map<ThriftResourceType, Integer>> groupToResource;

		private Map<String, Map<ThriftResourceType, Integer>> groupToUsedResource;

		private final boolean localMode;

		private volatile boolean isLeader;

		...........
	}

时间: 2024-08-08 02:24:28

JStorm之NimbusServer启动流程的相关文章

JStorm之Supervisor启动流程

Supervisor中文翻译是监督者,意思简单明了,就是对资源进行监控,其实主要是woker资源.该组件所做的事情概括如下: 1.每隔一段时间发送心跳证明自己还活着 2.下载新的topology 3.释放无效的woker 4.分配新的任务 该组件主要包含:心跳线程.supervisor事件接受线程.处理线程,一旦事件接受到则会进入任务分配环节,主要逻辑代码如下: public static void main(String[] args) { Supervisor instance = new

#24 centos6(RHEL)系列操作系统的启动流程、与命令chkconfig、grub的使用

所有由rc脚本关闭或启动的链接文件的原文件都存在于/etc/rc.d/init.d,系统为了方便使用,为此目录创建了链接/etc/init.d 所有/etc/inid.d(/etc/rc.d/init.d)目录中的脚本执行方式: # /etc/init.d/srv_script {start|stop|restart|status} # service srv_script {start|stop|restart|status} chkconfig命令: chkconfig - updates

#23 centos5(RHEL)系列操作系统的启动流程、与命令mkinitrd、dracut的使用

centos(RHEL)系列操作系统的启动流程:Intel x86兼容架构: Linux的系统组成:内核 + 应用程序  GUN/Linux:单纯的指Linux内核: 从硬盘存储和启动操作系统的角度: Linux的系统组成:内核 + 根文件系统(rootfs) 内核功能:进程管理,文件系统管理,内存管理,网络协议,驱动程序,安全功能,... Linux系统的系统运行环境可以分为两部分: 内和空间:内核代码(系统调用) 就是内核进程占用的CPU和内存资源的总和: 用户空间:应用程序(进程或线程)

CentOS 6开机启动流程实验篇

CentOS 6开机启动流程实验篇 centos 系统的启动流程 grub 破坏Linux的核心文件再修复体验系统启动流程 CentOS 6开机启动的具体详情请参见理论篇! 了解了系统启动的基本流程,以下我们通过"破坏式实验",即破坏系统启动过程中的一些关键环节,使系统无法启动,然后我们再通过修复这些文件使得系统正常重启,进而体验Linux系统的启动流程,这些关键环节包括破坏grub三个stage(stage1.stage1-5.stage2) 中的任何一个阶段,甚至是整个grub;

u-boot启动流程分析(2)_板级(board)部分

转自:http://www.wowotech.net/u-boot/boot_flow_2.html 目录: 1. 前言 2. Generic Board 3. _main 4. global data介绍以及背后的思考 5. 前置的板级初始化操作 6. u-boot的relocation 7. 后置的板级初始化操作 1. 前言 书接上文(u-boot启动流程分析(1)_平台相关部分),本文介绍u-boot启动流程中和具体版型(board)有关的部分,也即board_init_f/board_i

CentOS开机启动流程简介

我们都知道按下电脑电源键后,屏幕上会一闪而过很多信息,然后显示登录界面,然后输入用户名,密码就可以畅享网络世界了.那么这中间到底发生了什么呢,今天就让我们来简单探讨一下CentOS的简易版开机启动流程吧. 第一阶段:通电自检过程 我们都知道电脑所有数据指令都是在内存上才能被cpu处理的吧,我们还知道内存在断电后其上面的所有数据都会丢失吧,那么开机的时候内存应该是没有东西的吧,那上面都不能干了,更别说启动一个操作系统了,其实啊,我们内存并不只是我们常见的那个内存卡,很多硬件都会映射一段内存到cpu

Cocos2d-x3.3RC0的Android编译Activity启动流程分析

本文将从引擎源代码Jni分析Cocos2d-x3.3RC0的Android Activity的启动流程,以下是具体分析. 1.引擎源代码Jni.部分Java层和C++层代码分析 watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQveXV4aWt1b18x/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" > watermark/2/text/aHR0cDov

centos启动流程[转]

启动流程概览 在硬件驱动成功后,Kernel 会主动呼叫 init 程序,而 init 会取得 run-level 资讯: init 运行 /etc/rc.d/rc.sysinit 文件来准备软件运行的作业环境 (如网络.时区等): init 运行 run-level 的各个服务之启动 (script 方式): init 运行 /etc/rc.d/rc.local 文件: init 运行终端机模拟程序 mingetty 来启动 login 程序,最后就等待使用者登陆啦:    init,/etc

System进程的启动流程第二部分

继System进程的启动流程第一部分,我们接着分析com.android.server.SystemServer的main函数.如下: public class SystemServer { ...... native public static void init1(String[] args); ...... public static void main(String[] args) { ...... init1(args); ...... } public static final vo