JStorm之Supervisor启动流程

Supervisor中文翻译是监督者,意思简单明了,就是对资源进行监控,其实主要是woker资源。该组件所做的事情概括如下:

1、每隔一段时间发送心跳证明自己还活着

2、下载新的topology

3、释放无效的woker

4、分配新的任务

该组件主要包含:心跳线程、supervisor事件接受线程、处理线程,一旦事件接受到则会进入任务分配环节,主要逻辑代码如下:

public static void main(String[] args) {
	Supervisor instance = new Supervisor();
	//主要初始化操作方法
	instance.run();
}
public void run() {
	SupervisorManger supervisorManager = null;
	try {
		//读取配置文件,和nimbus一样,不多解释
		Map<Object, Object> conf = Utils.readStormConfig();
		//获得集群模式
		StormConfig.validate_distributed_mode(conf);
		//创建pid文件
		createPid(conf);
		//supervisor会在该方法中启动
		supervisorManager = mkSupervisor(conf, null);
		JStormUtils.redirectOutput("/dev/null");

	} catch (Exception e) {
		LOG.error("Failed to start supervisor\n", e);
		System.exit(1);
	}
	while (supervisorManager.isFinishShutdown() == false) {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {

		}
	}
}

下面看组件启动的主方法

public SupervisorManger mkSupervisor(Map conf, IContext sharedContext)
			throws Exception {
		LOG.info("Starting Supervisor with conf " + conf);
		active = new AtomicBoolean(true);
		/**
		 * Step 1: 清理临时文件:$storm.local.dir/supervisor/tmp
		 */
		String path = StormConfig.supervisorTmpDir(conf);
		FileUtils.cleanDirectory(new File(path));
		/*
		 * Step 2: 创建zk连接,并建立一系列目录:[/assignments, /tasks, /topology, /supervisors, /taskbeats, /taskerrors, /monitor]
		 *
		 */
		StormClusterState stormClusterState = Cluster
				.mk_storm_cluster_state(conf);
		/*
		 * Step 3  创建$storm.local.dirsupervisor/localstate目录
		 *         创建以时间戳为名的版本文件:1421217778765、1421217778765.version并将supervisorid序列化到文件内
		 *         删除旧的版本文件,保留最近4个
		 */
		LocalState localState = StormConfig.supervisorState(conf);
		String supervisorId = (String) localState.get(Common.LS_ID);
		if (supervisorId == null) {
			supervisorId = UUID.randomUUID().toString();
			localState.put(Common.LS_ID, supervisorId);
		}
		Vector<SmartThread> threads = new Vector<SmartThread>();
		// Step 5 create HeartBeat
		/*
		* 创建heatbeat线程,并每隔supervisor.heartbeat.frequency.secs秒发送一次心跳,更新zk里的znode
		* znode节点为:/supervisors/supervisor-id (0add54ac-2c23-49bc-aaee-05b3cb9fef00)
		* 会更新该节点的如下信息:SupervisorInfo[hostName=rt2l02046.tbc,
		*					 										supervisorId=0add54ac-2c23-49bc-aaee-05b3cb9fef00,
		*					 										timeSecs=1421219320,
		*					 										uptimeSecs=908,
		*					 										workerPorts=[6801, 6800, 6803, 6802]]
		*
		*/
		Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId,
				active);
		hb.update();
		AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null,
				Thread.MIN_PRIORITY, true);
		threads.add(heartbeat);

		// Sync heartbeat to Apsara Container
		AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf);
		if (syncContainerHbThread != null) {
		    threads.add(syncContainerHbThread);
		}
		// Step 6 create and start sync Supervisor thread
		// every supervisor.monitor.frequency.secs second run SyncSupervisor
		/*
		*  创建两个同步线程,SyncSupervisorEvent和SyncProcessEvent分别用来接收事件和处理事件,
		*  与topology关系比较密切,在topology提交的时候再详细介绍
		*/
		EventManager processEventManager = new EventManagerImp(false);
		ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>();
		SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId,
				conf, localState, workerThreadPids, sharedContext);

		EventManager syncSupEventManager = new EventManagerImp(false);
		SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(
				supervisorId, conf, processEventManager, syncSupEventManager,
				stormClusterState, localState, syncProcessEvent);

		int syncFrequence = JStormUtils.parseInt(conf
				.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
		EventManagerPusher syncSupervisorPusher = new EventManagerPusher(
				syncSupEventManager, syncSupervisorEvent, active, syncFrequence);
		AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(
				syncSupervisorPusher);
		threads.add(syncSupervisorThread);

		//Step 7 start httpserver
		int port = ConfigExtension.getSupervisorDeamonHttpserverPort(conf);
		Httpserver httpserver = new Httpserver(port, conf);
		httpserver.start();

		//Step 8 上传监控信息
		MetricSendClient client;
		if (ConfigExtension.isAlimonitorMetricsPost(conf)) {
			client = new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR,
					AlimonitorClient.DEFAULT_PORT, true);
		} else {
		    client = new MetricSendClient();
		}
		UploadSupervMetric uploadMetric = new UploadSupervMetric(conf, stormClusterState,
				supervisorId, active, 60, client);
		AsyncLoopThread uploadMetricThread = new AsyncLoopThread(uploadMetric);
		threads.add(uploadMetricThread);

		// SupervisorManger which can shutdown all supervisor and workers
		return new SupervisorManger(conf, supervisorId, active, threads,
				syncSupEventManager, processEventManager, httpserver,
				stormClusterState, workerThreadPids);
	}
