Storm系列(四)Topology校验过程

功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务.

实现源码:

(^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))

从以上源码中看出submitTopology内部是对submitTopologyWithOpts方法的调用。

submitTopologyWithOpts函数原型如下:

^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]

在submitTopologyWithOpts中主要做了以下几件事情:

  1. 校验submitOptions参数不能为空。
  2. 检查storm-name中是否包含非法字符。
  3. 校验storm-name与正在运行的Topology是否有重名,重名将造成冲突。
  4. 将nimbus(nimbus-data类型)中的submitted-count已提交Topology计数字段加1。
  5. 为所提交的Topology创建唯一的storm-id(topology-id),格式:<storm-name>-<submitted-count>-<当前时间>
  6. 通过normalize-conf获取提交的Topology的Storm配置,首先将参数serializedConf进行反序列化,然后加入storm-name,storm-id等。
  7. 将Storm默认的配置(conf)与第六步得到的Storm配置进行合并,合并原则为两份配置中重复的配置项以第六步中的配置为准。
  8. 调用normalize-topology计算提交的Topology中每个组件并行度及更新TOPOLOGY_TASKS配置项.
  9. 获取nimbus(nimbus-data类型)中storm-cluster-state对象。
  10. 调用System-topology!方法对Topology结构进行校验。
  11. 获取nimbus中的submit-lock锁。
  12. 调用setup-storm-code为Topology创建对应的本地文件夹、复杂jar并写入序列化后的Storm配置项和Topology信息.
  13. 调用setup-hearbeats!为Topology在Zookeeper中创建心跳路径,/storm/workerbeats/topology-id.
  14. 定义一个从thrift-status到keyword-status的哈希表,该哈希表用来将传入的submitOptions中的thrift-status转化为对应的keyword-status.
  15. 调用start-storm设置stormBase,它在Zookeeper中路径是/storm/storms/<topology-id>,stormBase的信息将做为该路径所对应的存储值。
  16. 调用mk-assignments为所提交的Topology分配资源.

normalize-topology

实现源码:

(defn normalize-topology [storm-conf ^StormTopology topology]
  (let [ret (.deepCopy topology)]
    (doseq [[_ component] (all-components ret)]
      (.set_json_conf
        (.get_common component)
        (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
             (merge (component-conf component))
             to-json )))
ret ))

实现说明:

  • 调用deepCopy对topology进行深度拷贝,赋值给ret.
  • 遍历topology(ret)所有组件,调用component-parallelism更新组件配置中的TOPOLOGY_TASKS信息。

component-parallelism实现源码(计算组件并行度):

(defn- component-parallelism [storm-conf component]
  (let [storm-conf (merge storm-conf (component-conf component))
        num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
        max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
        ]
    (if max-parallelism
      (min max-parallelism num-tasks)
      num-tasks)))

实现说明:

  • 将Topology配置信息与组件(component)配置信息进行合并,两者存在重复的配置项时以组件的配置项为准。
  • 计算组件并行度(num-tasks),若果配置storm-conf中配置了TOPOLOGY-TASKS信息,就以该配置值做为组件的并行度,否则通过调用num-start-executors获取用户对组件设置的并行度做为num-tasks.
  • 获取storm-conf配置中TOPOLOGY-MAX-TASK-PARALLELISM配置项的值。
  • 返回TOPOLOGY-MAX-TASK-PARALLELISM与num-tasks较小的值做为组件的并行度。
TopologyBuilder builder = new TopologyBuilder();
// 4对应对用用户设置的组件并行度,10对应TOPOLOGY-TASK配置项的值
builder.setBolt("transfer"new TransferBolt(), 4).shuffleGrouping("random").setNumTasks(6); Config conf = new Config();
// 8对应 TOPOLOGY-MAX-TASK-PARALLELISM配置项的值
Conf.setMaxTaskParallelism(8);

 

system-topology!

功能:

验证用户提交的Topology,同时为提交的topology添加一些系统组件和流。

实现源码:

(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-metric-components! storm-conf ret)    
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
10      ret
11  ))

实现说明:

  • 使用validate-basic!校验所提交的Topology.
    主要用于确保topology中的组件id不重复而且不是系统id,以及确保每个组件的TOPOLOGY-TASKS配置项大于0时,组件的并行度设置也一定大于0.
  • 调用deepCopy对topology进行深度拷贝,赋值给ret.
  • 为Topology添加acker-bolt.
    用于追踪发送出去的消息是否被成功处理。
  • 使用add-metric-components为Topology添加metric-bolt.
  • 为Topology添加system-bolt.
    System-bolt没有输入流只有输出流分别为:SYSTEM-TICK-STREAM-ID,声明字段是[“rate_secs”],非直接模式;另一个为METRICS-TICK-STREAM-ID,声明字段为[“interval”]非直接模式,并行度为0.
  • 为Topology中的所有组件添加统计流。
    Stream-id为METRICS-STREAM-ID,声明字段为[“task-info”,”data-points”],非直接流模式.
  • 为Topology中的所有组件添加系统流。
    stream-id为SYSTEM-STREAM-ID,声明字段为[“event”],非直接流模式.
  • 使用validate-structure!检验以上步骤所组合后的Topology.

