Storm系列(三)Topology提交过程

提交示例代码:

1 public static void main(String[] args) throws Exception {

2 TopologyBuilder builder = new TopologyBuilder();

3 builder.setSpout("random", new RandomWordSpout(), 2);

4 builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");

5 builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));

6 Config conf = new Config();

7 conf.setNumWorkers(4);// 设置启动4个Worker

8 conf.setNumAckers(1); // 设置一个ack线程

9 conf.setDebug(true); // 设置打印所有发送的消息及系统消息

10 StormSubmitter.submitTopology("test", conf, builder.createTopology());

11 }

1、构建 TopologyBuilder 对象 builder,主要用于对各个组件(bolt、spout)进行配置;

TopologyBuilder主要属性字段定义如下:

public class TopologyBuilder {

// 所提交Topolog中所有的bolt将放入到_bolts中

private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();

// 所提交Topolog中所有的spout将放入到_spouts中

private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();

// 所提交Topolog中所有的spout和bolt都将放入_commons中

private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

....................................

}

2、以上提交代码中第三行,配置了一个id值为random,IRichSpout对象为RandomWordSpout,而并行度为2(两个线程里面跑两个任务)的spout;

// setSpout函数实现源码

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {

validateUnusedId(id);

initCommon(id, spout, parallelism_hint);

_spouts.put(id, spout);

return new SpoutGetter(id);

}

validateUnusedId:检测输入的id是不是唯一,若已经存在将抛出异常;

initCommon:构建ComponentCommon对象并进行相应的初始化,最后放入到_commons(以上TopologyBuilder中定义的Map);

initCommon函数实现源码:

private void initCommon(String id, IComponent component, Number parallelism) {

ComponentCommon common = new ComponentCommon();

// 设置消息流的来源及分组方式

common.set_inputs(new HashMap<GlobalStreamId, Grouping>());

if(parallelism!=null)

// 设置并行度

common.set_parallelism_hint(parallelism.intValue());

Map conf = component.getComponentConfiguration();

if(conf!=null)

// 设置组件的配置参数

common.set_json_conf(JSONValue.toJSONString(conf));

_commons.put(id, common);

}

在ComponentCommon中主要对以下四个属性字段进行设置:

// GlobalStreamId:确定消息来源,其中componentId表示所属组件,streamId为消息流的标识符;

// Grouping:确定消息分组方式;

private Map<GlobalStreamId,Grouping> inputs;

// StreamInfo表示输出的字段列表及是否为直接流

private Map<String,StreamInfo> streams;

private int parallelism_hint; // 设置并行度

private String json_conf; // 其它配置参数设置(必须为JSON格式)

3、SpoutGetter

实现源码:

protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {

public SpoutGetter(String id) {

super(id);

}

}

ConfigGetter、SpoutGetter的实现都是在TopologyBuilder中, ConfigGetter作用:设置程序中的配置项,覆盖默认的配置项,且配置项的格式为为JSON(本质上是改变对应ComponentCommon对象中json_conf的值);

4、提交示例代码中的第四行定义了一个id为transfer,IRichSpout对象为TransferBolt,并行度为4的bolt

setBolt实现源码:

public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {

validateUnusedId(id);

initCommon(id, bolt, parallelism_hint);

_bolts.put(id, bolt);

return new BoltGetter(id);

}

设置Bolt的函数与设置Spout函数的实现唯一的区别在返回结果;

BoltGetter实现部分源码:

protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {

private String _boltId;

public BoltGetter(String boltId) {

super(boltId);

_boltId = boltId;

}

public BoltDeclarer shuffleGrouping(String componentId) {

return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);

}

public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {

return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);

}

public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {

return grouping(componentId, streamId, Grouping.fields(fields.toList()));

}

public BoltDeclarer shuffleGrouping(String componentId, String streamId) {

return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));

}

private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {

_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);

return this;

}

.........................................

}

BoltGetter继承至ConfigGetter并实现了BoltDeclarer接口,并重载了BoltDeclarer(InputDeclarer)中各种分组方式(如:fieldsGrouping、shuffleGrouping),分组方式的实现本质上是在_commons中通过对用的boltId找到对应的ComponentCommon对象,对inputs属性进行设置;

5、通过以上几步完成了bolt与spout的配置(对应提交示例代码中的2~5行),6~9行是对运行环境的配置,10行用于向集群提交执行任务,builder.createTopology用于构建StormTopology对象.

createTopology实现源码:

public StormTopology createTopology() {

Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();

Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();

for(String boltId: _bolts.keySet()) {

IRichBolt bolt = _bolts.get(boltId);

ComponentCommon common = getComponentCommon(boltId, bolt);

boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));

}

