流式计算-Jstorm提交Topology过程(下)

紧接上篇流式计算-Jstorm提交Topology过程(上)

5、上篇任务已经ServiceHandler.submitTopologyWithOpts()方法,在该方法中,会实例化一个TopologyAssignEvent,相当于创建了一个topology级别的作业,然后将其保存到TopologyAssign的任务队列中,具体代码如下:

TopologyAssignEvent assignEvent = new TopologyAssignEvent();
			assignEvent.setTopologyId(topologyId);
			assignEvent.setScratch(false);
			assignEvent.setTopologyName(topologyname);
			assignEvent.setOldStatus(Thrift
					.topologyInitialStatusToStormStatus(options
							.get_initial_status()));

			TopologyAssign.push(assignEvent);

6、TopologyAssign是Jstorm一个任务分配器,它会根据配置和Topology中spout和bolt的关系来进行Task的创建和分配,但是具体任务的创建和非配并发其自身完成的,二是调用Jstorm自身的调度器完成的,当然Jstorm允许用户根据自己业务需求定制调度器,关于Jstorm的调度器分析会本人专门写一篇文章,此处暂不做任何说明。回到TopologyAssign,该类是一个实现了Runnable接口的后台线程,随着Nimbus启动,主要完成topology作业分配、备份和作业均衡的作用,当天还是通过Jstorm的调度器来完成的,其run方法会采用阻塞的方式获取自身作业队列中的作业,然后进行作业分配,其作业分配核心业务如下

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
		String topologyId = event.getTopologyId();
		TopologyAssignContext context = prepareTopologyAssign(event);
		//ResourceWorkerSlot是worker的抽象,封装了worker和其task
		Set<ResourceWorkerSlot> assignments = null;
		IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
		//通过Jstorm的调度来计算任务的分配
		assignments = scheduler.assignTasks(context);
		Assignment assignment = null;
		Map<String, String> nodeHost = getTopologyNodeHost(
				context.getCluster(), context.getOldAssignment(), assignments);

		Map<Integer, Integer> startTimes = getTaskStartTimes(context,
				nimbusData, topologyId, context.getOldAssignment(), assignments);
		//获取提交到集群的jar包地址,Worker执行任务时需要下载代码
		String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(),
				topologyId);
		assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
		StormClusterState stormClusterState = nimbusData.getStormClusterState();
		//将分配好的任务上传到ZK,通知supervisor
		stormClusterState.set_assignment(topologyId, assignment);
		//更新Task的开始时间
		NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
		// 更新元信息到ZK
		if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
				|| context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_MONITOR)
			NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment);
		else
			metricsMonitor(event);
		return assignment;
	}

7、Nimbus已经将任务分配好了,并且创建到ZK上,此时就需要supervisor认领自己的任务了,supervisor获取任务的具体逻辑封装在SyncSupervisorEvent,其也是一个后台线程,会不停获取ZK上(JSTORM_ROOT/assignments下)的全部任务,然后把自己的任务保存到本地磁盘上,再通过NimbusClient把topology的代码保存到本地,然后启动worker启动线程来执行任务,具体业务逻辑代码如下

public void run() {

			RunnableCallback syncCallback = new EventManagerZkPusher(this,
					syncSupEventManager);

			/**
			 *首次启动时主动获取ZK上JSTORM_ROOT/assignments的全部任务,后续通过ZK的watch以一种回调的方式获取任务,
			 */
			Map<String, Assignment> assignments = Cluster.get_all_assignment(
					stormClusterState, syncCallback);
			/**
			 *获取本地已经下载的topology
			 */
			List<String> downloadedTopologyIds = StormConfig
					.get_supervisor_toplogy_list(conf);
			/**
			 * 在所有作业中,获取自身的作业
			 */
			Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
					stormClusterState, supervisorId, assignments);

			/**
			 * 将作业保存到本地磁盘
			 */
			localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);
			// 获取topology的代码下载地址
			Map<String, String> topologyCodes = getTopologyCodeLocations(
					assignments, supervisorId);
			//通过NimbusClient将代码下载到本地
			downloadTopology(topologyCodes, downloadedTopologyIds);

			/**
			 * 删除无用的topology
			 */
			removeUselessTopology(topologyCodes, downloadedTopologyIds);

			/**
			 * 将syncProcesses加到执行队列,syncProcesses复杂启动新的worker来执行任务
			 */
			processEventManager.add(syncProcesses);

	}

8、SyncSupervisorEvent将自己的作业选出来,并保存到本地之后,再由SyncProcessEvent来启动worker执行具体的作业,SyncProcessEvent主要干两件事,启动新的worker,杀死无用的worker,此处要涉及启动新的Worker,具体业务逻辑如下

private void startNewWorkers(Set<Integer> keepPorts,
			Map<Integer, LocalAssignment> localAssignments) throws Exception {
		/**
		 * 获取本次新分配的作业
		 */
		Map<Integer, LocalAssignment> newWorkers = JStormUtils
				.select_keys_pred(keepPorts, localAssignments);

		/**
		 * 给每个新作业生成一个ID
		 */
		Map<Integer, String> newWorkerIds = new HashMap<Integer, String>();

		for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
			Integer port = entry.getKey();
			LocalAssignment assignment = entry.getValue();

			String workerId = UUID.randomUUID().toString();
			newWorkerIds.put(port, workerId);
			//保存每个Worker的ID到本地

			StormConfig.worker_pids_root(conf, workerId);
			//启动新的JVM执行作业
			launchWorker(conf, sharedContext,
							assignment.getTopologyId(), supervisorId, port,
							workerId, assignment);

		}

