JStorm: Topology Submission on the Client Side

A topology consists of one or more spouts and bolts: a spout pulls data from a source and emits it to bolts, and each bolt processes what it receives before passing it on to the next bolt. A topology is usually assembled with TopologyBuilder, which records the declared spouts and bolts and validates them, for example checking that no two components share an id:

private void validateUnusedId(String id) {
	if (_bolts.containsKey(id)) {
		throw new IllegalArgumentException(
				"Bolt has already been declared for id " + id);
	}
	if (_spouts.containsKey(id)) {
		throw new IllegalArgumentException(
				"Spout has already been declared for id " + id);
	}
	if (_stateSpouts.containsKey(id)) {
		throw new IllegalArgumentException(
				"State spout has already been declared for id " + id);
	}
}
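
The check above is plain map membership across the component registries. The same pattern can be sketched in a self-contained form (the class, the Object stand-ins, and the component ids below are invented for illustration, not JStorm's actual API):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of TopologyBuilder's id bookkeeping.
public class IdRegistry {
	private final Map<String, Object> bolts = new HashMap<>();
	private final Map<String, Object> spouts = new HashMap<>();

	public void declareSpout(String id, Object spout) {
		validateUnusedId(id);
		spouts.put(id, spout);
	}

	public void declareBolt(String id, Object bolt) {
		validateUnusedId(id);
		bolts.put(id, bolt);
	}

	// Same rule as TopologyBuilder.validateUnusedId: an id may belong to
	// at most one component, whatever its kind.
	private void validateUnusedId(String id) {
		if (bolts.containsKey(id))
			throw new IllegalArgumentException("Bolt has already been declared for id " + id);
		if (spouts.containsKey(id))
			throw new IllegalArgumentException("Spout has already been declared for id " + id);
	}

	public static void main(String[] args) {
		IdRegistry b = new IdRegistry();
		b.declareSpout("source", new Object());
		b.declareBolt("transform", new Object());
		try {
			b.declareBolt("source", new Object()); // id already taken by the spout
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage());
		}
	}
}
```

Because the check spans every registry, an id rejected here is rejected regardless of whether the new component is a spout or a bolt.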

TopologyBuilder stores each component in the corresponding data structure:

public class TopologyBuilder {
	// all declared bolts
	private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
	// all declared spouts
	private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
	// per-component configuration
	private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
	// ...
}

The per-component configuration is populated as follows:

private void initCommon(String id, IComponent component, Number parallelism) {
	ComponentCommon common = new ComponentCommon();
	common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
	// default to a parallelism hint of 1 when none is given
	if (parallelism != null) {
		common.set_parallelism_hint(parallelism.intValue());
	} else {
		common.set_parallelism_hint(1);
	}
	Map conf = component.getComponentConfiguration();
	if (conf != null) {
		common.set_json_conf(Utils.to_json(conf));
	}
	_commons.put(id, common);
}
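
The defaulting behavior is easy to miss: any component declared without a parallelism hint runs with a single executor. A tiny self-contained sketch of just that rule (the class name is invented for illustration):

```java
public class ParallelismDefaultDemo {
	// Mirrors initCommon's defaulting: a null parallelism hint becomes 1,
	// and any other Number is truncated to its int value.
	static int parallelismHint(Number parallelism) {
		return parallelism != null ? parallelism.intValue() : 1;
	}

	public static void main(String[] args) {
		System.out.println(parallelismHint(4));    // explicit hint
		System.out.println(parallelismHint(null)); // defaults to one executor
	}
}
```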

Once this bookkeeping is done, the builder's createTopology() produces a StormTopology instance from the recorded information, which is then submitted via StormSubmitter.submitTopology. Submission happens in two steps: (1) upload the jar file, (2) submit the topology itself.

public static void submitTopology(String name, Map stormConf,
		StormTopology topology, SubmitOptions opts)
		throws AlreadyAliveException, InvalidTopologyException {
	// read the configuration and validate it
	if (!Utils.isValidConf(stormConf)) {
		throw new IllegalArgumentException(
				"Storm conf is not valid. Must be json-serializable");
	}
	stormConf = new HashMap(stormConf);
	stormConf.putAll(Utils.readCommandLineOpts());
	Map conf = Utils.readStormConfig();
	conf.putAll(stormConf);
	putUserInfo(conf, stormConf);
	try {
		String serConf = Utils.to_json(stormConf);
		if (localNimbus != null) {
			LOG.info("Submitting topology " + name + " in local mode");
			localNimbus.submitTopology(name, null, serConf, topology);
		} else {
			NimbusClient client = NimbusClient.getConfiguredClient(conf);
			// check whether a topology with the same name is already running on the cluster
			if (topologyNameExists(conf, name)) {
				throw new RuntimeException("Topology with name `" + name
						+ "` already exists on cluster");
			}
			// upload the jar file; submitJar is covered in detail below
			submitJar(conf);
			try {
				LOG.info("Submitting topology " + name
						+ " in distributed mode with conf " + serConf);
				// submit the topology: this calls submitTopology on the server-side
				// ServiceHandler, which starts it; from here on it is the server's job
				if (opts != null) {
					client.getClient().submitTopologyWithOpts(name, path,
							serConf, topology, opts);
				} else {
					// this is for backwards compatibility
					client.getClient().submitTopology(name, path, serConf,
							topology);
				}
			} finally {
				client.close();
			}
		}
		LOG.info("Finished submitting topology: " + name);
	} catch (InvalidTopologyException e) {
		// ...
	}
}
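
Note the layering at the top of submitTopology: cluster defaults from Utils.readStormConfig() are overlaid by the submitted stormConf, which is itself overlaid by command-line options. The merge order can be sketched in isolation (class name, keys, and values below are invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfMergeDemo {
	// Same layering as submitTopology: command-line opts override the
	// submitted stormConf, and that merged map overrides cluster defaults.
	static Map<String, Object> merge(Map<String, Object> clusterDefaults,
									 Map<String, Object> stormConf,
									 Map<String, Object> cmdLineOpts) {
		Map<String, Object> merged = new HashMap<>(stormConf);
		merged.putAll(cmdLineOpts);
		Map<String, Object> conf = new HashMap<>(clusterDefaults);
		conf.putAll(merged);
		return conf;
	}

	public static void main(String[] args) {
		// Stand-ins for Utils.readStormConfig(), the user's stormConf,
		// and Utils.readCommandLineOpts().
		Map<String, Object> defaults = new HashMap<>();
		defaults.put("topology.workers", 1);
		defaults.put("topology.debug", false);

		Map<String, Object> stormConf = new HashMap<>();
		stormConf.put("topology.workers", 4);

		Map<String, Object> cmd = new HashMap<>();
		cmd.put("topology.workers", 8);

		Map<String, Object> conf = merge(defaults, stormConf, cmd);
		System.out.println(conf.get("topology.workers")); // command line wins
		System.out.println(conf.get("topology.debug"));   // untouched default survives
	}
}
```

This is why a key passed on the command line beats the same key hard-coded in the topology's conf, while any key set nowhere else falls back to the cluster default.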

The jar upload has two parts: the jar file itself and the library jars it depends on are both sent to the server. The upload buffer defaults to 512K and can be tuned via nimbus.thrift.max_buffer_size. On the server, the uploaded files land in a layout like this:

$ tree /home/hongmin.lhm/jstorm_data/nimbus/inbox/
/home/hongmin.lhm/jstorm_data/nimbus/inbox/
`-- 7c1b7d1e-9134-4ed8-b664-836271b49bd3
    `-- stormjar-7c1b7d1e-9134-4ed8-b664-836271b49bd3.jar

The submitJar method itself looks like this:

private static void submitJar(Map conf) {
	if (submittedJar == null) {
		NimbusClient client = NimbusClient.getConfiguredClient(conf);
		try {
			LOG.info("Jar not uploaded to master yet. Submitting jar...");
			String localJar = System.getProperty("storm.jar");
			path = client.getClient().beginFileUpload();
			String[] pathCache = path.split("/");
			String uploadLocation = path + "/stormjar-"
					+ pathCache[pathCache.length - 1] + ".jar";
			List<String> lib = (List<String>) conf
					.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
			Map<String, String> libPath = (Map<String, String>) conf
					.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
			if (lib != null && lib.size() != 0) {
				for (String libName : lib) {
					String jarPath = path + "/" + libName;
					client.getClient().beginLibUpload(jarPath);
					submitJar(conf, libPath.get(libName), jarPath, client);
				}
				if (localJar != null)
					submittedJar = submitJar(conf, localJar,
							uploadLocation, client);
			} else {
				submittedJar = submitJar(conf, localJar, uploadLocation,
						client);
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		} finally {
			client.close();
		}
	} else {
		LOG.info("Jar already uploaded to master. Not submitting jar.");
	}
}
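
In submitJar, beginFileUpload() returns a server-side inbox directory whose last path segment is a UUID (as in the tree listing above), and the client derives the jar's upload name from it. That string manipulation can be sketched in isolation (the class name is invented; the sample path mirrors the directory listing):

```java
public class UploadLocationDemo {
	// Mirrors submitJar's derivation: take the last segment of the path
	// returned by beginFileUpload() and embed it in the jar file name.
	static String uploadLocation(String path) {
		String[] pathCache = path.split("/");
		return path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
	}

	public static void main(String[] args) {
		String path = "/home/hongmin.lhm/jstorm_data/nimbus/inbox/7c1b7d1e-9134-4ed8-b664-836271b49bd3";
		// prints .../inbox/<uuid>/stormjar-<uuid>.jar, matching the tree output
		System.out.println(uploadLocation(path));
	}
}
```

Keeping the UUID in both the directory name and the jar name lets the server associate each uploaded jar with a single submission.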

Date: 2024-10-19 13:36:42