时间: 2024-08-18 19:22:19

JStorm之Supervisor启动流程的相关文章

JStorm之NimbusServer启动流程

NimbusServer相当于hadoop里的JobTracker或yarn里的ResourceManager,在集群中属于首脑地位,负责分发任务,监控集群状态,与supervisor的通信主要通过Zookeeper.nimbus在启动过程中会做以下工作,以保证集群稳定运行: 1.清理无效topology 2.建立zk连接并创建相应znode 3.启动监控线程 4.启动httpserver 启动主函数如下: public static void main(String[] args) throw

[转]JStorm之Supervisor简介

一.简介 Supervisor是JStorm中的工作节点,类似于MR的TT,subscribe zookeeper的任务调度结果数据,根据任务调度情况启动/停止工作进程Worker.同时Supervisor需要定期向zookeeper写入活跃端口信息以便Nimbus监控.Supervisor不执行具体处理工作,所有的计算任务都交Worker完成.从整个架构上看,Supervisor处在整个JStorm三级管理架构的中间环节,辅助管理任务调度和资源管理工作. 二.架构 1.Supervisor S

Storm启动流程分析

Storm启动流程分析 1.客户端运行storm nimbus时,会调用storm的python脚本,该脚本中为每个命令编写了一个方法,每个方法都可以生成一条相应的Java命令. 命令格式:java -server xxx.ClassName  -args nimbus--->Running:/export/servers/jdk/bin/java  -server  backtype.storm.daemon.nimbus supervisor--->Running:/export/serv

Storm启动流程简介

storm启动流程          storm是一个流行的开源的,分布式实时处理框架,关于storm的基本介绍可以参加这篇官方文档.大致的拓扑结构如图所示:        其中Nimbus是一个后台管理进程,运行在master node上.Supervisor也是后台进程,运行在work node上.依据上图,Nimbus和Supervisor不直接通信,通过zookeeper进行通信.在Master Node上,可以通过storm nimbus命令来启动nimbus进程,同时通过storm

supervisor启动worker源码分析-worker.clj

supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-supervisor.clj".sync-processes函数代码片段如下: sync-processes函数代码片段 ;; sync-processes函数用于管理workers, 比如处理不正常的worker或dead worker, 并创建新的workers;; supervisor标识supervis

庖丁解牛 Activity 启动流程

前言 这是 Android 9.0 AOSP 系列 的第五篇了,先来回顾一下前面几篇的大致内容. Java 世界的盘古和女娲 -- Zygote 主要介绍了 Android 世界的第一个 Java 进程 Zygote 的启动过程. 注册服务端 socket,用于响应客户端请求 各种预加载操作,类,资源,共享库等 强制 GC 一次 fork SystemServer 进程 循环等待客户端发来的 socket 请求(请求 socket 连接和请求 fork 应用进程) Zygote家的大儿子 --

#24 centos6(RHEL)系列操作系统的启动流程、与命令chkconfig、grub的使用

所有由rc脚本关闭或启动的链接文件的原文件都存在于/etc/rc.d/init.d,系统为了方便使用,为此目录创建了链接/etc/init.d 所有/etc/inid.d(/etc/rc.d/init.d)目录中的脚本执行方式: # /etc/init.d/srv_script {start|stop|restart|status} # service srv_script {start|stop|restart|status} chkconfig命令: chkconfig - updates

#23 centos5(RHEL)系列操作系统的启动流程、与命令mkinitrd、dracut的使用

centos(RHEL)系列操作系统的启动流程:Intel x86兼容架构: Linux的系统组成:内核 + 应用程序  GUN/Linux:单纯的指Linux内核: 从硬盘存储和启动操作系统的角度: Linux的系统组成:内核 + 根文件系统(rootfs) 内核功能:进程管理,文件系统管理,内存管理,网络协议,驱动程序,安全功能,... Linux系统的系统运行环境可以分为两部分: 内和空间:内核代码(系统调用) 就是内核进程占用的CPU和内存资源的总和: 用户空间:应用程序(进程或线程)

CentOS 6开机启动流程实验篇

CentOS 6开机启动流程实验篇 centos 系统的启动流程 grub 破坏Linux的核心文件再修复体验系统启动流程 CentOS 6开机启动的具体详情请参见理论篇! 了解了系统启动的基本流程,以下我们通过"破坏式实验",即破坏系统启动过程中的一些关键环节,使系统无法启动,然后我们再通过修复这些文件使得系统正常重启,进而体验Linux系统的启动流程,这些关键环节包括破坏grub三个stage(stage1.stage1-5.stage2) 中的任何一个阶段,甚至是整个grub;