流式计算-Jstorm提交Topology过程

Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt、bolt和bolt之间的关系,它可以被提交到Jstorm集群。

本文以Jstorm自带的SequenceTopology简单介绍一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及具体业务,

1、 SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.SetBuilder(TopologyBuilder builder, Map conf),该方法主要根据配置文件,使用TopologyBuilder构造Topology的spout和bolt,以及spout和bolt之间的关系

2、TopologyBuilder构造好Topology之后,通过Jstorm Client的StormSubmitter.submitTopology(streamName, conf,builder.createTopology())提交Topology到Jstorm集群,

3、在StormSubmitter.submitTopology方法中,首先会对配置项进行检查、然后将Topology自己的配置项和Jstorm的配置项组装成一个大的Map,之后上传用户在命令行提交的Jar包,然后通过NimbusClient 的submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) 方法将Topology提交到Jstorm集群,其核心代码如下:

if (!Utils.isValidConf(stormConf)) {
			throw new IllegalArgumentException(
					"Storm conf is not valid. Must be json-serializable");
		}
		stormConf = new HashMap(stormConf);
		stormConf.putAll(Utils.readCommandLineOpts());
		Map conf = Utils.readStormConfig();
		conf.putAll(stormConf);
		putUserInfo(conf, stormConf);
		String serConf = JSON.toJSONString(stormConf);
		if (localNimbus != null) {
			localNimbus.submitTopology(name, null, serConf, topology);
		} else {
			NimbusClient client = NimbusClient.getConfiguredClient(conf);
			if (topologyNameExists(conf, name)) {//检查名字是否重复,Jstorm要求每个topology名称必须唯一
					throw new RuntimeException("Topology with name `" + name
							+ "` already exists on cluster");
				}
			submitJar(conf);//上传Jar包到ZK
			client.getClient().submitTopologyWithOpts(name, path,
								serConf, topology, opts);//通过Thrift将topology提交到集群
					

4、NimbusClient提交之后,NimbusSever通过com.alibaba.jstorm.daemon.nimbus.ServiceHandler.submitTopologyWithOpts(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options)处理接收到的topology,其具体逻辑如下(代码已经精简)

public void submitTopologyWithOpts(String topologyname,
			String uploadedJarLocation, String jsonConf,
			StormTopology topology, SubmitOptions options)
			throws AlreadyAliveException, InvalidTopologyException,
			TopologyAssignException, TException {
		//首先检查topology是否已经存在
		checkTopologyActive(data, topologyname, false);
		//生成topology的唯一标识
		int counter = data.getSubmittedCount().incrementAndGet();
		String topologyId = topologyname + "-" + counter + "-"
				+ TimeUtils.current_time_secs();
		try {
			//反序列化topology配置项
			Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
					.from_json(jsonConf);
			if (serializedConf == null) {
				LOG.warn("Failed to serialized Configuration");
				throw new InvalidTopologyException(
						"Failed to serilaze topology configuration");
			}
			//将topology的名称和ID增加到配置项中
			serializedConf.put(Config.TOPOLOGY_ID, topologyId);
			serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
			Map<Object, Object> stormConf;
			stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
					topology);
			Map<Object, Object> totalStormConf = new HashMap<Object, Object>(
					conf);
			totalStormConf.putAll(stormConf);
			StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
					stormConf, topology);

			// this validates the structure of the topology
			Common.validate_basic(normalizedTopology, totalStormConf,
					topologyId);
			// don't need generate real topology, so skip Common.system_topology
			// Common.system_topology(totalStormConf, topology);
			StormClusterState stormClusterState = data.getStormClusterState();
			// 创建topology在ZK上的目录
			setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
					normalizedTopology);

			// 为每一个spout或者bolt生成Task,并在ZK上创建相应的task目录<span style="font-family: Arial, Helvetica, sans-serif;">/ZK/tasks/topoologyId/xxx</span>
			setupZkTaskInfo(conf, topologyId, stormClusterState);
			// 进行任务分配
			TopologyAssignEvent assignEvent = new TopologyAssignEvent();
			assignEvent.setTopologyId(topologyId);
			assignEvent.setScratch(false);
			assignEvent.setTopologyName(topologyname);
			assignEvent.setOldStatus(Thrift
					.topologyInitialStatusToStormStatus(options
							.get_initial_status()));

			TopologyAssign.push(assignEvent);
			LOG.info("Submit for " + topologyname + " with conf "
					+ serializedConf);

			boolean isSuccess = assignEvent.waitFinish();
			if (isSuccess == true) {
				LOG.info("Finish submit for " + topologyname);
			} 
时间: 2024-10-12 21:18:39

流式计算-Jstorm提交Topology过程的相关文章

流式计算-Jstorm提交Topology过程(下)