for(String spoutId: _spouts.keySet()) {

IRichSpout spout = _spouts.get(spoutId);

ComponentCommon common = getComponentCommon(spoutId, spout);

spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));

}

return new StormTopology(spoutSpecs,

boltSpecs,

new HashMap<String, StateSpoutSpec>());

}

以上源码实现中主要做了两件事:

  • 通过boltId从_bolts中获取到对应的bolt对象,再通过getComponentCommon方法设置对应ComponentCommon对象的streams(输出的字段列表及是否为直接流)属性值,最后将bolt和common一起 放入到boltSpecs集合中。
  • 通过spoutId从_spouts中获取到对应的spout对象,再通过getComponentCommon方法设置对应ComponentCommon对象的streams(输出的字段列表及是否为直接流)属性值,最后将spout和common一起 放入到boltSpecs集合中。
  • 通过以上两步使所设置的所有组件都封装到StormTopology对象中,最后提交的到集群中运行。
时间: 2024-12-16 08:28:08

Storm系列(三)Topology提交过程的相关文章

Storm系列(四)Topology校验过程

功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务. 实现源码: 1  (^void submitTopology 2          [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] 3          (.submitTopo

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

storm源码分析之topology提交过程

storm集群上运行的是一个个topology,一个topology是spouts和bolts组成的图.当我们开发完topology程序后将其打成jar包,然后在shell中执行storm jar xxxxxx.jar xxxxxxxClass就可以将jar包上传到storm集群的nimbus上,并执行topology.本文主要分析下topology的jar包是如何上传到nimbus上的.首先我们从storm的jar命令入手,jar命令的实现位于storm根目录的bin/storm文件里.定义如

Storm系列(五)Nimbus启动过程

启动流程图   mk-assignments 功能:对当前集群中所有Topology进行新一轮的任务调度. 实现源码路径: \apache-storm-0.9.4\storm-core\src\clj\backtype\storm\daemon\ nimbus.clj 方法原型: 1  defnk mk-assignments [nimbus :scratch-topology-id nil]   方法说明: 参数nimbus为nimbus-data对象,scratch-topology-id为

Storm 系列(三)Storm 集群部署和配置

Storm 系列(三)Storm 集群部署和配置 本章中主要介绍了 Storm 的部署过程以及相关的配置信息.通过本章内容,帮助读者从零开始搭建一个 Storm 集群.相关的过程和主要的配置选项是 Storm 的运维人员需要重点关注的,对部署和配置选项不感兴趣的读者,可以跳过本章. 在开始 Storm 之旅前,我们先看一下 Storm 部署和配置的相关信息,并提交一个 Topology,了解 Storm 的基本原理.Storm 的部署模式包括单机和集群环境,同时在向 Storm 环境中提交 To

Storm 系列(六)—— Storm 项目三种打包方式对比分析

一.简介 在将 Storm Topology 提交到服务器集群运行时,需要先将项目进行打包.本文主要对比分析各种打包方式,并将打包过程中需要注意的事项进行说明.主要打包方式有以下三种: 第一种:不加任何插件,直接使用 mvn package 打包: 第二种:使用 maven-assembly-plugin 插件进行打包: 第三种:使用 maven-shade-plugin 进行打包. 以下分别进行详细的说明. 二.mvn package 2.1 mvn package的局限 不在 POM 中配置

Storm Topology 提交到集群

问题:当完成Topology各个组件的定义之后(写好了**Spout.java 和 **Bolt.java)如何将Topology提交到集群中去? 参考:http://www.cnblogs.com/fxjwind/archive/2013/06/05/3119056.html 1,在**Topology.java中的main方法 setSpout.setBolt 之后通过TopologyBuilder.createTopology()创建Topology对象,在**Topology的main方

storm源码剖析(3):topology启动过程

storm的topology启动过程是执行strom jar topology1.jar MAINCLASS ARG1 ARG2 鉴于前面已经分析了脚本的解析过程,现在重点分析topology1.jar的执行. 以storm-starter中的ExclamationTopology为例,来进行剖析: public class ExclamationTopology { public static class ExclamationBolt extends BaseRichBolt { Outpu

1.Storm 安装手册和提交Topology

目录 (一)搭建单机storm集群 一.下载所需要的资源 二.资源解压 三.安装JDK 四.安装依赖的库文件 五.安装ZMQ 六.安装jzmq 七.启动zookeeper 八.启动storm 九.在UI端访问 十.编译storm-start jar包 十一.storm-starter源码导出 十二.提交Topology (二)搭建多机storm集群 一.基本设置 二.修改配置文件 三.启动集群 四.在UI端口访问 五.提交Topology 附一 Storm 安装手册 (一)搭建单机storm集群