Storm Topology 提交到集群

问题:当完成Topology各个组件的定义之后(写好了**Spout.java 和 **Bolt.java)如何将Topology提交到集群中去?

参考:http://www.cnblogs.com/fxjwind/archive/2013/06/05/3119056.html

1,在**Topology.java中的main方法 setSpout、setBolt 之后通过TopologyBuilder.createTopology()创建Topology对象,在**Topology的main方法中通过下面代码进行提交:

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

2,在StormSubmitter.java的submitTopology方法中,先生成一个NimbusClient对象,然后再执行submitJar(, ,)方法进行提交

NimbusClient client = NimbusClient.getConfiguredClient(conf);

submitJar(conf, progressListener);

3,在submitJar方法(只有两个参数)中,先获得待提交的jar文件的地址(string 对象表示),再调用submitJar(, , ,)提交。在这里先判断该Topology是否已经提交

private static void submitJar(Map conf, ProgressListener listener) {
    if(submittedJar==null) {
        LOG.info("Jar not uploaded to master yet. Submitting jar...");
        String localJar = System.getProperty("storm.jar");
        submittedJar = submitJar(conf, localJar, listener);
    } else {
        LOG.info("Jar already uploaded to master. Not submitting jar.");
    }
}

4,在submitJar(, , ,)中,首先生成一个NimbusClient用来获得提交jar文件的目标地址。StormSubmitter是个Thrift Client,Nimbus是Thrift Server,通过以下三步将jar文件通过RPC发送给Nimubs

方法执行完后返回String submittedJar,标记该Topology是否已经提交过。该参数作为第5步中的submitTopologyWithOpts()参数

String uploadLocation = client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);

其中用到了 backtype.storm.utils工具包中的类BufferFileInputStream.java

5,此时返回到StormSubmitter.java的submitTopology方法,执行

if(opts!=null) {
       client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts); 
} else {
         // this is for backwards compatibility
         client.getClient().submitTopology(name, submittedJar, serConf, topology);

再来看看第5步中调用的方法,public class NimbusClient extends ThriftClient说明其实提交过程都是由Thrift封装好了的。

参考:http://www.cnblogs.com/fxjwind/archive/2013/06/04/3117385.html

public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift.TException
    {
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
      recv_submitTopologyWithOpts();
    }
public void send_submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws org.apache.thrift.TException
    {
      submitTopologyWithOpts_args args = new submitTopologyWithOpts_args();
      args.set_name(name);
      args.set_uploadedJarLocation(uploadedJarLocation);
      args.set_jsonConf(jsonConf);
      args.set_topology(topology);
      args.set_options(options);
      sendBase("submitTopologyWithOpts", args);
    }
时间: 2024-12-11 16:47:48

Storm Topology 提交到集群的相关文章

STORM在线业务实践-集群空闲CPU飙高问题排查(转)

最近将公司的在线业务迁移到Storm集群上,上线后遇到低峰期CPU耗费严重的情况.在解决问题的过程中深入了解了storm的内部实现原理,并且解决了一个storm0.9-0.10版本一直存在的严重bug,目前代码已经合并到了storm新版本中,在这篇文章里会介绍这个问题出现的场景.分析思路.解决的方式和一些个人的收获. 背景 首先简单介绍一下Storm,熟悉的同学可以直接跳过这段. Storm是Twitter开源的一个大数据处理框架,专注于流式数据的处理.Storm通过创建拓扑结构(Topolog

STORM在线业务实践-集群空闲CPU飙高问题排查

源:http://daiwa.ninja/index.php/2015/07/18/storm-cpu-overload/ 2015-07-18AUTHORDAIWA STORM在线业务实践-集群空闲CPU飙高问题排查有2条评论 STORM在线业务实践-集群空闲CPU飙高问题排查 最近将公司的在线业务迁移到Storm集群上,上线后遇到低峰期CPU耗费严重的情况.在解决问题的过程中深入了解了storm的内部实现原理,并且解决了一个storm0.9-0.10版本一直存在的严重bug,目前代码已经合并

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本]

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本] 1. 开发环境 Jdk 1.7.0_72 Maven 3.2.1 Scala 2.10.6 Spark 1.6.2 Hadoop 2.6.4 IntelliJ IDEA 2016.1.1 2. 创建项目1) 新建Maven项目 2) 在pom文件中导入依赖pom.xml文件内容如下: <?xml version="1.0" encoding="UTF-8"?> &l

Spark wordcount开发并提交到集群运行

使用的ide是eclipse package com.luogankun.spark.base import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ /** * 统计字符出现次数 */ object WorkCount { def main(args: Array[String]) { if (args.length < 1) {

Eclipse远程提交hadoop集群任务

文章概览: 1.前言 2.Eclipse查看远程hadoop集群文件 3.Eclipse提交远程hadoop集群任务 4.小结 1 前言 Hadoop高可用品台搭建完备后,参见<Hadoop高可用平台搭建>,下一步是在集群上跑任务,本文主要讲述Eclipse远程提交hadoop集群任务. 2 Eclipse查看远程hadoop集群文件 2.1 编译hadoop eclipse 插件 Hadoop集群文件查看可以通过webUI或hadoop Cmd,为了在Eclipse上方便增删改查集群文件,我

MR程序本地调试,提交到集群运行

在本地调试,提交到集群上运行. 在本地程序中的Configuration中添加如下配置: Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.136.128:9000"); System.setProperty("HADOOP_USER_NAME","hadoop"); conf.set("mapredu

Mapreduce提交YARN集群运行

Eclipse项目打包1.export2.通过maven打包,切入到项目目录下执行命令mvn clean package Mapreduce提交YARN集群运行 将jar包传到hadoop目录下运行格式:bin/hadoop jar  jar包名   包名(代码的包名).类名 +参数(输入路径输出路径)就可以在集群上运行了 原文地址:https://www.cnblogs.com/libin123/p/10330330.html

CentOS下Storm 1.0.0集群安装具体解释

本文环境例如以下: 操作系统:CentOS 6 32位 ZooKeeper版本号:3.4.8 Storm版本号:1.0.0 JDK版本号:1.8.0_77 32位 python版本号:2.6.6 集群情况:一个主控节点(Master)和两个工作节点(Slave1,Slave2) 1. 搭建Zookeeper集群 安装參考:CentOS下ZooKeeper单机模式.集群模式安装 2. 在Nimbus和worker机器上安装依赖包 Java 6 Python 2.6.6 以上的版本号是官方说已经有測

CentOS下Storm 1.0.0集群安装详解

本文环境如下: 操作系统:CentOS 6 32位 ZooKeeper版本:3.4.8 Storm版本:1.0.0 JDK版本:1.8.0_77 32位 python版本:2.6.6 集群情况:一个主控节点(Master)和两个工作节点(Slave1,Slave2) 1. 搭建Zookeeper集群 安装参考:CentOS下ZooKeeper单机模式.集群模式安装 2. 在Nimbus和worker机器上安装依赖包 Java 6 Python 2.6.6 以上的版本是官方说已经有测试可以和Str