紧接上篇流式计算-Jstorm提交Topology过程(上), 5.上篇任务已经ServiceHandler.submitTopologyWithOpts()方法,在该方法中,会实例化一个TopologyAssignEvent,相当于创建了一个topology级别的作业,然后将其保存到TopologyAssign的任务队列中,具体代码如下: TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTo

流式计算(二)-Kafka Stream

前面说了Java8的流,这里还说流处理,既然是流,比如水流车流,肯定得有流的源头,源可以有多种,可以自建,也可以从应用端获取,今天就拿非常经典的Kafka做源头来说事,比如要来一套应用日志实时分析框架,或者是高并发实时流处理框架,正是Kafka的拿手好戏. 环境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1 难度:新手--战

大数据技术(1)流式计算与Storm

2011年在海量数据处理领域,Hadoop是人们津津乐道的技术,Hadoop不仅可以用来存储海量数据,还以用来计算海量数据.因为其高吞吐.高可靠等特点,很多互联网公司都已经使用Hadoop来构建数据仓库,高频使用并促进了Hadoop生态圈的各项技术的发展.一般来讲,根据业务需求,数据的处理可以分为离线处理和实时处理,在离线处理方面Hadoop提供了很好的解决方案,但是针对海量数据的实时处理却一直没有比较好的解决方案. 就在人们翘首以待的时间节点,storm横空出世,与生俱来的分布式.高可靠.高吞

流式计算框架-STORM简介

在当前的数据分析领域,对实时数据的计算需求越来越强烈,在此领域,出现了各类计算框架,如:Storm.S4等.目前本土公司对这些流式计算框架的应用也比较广泛,但苦于相关文档英文居多,缺少成系列且与官方相对应的中文手册.本系列试图从官方文档翻译入手,给大家呈现较为完备的中文资料,同时也是对自身知识的总结沉淀. 在这个系列博客中,我们选择了twitter的Storm框架,原因很简单,因为本人长期使用的就是该框架,咱们先从简介开始. Apache Storm是一个免费.开源.分布式的实时计算系统.相对于

流式计算形态下的大数据分析

1 介 绍 1.1 流式计算介绍 流式大数据计算主要有以下特征: 1)实时性.流式大数据不仅是实时产生的,也是要求实时给出反馈结果.系统要有快速响应能力,在短时间内体现出数据的价值,超过有效时间后数据的价值就会迅速降低. 2)突发性.数据的流入速率和顺序并不确定,甚至会有较大的差异.这要求系统要有较高的吞吐量,能快速处理大数据流量. 3)易失性.由于数据量的巨大和其价值随时间推移的降低,大部分数据并不会持久保存下来,而是在到达后就立刻被使用并丢弃.系统对这些数据有且仅有一次计算机会. 4)无限性

流式计算(五)-Flink核心概念

一手资料,完全来自官网,直接参考英文过来的,并加了一些自己的理解,希望能让看官君了解点什么,足矣. 环境:Flink1.9.1 难度:新手--战士--老兵--大师 目标: 理解Flink的计算模型 认识各重要组件 说明: 本篇作为前两篇的补充内容,算是理论篇 步骤: 01-Flink编程模型 Flink的流计算整体来看都是按照Source -> Transformation -> Sink三步走,即获取流源 -> 进行转换 -> 汇聚(Sink),但“转换 (Transformat

Strom流式计算

序言 主要学习方向 Kafka 分布式消息系统 Redis 缓存数据库 Storm 流式计算 1.Storm 的基本概念 2.Storm 的应用场景 3.Storm 和Hadoop的对比 4.Storm 集群的安装的linux环境准备 5.zookeeper集群搭建 6.Storm 集群搭建 7.Storm 配置文件配置项讲解 8.集群搭建常见问题解决 9.Storm 常用组件和编程 API:Topology. Spout.Bolt 10.Storm分组策略(stream groupings)

大数据读书笔记(2)-流式计算

早期和当前的"流式计算"系统分别称为"连续查询处理类"和"可扩展数据流平台类"计算系统. 流式计算系统的特点: 1)低延迟 2)极佳的系统容错性 3)极强的系统扩展能力 4)灵活强大的应用逻辑表达能力 目前典型的流式计算系统: S4,storm,millwheel,samza,d-stream,hadoop online,mupd8等. 其中storm和millwheel是各方面比较突出的. 流式计算系统架构: 常见的流式计算系统架构分为两种:主

什么是流式计算?

一.流式计算的背景 在日常生活中,我们通常会先把数据存储在一张表中,然后再进行加工.分析,这里就涉及到一个时效性的问题.如果我们处理以年.月为单位的级别的数据,那么多数据的实时性要求并不高:但如果我们处理的是以天.小时,甚至分钟为单位的数据,那么对数据的时效性要求就比较高.在第二种场景下,如果我们仍旧采用传统的数据处理方式,统一收集数据,存储到数据库中,之后在进行分析,就可能无法满足时效性的要求. 二.流式计算与批量计算 大数据的计算模式主要分为批量计算(batch computing).流式计