Storm杂谈之Topology的启动过程(二)

在一中讲到了topology提交给nimbus

nimbus

Nimbus可以 说是storm中最核心的部分,它的主要功能有两个:

  • 对Topology的任务进行分配资源
  • 接收用户的命令并做相应的处理,如Topology的提交,杀死,激活等等

Nimbus本身是基于Thrift框架实现的,使用了Thrift的THsHaServer服务,即半同步半异步服务模式,使用一个单独的线程来处理网络IO,使用一个独立的线程池来处理消息,大大提高了消息的并发处理能力。

服务接口的定义都在storm.thrift文件中定义,贴下部分代码:

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);

  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();
  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  //returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

当执行命令  nohup ${STORM_HOME}/bin/storm nimbus & 时,会启动nimbus服务,具体的代码执行:

storm python脚本代码,默认启动backtype.storm.daemon.nimbus程序:

def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit. 

    See Setting up a Storm cluster for more information.
    (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
    """
    cppaths = [CLUSTER_CONF_DIR]
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        extrajars=cppaths,
        jvmopts=jvmopts)

然后执行nimbus.clj 脚本,主要涉及两个方法——launch-server!(nimbus的启动入口)和service-handler(真正定义处理逻辑的地方)。

nimbus启动后,对外提供了一些服务,topology的提交,UI信息,topology的kill,rebalance等等。在文章一中讲到提交topology给nimbus,这些服务的处理逻辑全部在service-handler方法中。以下截取service-handler里面处理提交Topology的逻辑

(reify Nimbus$Iface
      (^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]
        (try
          (assert (not-nil? submitOptions))
          (validate-topology-name! storm-name)
          (check-storm-active! nimbus storm-name false)
          (let [topo-conf (from-json serializedConf)]
            (try
              (validate-configs-with-schemas topo-conf)
              (catch IllegalArgumentException ex
                (throw (InvalidTopologyException. (.getMessage ex)))))
            (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                       storm-name
                       topo-conf
                       topology))
          (swap! (:submitted-count nimbus) inc)
          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
                storm-conf (normalize-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                              (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology)
                storm-cluster-state (:storm-cluster-state nimbus)]
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
              (.setup-heartbeats! storm-cluster-state storm-id)
              (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                              TopologyInitialStatus/ACTIVE :active}]
                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
              (mk-assignments nimbus)))
          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))

      (^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))

检查Topology的DAG图是否是有效连接图、以及该topology Name是否已经存在,然后分配资源和任务调度(mk-assignments )方法,等分配好资源之后,把数据写入到zookeeper,watcher发现有数据,就通知supervisor读取数据启动新的worker,一个worker就是一个JVM进程,worker启动后就会按照用户事先定好的task数来启动task,一个task就是一个thread

在executor.clj中mk-threads: spout ,mk-threads:
bolt方法就是启动task,而task就是对应的spout或bolt 组件,而且这时Spout的open,nextTuple方法,以及bolt的preapre,execute方法都是在这里被调用的,结合文章一中提到的,对于

Spout 方法调用顺序:

declareOutputFields-> open -> nextTuple -> fail/ack or other

Bolt 方法调用顺序:

declareOutputFields-> prepare -> execute

需要的注意的是在Spout中fail、ack方法和nextTuple是在同一线程中被顺序调用的,所以在nextTuple中不要做延迟很大的操作。

至此,一个topology算是可以正式启动工作了。

时间: 2024-10-10 23:20:10

Storm杂谈之Topology的启动过程(二)的相关文章

Storm杂谈之Topology的启动过程(一)

topology的提交 大家都知道,要提交Storm Topology 到Cluster,需要运行如下命令: ${STORM_HOME}/bin/storm jar xxxxxxxxxxx.jar ${main class} [args ...] bin目录下storm是一个Python文件,我们可以看一下Python脚本的main方法 def main(): if len(sys.argv) <= 1: print_usage() sys.exit(-1) global CONFIG_OPTS

IPv6通讯原理(1) - 不能忽略的网卡启动过程

本文主题:通过抓包分析,深入观察网卡启动过程的每个步骤,从而逐步掌握通讯原理. ??小慢哥的原创文章,欢迎转载 目录 ? 一. 为什么不能忽略网卡启动过程 ? 二. 实验环境 ? 三. 网卡启动前/后的样子 ? 四. 结论写在前 ? Step1. 生成"链路本地地址" ? Step2. 生成"被请求节点多播地址" ? Step3. "多播成员报告" ? Step4. "重复地址检测" ? Step5. "无状态地址自

storm源码剖析(3):topology启动过程

storm的topology启动过程是执行strom jar topology1.jar MAINCLASS ARG1 ARG2 鉴于前面已经分析了脚本的解析过程,现在重点分析topology1.jar的执行. 以storm-starter中的ExclamationTopology为例,来进行剖析: public class ExclamationTopology { public static class ExclamationBolt extends BaseRichBolt { Outpu

storm之topology的启动

一个topology的启动包括了三个步骤 1)创建TopologyBuilder,设置输入源,输出源 2)获取config 3)提交topology(这里不考虑LocalCluster本地模式) 以storm.starter的ExclamationTopology为例: public static void main(String[] args)throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.

Samza在YARN上的启动过程 =》 之二 submitApplication

首先,来看怎么构造一个org.apache.hadoop.yarn.client.api.YarnClient ? 1 2 3 4 5 class ClientHelper(conf: Configuration) extends Logging {   val yarnClient = YarnClient.createYarnClient   info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM

google 分屏 横屏模式 按home键界面错乱故障分析(二) 分屏的启动过程

google 进入分屏后在横屏模式按home键界面错乱(二) 你确定你了解分屏的整个流程? Android 关机对话框概率没有阴影故障分析 android recent key长按事件弹起触发最近列表故障分析 google 分屏 popup无法显示故障分析 分享此文便是对代码GG的支持,也是爱的表达方式,所以让爱来的猛烈些吧. 代码阅读,请到此处http://androidxref.com 查看原生代码 前情回顾: google 分屏 横屏模式 按home键界面错乱故障分析(一) 上一节我们主要

Linux学习之旅(二) --- Linux启动过程

一.通用操作系统的启动过程: 不同的处理器和硬件系统会采用不同的启动策略,从而具体的启动过程会有所差异. 但无论差异如何,从计算机系统的角度来看,启动过程一般分为三个步骤:                  开机并执行bootloader程序     ---->     操作系统内核初始化     ---->     执行第一个应用程序 大致流程: 1.开机,系统供电,此时硬件电路会产生一个确定的复位时序,保证CPU是最后一个被复位的器件: 2.当正确完成复位后,CPU开始执行第一条指令,该指

Storm系列(四)Topology校验过程

功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务. 实现源码: 1  (^void submitTopology 2          [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] 3          (.submitTopo

Android深入四大组件(二)Service的启动过程

相关文章 Android深入理解四大组件系列 前言 此前我用较长的篇幅来介绍Android应用程序的启动过程(根Activity的启动过程),这一篇我们接着来分析Service的启动过程.建议阅读此篇文章前,请先阅读Android深入四大组件(一)应用程序启动过程(前篇)和Android深入四大组件(一)应用程序启动过程(后篇)这两篇文章. 1.ContextImpl到ActivityManageService的调用过程 要启动Service,我们会调用startService方法,它的实现在C