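A Topology is JStorm's abstraction of a directed acyclic graph. It wraps the data sources (spouts), the processing units (bolts), and the connections between spouts and bolts and between bolts, and it can be submitted to a JStorm cluster.
Using the SequenceTopology example that ships with JStorm, this article walks through how JStorm submits a topology. It covers only the submission process, not any business logic.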
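1. The core method of SequenceTopology is com.alipay.dw.jstorm.example.sequence.SequenceTopology.SetBuilder(TopologyBuilder builder, Map conf). Driven by the configuration file, it uses TopologyBuilder to define the topology's spouts and bolts and the connections between them.
2. Once TopologyBuilder has built the topology, it is submitted to the JStorm cluster through the JStorm client: StormSubmitter.submitTopology(streamName, conf, builder.createTopology()). The first argument, streamName, is the topology name.
3. StormSubmitter.submitTopology first validates the configuration, then merges the topology's own settings with JStorm's settings into one large Map, uploads the jar the user supplied on the command line, and finally submits the topology to the JStorm cluster through NimbusClient's submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options). The core code is as follows: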
    if (!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException(
                "Storm conf is not valid. Must be json-serializable");
    }
    // Topology conf, with command-line options layered on top
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    // JStorm's own config, overridden by the topology conf
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    putUserInfo(conf, stormConf);
    String serConf = JSON.toJSONString(stormConf);
    if (localNimbus != null) {
        localNimbus.submitTopology(name, null, serConf, topology);
    } else {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        // Check for a duplicate name: JStorm requires every topology name to be unique
        if (topologyNameExists(conf, name)) {
            throw new RuntimeException("Topology with name `" + name
                    + "` already exists on cluster");
        }
        // Upload the jar to Nimbus
        submitJar(conf);
        // Submit the topology to the cluster over Thrift
        client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
    }

4. After NimbusClient submits the topology, NimbusServer handles it in com.alibaba.jstorm.daemon.nimbus.ServiceHandler.submitTopologyWithOpts(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options). Its logic is as follows (the code has been trimmed):
    public void submitTopologyWithOpts(String topologyname,
            String uploadedJarLocation, String jsonConf,
            StormTopology topology, SubmitOptions options)
            throws AlreadyAliveException, InvalidTopologyException,
            TopologyAssignException, TException {
        // First check whether the topology already exists
        checkTopologyActive(data, topologyname, false);
        // Generate the topology's unique ID
        int counter = data.getSubmittedCount().incrementAndGet();
        String topologyId = topologyname + "-" + counter + "-"
                + TimeUtils.current_time_secs();
        try {
            // Deserialize the topology configuration
            Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
                    .from_json(jsonConf);
            if (serializedConf == null) {
                LOG.warn("Failed to serialized Configuration");
                throw new InvalidTopologyException(
                        "Failed to serilaze topology configuration");
            }
            // Add the topology name and ID to the configuration
            serializedConf.put(Config.TOPOLOGY_ID, topologyId);
            serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
            Map<Object, Object> stormConf;
            stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
            Map<Object, Object> totalStormConf = new HashMap<Object, Object>(conf);
            totalStormConf.putAll(stormConf);
            StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
                    stormConf, topology);
            // this validates the structure of the topology
            Common.validate_basic(normalizedTopology, totalStormConf, topologyId);
            // don't need generate real topology, so skip Common.system_topology
            // Common.system_topology(totalStormConf, topology);
            StormClusterState stormClusterState = data.getStormClusterState();
            // Store the topology's jar, code and conf under Nimbus's local directory for this topologyId
            setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
                    normalizedTopology);
            // Generate a Task for every spout and bolt, and create the matching
            // task directory on ZK: /ZK/tasks/topologyId/xxx
            setupZkTaskInfo(conf, topologyId, stormClusterState);
            // Assign the tasks
            TopologyAssignEvent assignEvent = new TopologyAssignEvent();
            assignEvent.setTopologyId(topologyId);
            assignEvent.setScratch(false);
            assignEvent.setTopologyName(topologyname);
            assignEvent.setOldStatus(Thrift
                    .topologyInitialStatusToStormStatus(options.get_initial_status()));
            TopologyAssign.push(assignEvent);
            LOG.info("Submit for " + topologyname + " with conf " + serializedConf);
            boolean isSuccess = assignEvent.waitFinish();
            if (isSuccess) {
                LOG.info("Finish submit for " + topologyname);
            }
            // ... (the rest of the method is trimmed)
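
Two details of the flow above are easy to miss when reading the JStorm code: the order in which the client layers its configuration maps, and the shape of the topology ID that Nimbus generates. The sketch below models both with nothing but the Java standard library. It is not JStorm code: the class SubmitSketch, its method names, and the sample values are all hypothetical and chosen only for illustration. The layering order and the name-counter-timestamp ID format are taken from the two excerpts above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Stdlib-only model of two steps from the walkthrough. Hypothetical, not JStorm code. */
final class SubmitSketch {

    /** Client side: copy the user's topology conf, then let command-line options override it. */
    static Map<String, Object> topologyConf(Map<String, Object> userConf,
                                            Map<String, Object> cmdLineOpts) {
        Map<String, Object> stormConf = new HashMap<>(userConf); // the caller's map is left untouched
        stormConf.putAll(cmdLineOpts);                           // command-line values win
        return stormConf;
    }

    /** Client side: start from the cluster settings, then let the topology conf override them. */
    static Map<String, Object> clientConf(Map<String, Object> clusterConf,
                                          Map<String, Object> stormConf) {
        Map<String, Object> conf = new HashMap<>(clusterConf);
        conf.putAll(stormConf);
        return conf;
    }

    /** Nimbus side: ID = topology name + "-" + submit counter + "-" + time in seconds. */
    static String nextTopologyId(String name, AtomicInteger submittedCount, long nowSecs) {
        int counter = submittedCount.incrementAndGet();
        return name + "-" + counter + "-" + nowSecs;
    }

    public static void main(String[] args) {
        // Sample values are assumptions made for this demo only.
        Map<String, Object> clusterConf = new HashMap<>();
        clusterConf.put("nimbus.host", "localhost");
        clusterConf.put("topology.workers", 1);

        Map<String, Object> userConf = new HashMap<>();
        userConf.put("topology.workers", 2);

        Map<String, Object> cmdLineOpts = new HashMap<>();
        cmdLineOpts.put("topology.workers", 4);

        Map<String, Object> stormConf = topologyConf(userConf, cmdLineOpts);
        Map<String, Object> conf = clientConf(clusterConf, stormConf);

        // Only stormConf is serialized and sent with the topology;
        // conf is what the client itself uses to reach Nimbus.
        System.out.println("sent with topology: " + stormConf);
        System.out.println("used by client:     " + conf);

        AtomicInteger submittedCount = new AtomicInteger(0);
        long nowSecs = System.currentTimeMillis() / 1000;
        System.out.println(nextTopologyId("SequenceTest", submittedCount, nowSecs));
        System.out.println(nextTopologyId("SequenceTest", submittedCount, nowSecs));
    }
}
```

Because the counter is incremented on every submission, two submissions under the same name within the same second still get different IDs. Cluster-only keys such as nimbus.host appear in the client's merged map but not in the conf that is serialized and sent with the topology.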