验证过程:
获取Topology中所有组件和组件的输入(包括component-id、stream-id、Grouping),对输入组件依次判断输入组件ID(component-id)是否在该Topology中,不存在则抛出异常,存在则再判断该组件的流类型是否为所对应的stream-id,若不存在则抛出异常,存在则继续检查该流的分组方式(Grouping)是否与能对应,所有组件检查完毕后没有异常抛出表示该Topology有效.

时间: 2024-08-27 06:10:41

Storm系列(四)Topology校验过程的相关文章

Storm系列(三)Topology提交过程

提交示例代码: 1 public static void main(String[] args) throws Exception { 2 TopologyBuilder builder = new TopologyBuilder(); 3 builder.setSpout("random", new RandomWordSpout(), 2); 4 builder.setBolt("transfer", new TransferBolt(), 4).shuffle

Storm系列(五)Nimbus启动过程

启动流程图   mk-assignments 功能:对当前集群中所有Topology进行新一轮的任务调度. 实现源码路径: \apache-storm-0.9.4\storm-core\src\clj\backtype\storm\daemon\ nimbus.clj 方法原型: 1  defnk mk-assignments [nimbus :scratch-topology-id nil]   方法说明: 参数nimbus为nimbus-data对象,scratch-topology-id为

storm源码剖析(3):topology启动过程

storm的topology启动过程是执行strom jar topology1.jar MAINCLASS ARG1 ARG2 鉴于前面已经分析了脚本的解析过程,现在重点分析topology1.jar的执行. 以storm-starter中的ExclamationTopology为例,来进行剖析: public class ExclamationTopology { public static class ExclamationBolt extends BaseRichBolt { Outpu

Storm 系列(六)—— Storm 项目三种打包方式对比分析

一.简介 在将 Storm Topology 提交到服务器集群运行时,需要先将项目进行打包.本文主要对比分析各种打包方式,并将打包过程中需要注意的事项进行说明.主要打包方式有以下三种: 第一种:不加任何插件,直接使用 mvn package 打包: 第二种:使用 maven-assembly-plugin 插件进行打包: 第三种:使用 maven-shade-plugin 进行打包. 以下分别进行详细的说明. 二.mvn package 2.1 mvn package的局限 不在 POM 中配置

Storm 系列(三)Storm 集群部署和配置

Storm 系列(三)Storm 集群部署和配置 本章中主要介绍了 Storm 的部署过程以及相关的配置信息.通过本章内容,帮助读者从零开始搭建一个 Storm 集群.相关的过程和主要的配置选项是 Storm 的运维人员需要重点关注的,对部署和配置选项不感兴趣的读者,可以跳过本章. 在开始 Storm 之旅前,我们先看一下 Storm 部署和配置的相关信息,并提交一个 Topology,了解 Storm 的基本原理.Storm 的部署模式包括单机和集群环境,同时在向 Storm 环境中提交 To

Storm系列二: Storm拓扑设计

Storm系列二: Storm拓扑设计 在本篇中,我们就来根据一个案例,看看如何去设计一个拓扑, 如何分解问题以适应Storm架构,同时对Storm拓扑内部的并行机制会有一个基本的了解. 本章代码都在: [email protected]:zyzdisciple/storm_study.git 项目下的 user_behavior包下. 问题案例 有这样一种场景,在前端存在会话,我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

sed修炼系列(四):sed中的疑难杂症

本文目录:1 sed中使用变量和变量替换的问题2 反向引用失效问题3 "-i"选项的文件保存问题4 贪婪匹配问题5 sed命令"a"和"N"的纠葛 1.sed中使用变量和变量替换的问题 在脚本中使用sed的时候,很可能需要在sed中引用shell变量,甚至想在sed命令行中使用变量替换.也许很多人都遇到过这个问题,但引号却死活调试不出正确的位置.其实这不是sed的问题,而是shell的特性.搞懂sed如何解决引号的问题,对理解shell引号问题有

MM--发票校验 及基于采购订单的MIRO发票校验过程(

一.介绍发票校验是物料管理(MM)系统的一部分.它提供物料管理部分和财务会计, 成本控制和资产管理部分的连接.物料管理模块的发票校验为以下目的服务:它完成物料采购的全过程 - 物料采购从采购申请开始,接下来是采购和收货, 并以收到发票而结束.它允许处理不基于物料采购的发票(例如,服务费, 其它花费,过程费用, 等等).它允许处理贷项凭证, 既可以是发票的取消, 也可以是打折扣. 发票校验不是对支付进行处理, 也不是对发票进行分析. 这些需要处理的信息被传递到其它部门. 发票校验的任务包括:输入接