Storm入门(三)HelloWorld示例

一、关联代码

使用maven,代码如下。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.ljh.storm</groupId>
  <artifactId>storm-helloworld</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>storm-helloworld</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>
  </dependencies>
</project>

ExclamationTopology.java

package cn.ljh.storm.helloworld;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class ExclamationTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 1);
        builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
          conf.setNumWorkers(3);

          StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
        else {

          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("test3", conf, builder.createTopology());
          Utils.sleep(10000);
          cluster.killTopology("test3");
          cluster.shutdown();
        }
      }
}

TestWordSpout.java

package cn.ljh.storm.helloworld;

import org.apache.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestWordSpout extends BaseRichSpout {
   public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
   SpoutOutputCollector _collector;

   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
       _collector = collector;
   }

   public void nextTuple() {
       Utils.sleep(100);
       final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
       final Random rand = new Random();
       final String word = words[rand.nextInt(words.length)];
       _collector.emit(new Values(word));
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("word"));
   }
}

ExclamationBolt.java

package cn.ljh.storm.helloworld;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

  }

PrintBolt.java

package cn.ljh.storm.helloworld;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintBolt extends BaseRichBolt {
        private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          _collector = collector;
        }

        public void execute(Tuple tuple) {
          LOG.info(tuple.getString(0) + " Hello World!");
          _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
}

storm有本地模式和集群模式。

本地模式一般用于测试和开发阶段,直接在Eclipse执行ExclamationTopology的main函数进行。

集群模式需要先把应用达成jar,然后使用storm命令提交到集群中去。

提交命令:storm jar /home/test/storm-helloworld-0.0.1-SNAPSHOT.jar cn.ljh.storm.helloworld.ExclamationTopology ExclamationTest

杀死命令:storm kill ExclamationTest

二、集群运行效果

运行提交命令后,出现如下log,说明提交成功。

查看集群的进程jps,两个Supervisor节点出现了worker进程

在Nimbus节点的/usr/local/storm/data/nimbus/inbox下面有提交的jar

UI界面显示提交topology

至此HelloWorld示例完成。

时间: 2024-12-21 04:42:04

Storm入门(三)HelloWorld示例的相关文章

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

【转载】Lucene.Net入门教程及示例

本人看到这篇非常不错的Lucene.Net入门基础教程,就转载分享一下给大家来学习,希望大家在工作实践中可以用到. 一.简单的例子 //索引Private void Index(){    IndexWriter writer = new IndexWriter(@"E:\Index", new StandardAnalyzer());    Document doc = new Document();    doc.Add(new Field("Text",&qu

_00019 Storm的体系结构介绍以及Storm入门案例(官网上的简单Java案例)

博文作者:妳那伊抹微笑 博客地址:http://blog.csdn.net/u012185296 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Storm

【原创】NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战

前言 本文将演示一个iOS客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo.服务端将分别用MINA2和Netty4进行实现,而通信时服务端你只需选其一就行了.同时用MINA2和Netty4分别实现服务端的目的,是因为很多人都在纠结到底是用MINA还是Netty来实现高并发的Java网络通信服务端,在此干脆两个都实现了,就看你怎么选择了,够吊吧. NIO框架的流行,使得开发大并发.高性能的互联网服务端成为可能.这其中最流行的无非就是MINA和Netty了,M

Servlet(三) 示例

Servlet是一批服务于HTTP请求且实现了javax.servlet.Servlet接口的类,Web应用程序的开发人员通常会继承javax.servlet.http.HttpServlet这个抽象类,该抽象类同样实现了javax.servlet.Servlet接口,其设计目标就是专门来用处理http请求. 下面是经典的HelloWorld的Servlet实现: // Import required java libraries import java.io.*; import javax.s

MongooooooooooooooooooooDB入门三:MongoDB shell

MongoDB shell是什么? MongDB shell是一个功能完备的Javascript解释器,可以运行Javascript程序. 但shell的作用远不止于此,它主要还是用于连接MongoDB服务器,执行脚本,对数据库进行操作.类似于SQL Server的管理工具SSMS. 可以通过以下2条命令打开shell控制台: > cd C:\Program Files\MongoDB 2.6 Standard\bin > mongo 如图所示: C:\Program Files\MongoD

【第四篇】ASP.NET MVC快速入门之完整示例(MVC5+EF6)

目录 [第一篇]ASP.NET MVC快速入门之数据库操作(MVC5+EF6) [第二篇]ASP.NET MVC快速入门之数据注解(MVC5+EF6) [第三篇]ASP.NET MVC快速入门之安全策略(MVC5+EF6) [第四篇]ASP.NET MVC快速入门之完整示例(MVC5+EF6) [番外篇]ASP.NET MVC快速入门之免费jQuery控件库(MVC5+EF6) 请关注三石的博客:http://cnblogs.com/sanshi 完善数据注解 到目前为止的表格页面效果: 我们需

【opengl 学习笔记01】HelloWorld示例

<<OpenGL Programming Guide>>这本书是看了忘,忘了又看,赶脚还是把笔记做一做心里比较踏实,哈哈. 我的主题是,好记性不如烂笔头. ================================================================ 1. 下载glut库 glut库地址为:www.opengl.org/resources/libraries/glut/glutdlls37beta.zip glut全称为:OpenGL Utilit

Flex入门(三)——微架构之Cairngorm

大家都知道我们在开发后台的时候,都会使用MVC,三层等分层架构,使后台代码达到职责更为分明单一,高内聚低耦合,例如,Dao层只是进行和数据库打交道,负责处理数据:Service(B层)只是进行逻辑判断处理,而Action则进行后台和前台页面的交互等.从而使程序更加容易管理,更加灵活,更加容易扩展,更加容易维护.也就是大家比较熟悉的Struts(SpringMVC)+Spring+Hibernate(Mybatis)等. 而作为前台Flex处理,也提供了类似的处理功能,想要达到的效果,也是代码分层