以上就是Jstorm提交一个topology的过程,这两篇文章只是给出了一条主线,具体的代码逻辑并未详细给出,后续会不断完善,同时关于Jstrom的调度器后续也会给出详细分析

时间: 2024-12-18 08:36:17

流式计算-Jstorm提交Topology过程(下)的相关文章

流式计算-Jstorm提交Topology过程

Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt.bolt和bolt之间的关系,它可以被提交到Jstorm集群. 本文以Jstorm自带的SequenceTopology简单介绍一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及具体业务, 1. SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.Se

流式计算形态下的大数据分析

1 介 绍 1.1 流式计算介绍 流式大数据计算主要有以下特征: 1)实时性.流式大数据不仅是实时产生的,也是要求实时给出反馈结果.系统要有快速响应能力,在短时间内体现出数据的价值,超过有效时间后数据的价值就会迅速降低. 2)突发性.数据的流入速率和顺序并不确定,甚至会有较大的差异.这要求系统要有较高的吞吐量,能快速处理大数据流量. 3)易失性.由于数据量的巨大和其价值随时间推移的降低,大部分数据并不会持久保存下来,而是在到达后就立刻被使用并丢弃.系统对这些数据有且仅有一次计算机会. 4)无限性

流式计算(二)-Kafka Stream

前面说了Java8的流,这里还说流处理,既然是流,比如水流车流,肯定得有流的源头,源可以有多种,可以自建,也可以从应用端获取,今天就拿非常经典的Kafka做源头来说事,比如要来一套应用日志实时分析框架,或者是高并发实时流处理框架,正是Kafka的拿手好戏. 环境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1 难度:新手--战

大数据技术(1)流式计算与Storm

2011年在海量数据处理领域,Hadoop是人们津津乐道的技术,Hadoop不仅可以用来存储海量数据,还以用来计算海量数据.因为其高吞吐.高可靠等特点,很多互联网公司都已经使用Hadoop来构建数据仓库,高频使用并促进了Hadoop生态圈的各项技术的发展.一般来讲,根据业务需求,数据的处理可以分为离线处理和实时处理,在离线处理方面Hadoop提供了很好的解决方案,但是针对海量数据的实时处理却一直没有比较好的解决方案. 就在人们翘首以待的时间节点,storm横空出世,与生俱来的分布式.高可靠.高吞

流式计算(五)-Flink核心概念

一手资料,完全来自官网,直接参考英文过来的,并加了一些自己的理解,希望能让看官君了解点什么,足矣. 环境:Flink1.9.1 难度:新手--战士--老兵--大师 目标: 理解Flink的计算模型 认识各重要组件 说明: 本篇作为前两篇的补充内容,算是理论篇 步骤: 01-Flink编程模型 Flink的流计算整体来看都是按照Source -> Transformation -> Sink三步走,即获取流源 -> 进行转换 -> 汇聚(Sink),但“转换 (Transformat

流式计算框架-STORM简介

在当前的数据分析领域,对实时数据的计算需求越来越强烈,在此领域,出现了各类计算框架,如:Storm.S4等.目前本土公司对这些流式计算框架的应用也比较广泛,但苦于相关文档英文居多,缺少成系列且与官方相对应的中文手册.本系列试图从官方文档翻译入手,给大家呈现较为完备的中文资料,同时也是对自身知识的总结沉淀. 在这个系列博客中,我们选择了twitter的Storm框架,原因很简单,因为本人长期使用的就是该框架,咱们先从简介开始. Apache Storm是一个免费.开源.分布式的实时计算系统.相对于

什么是流式计算?

一.流式计算的背景 在日常生活中,我们通常会先把数据存储在一张表中,然后再进行加工.分析,这里就涉及到一个时效性的问题.如果我们处理以年.月为单位的级别的数据,那么多数据的实时性要求并不高:但如果我们处理的是以天.小时,甚至分钟为单位的数据,那么对数据的时效性要求就比较高.在第二种场景下,如果我们仍旧采用传统的数据处理方式,统一收集数据,存储到数据库中,之后在进行分析,就可能无法满足时效性的要求. 二.流式计算与批量计算 大数据的计算模式主要分为批量计算(batch computing).流式计

流式计算(一)-Java8Stream

大约各位看官君多少也听说了Storm/Spark/Flink,这些都是大数据流式处理框架.如果一条手机组装流水线上不同的人做不同的事,有的装电池,有的装屏幕,直到最后完成,这就是典型的流式处理.如果手机组装是先全部装完电池,再交给装屏幕的组,直到完成,这就是旧式的集合式处理.今天,就来先说说JDK8中的流,虽然不是很个特新鲜的话题,但是一个很好的开始,因为——思想往往比细节重要! 准备: Idea2019.03/Gradle5.6.2/JDK11.0.4/Lambda 难度:新手--战士--老兵

Strom流式计算

序言 主要学习方向 Kafka 分布式消息系统 Redis 缓存数据库 Storm 流式计算 1.Storm 的基本概念 2.Storm 的应用场景 3.Storm 和Hadoop的对比 4.Storm 集群的安装的linux环境准备 5.zookeeper集群搭建 6.Storm 集群搭建 7.Storm 配置文件配置项讲解 8.集群搭建常见问题解决 9.Storm 常用组件和编程 API:Topology. Spout.Bolt 10.Storm分组策略(stream groupings)