初识Storm之HelloWorld程序源码

1. 新建一个Maven项目,pom.xml代码如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yg</groupId>
    <artifactId>storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.path.to.main.Class</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

2.新建HelloWorldSpout.java,代码如下:

package com.yg.storm.spouts;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class HelloWorldSpout extends BaseRichSpout{

    /**
     * 功能:随机生成字符串
     * 实现:先产生一个1-10随机整数,再不断产生一个1-10随机整数,若两者
     * 相等,则发射hello world,否则发送其他字符串
     */
    private static final long serialVersionUID = -5698117627723074157L;
    private static final int MAX_RANDOM = 10;
    private int referenceRandom;
    private SpoutOutputCollector collector;

    //构造函数
    public HelloWorldSpout(){
        //产生第一个随机数
        final Random rand  = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    //在spout加载时,打开一些资源(只在spout加载的时候执行一次)
    @Override
    public void open(Map conf,
            TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;

    }

    //核心方法,storm会不断调用该方法,也就是方法执行完后会马上重置并再次执行
    @Override
    public void nextTuple() {

        Utils.sleep(1000);//停滞一秒
        final Random rand  = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (referenceRandom == instanceRandom){
            collector.emit(new Values("Hello World"));//有顺序的
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }

    //声明Tuple的字段名,有顺序的
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));

    }

}

3.新建HelloWorldBolt.java,代码如下:

package com.yg.storm.bolts;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class HelloWorldBolt extends BaseRichBolt{

    /**
     * 功能:就收到spout发送的数据,打印并统计hello world的数量
     * 实现:打印,创建计数变量用于统计hello world
     */
    private static final long serialVersionUID = -5061906223048521415L;
    private int myCount = 0;//计数变量,不能在execute函数中初始化
    private TopologyContext context;//上下文变量
    private OutputCollector collector;

    //相当于spout中的open
    @Override
    public void prepare(Map stormConf,
            TopologyContext context,
            OutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    //相当于spout中的nextTuple
    @Override
    public void execute(Tuple input) {
        //拿到数据,用字段名取出
        String text = input.getStringByField("sentence");
        System.out.println("One tuple gets in: " + context.getThisTaskId() + text);
        if ("Hello World".equals(text)){
            myCount++;
            System.out.println("Found a Hello World! My count is now:" + myCount);
        }
        collector.ack(input);//处理完成要通知Storm
//        collector.fail(input);//处理失败要通知Storm    

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

4.新建HelloWorldTopolog.java,代码如下:

package com.yg.storm.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import com.yg.storm.bolts.HelloWorldBolt;
import com.yg.storm.spouts.HelloWorldSpout;

public class HelloWorldTopology {

    //可以向main传递一个参数作为集群模式下的Topology的名字,若没有传入参数则使用本地模式
    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("hlSpout", new HelloWorldSpout());
        builder.setBolt("hlBolt", new HelloWorldBolt())
        .shuffleGrouping("hlSpout");

        Config conf = new Config();

        if (args != null && args.length > 0){
            //集群模式提交
            conf.setNumWorkers(3);

            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (AuthorizationException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        } else {
            //本地模式提交
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(1000*60);
            cluster.killTopology("test");
            cluster.shutdown();

        }
    }

}

直接本地运行HelloWorldTopology类即可.

原文地址:https://www.cnblogs.com/dreamboy/p/11392809.html

时间: 2024-10-17 23:25:16

初识Storm之HelloWorld程序源码的相关文章

微信小程序源码下载(200多个)

微信小程序源码下载汇总,点击标题进入对应的微信小程序下载页面. 最新 demo源码(点击标题进入帖子下载) 描述 1 微信小程序 会议室预定小程序 微信小程序 会议室预定小程序**** 本内容被作者隐藏 **** 2 微信小程序-双人五子棋小游戏 微信小程序-双人五子棋小游戏**** 本内容被作者隐藏 **** 3 打卡签到小程序 用微信小程序实现的一个简单的打卡签到的小程序拒绝 4 微信小程序---左滑删除 微信小程序---左滑删除**** 本内容被作者隐藏 **** 5 一个借钱的记事本的微

微信小程序_微信小程序开发,小程序源码、案例、教程

原文地址:http://whosmall.com/?post=448 本文标签: 微信小程序 小程序源码案例 小程序项目 小程序源码 微信小程序教程 什么是微信小程序? 微信小程序是微信基于微信平台的一个应用发布平台,微信小程序app开发属于原生app组件提供js接口的开发方式,比混合是app的用户体验更好,仅次于原生应用. 不过微信小程序定位于小,要符合轻量易用无需下载,所以从体积上也是有限制,整个小程序应用体积不能超过1M. 微信小程序的应用场景? 微信小程序的应用场景适用于轻量应用,非强交

java socket控制台版本聊天室程序源码下载

原文:java socket控制台版本聊天室程序源码下载 代码下载地址:http://www.zuidaima.com/share/1550463257578496.htm java socket控制台版本聊天室程序源码下载,学习的时候写的,适合学习java基础 java网络编程基础用 标签: java socket 控制台 聊天室 源码话题: 网络编程 java socket控制台版本聊天室程序源码下载,布布扣,bubuko.com

【小程序源码案例】微信小程序项目开发案例分享

作者:web小二本文标签: 微信小程序 小程序源码案例 小程序项目小程序的开发,并不是适合所有公司,我今天跟大家分享小程序方面的教程,主要是供大家学习使用.学习这种东西,有时候则是单纯的喜欢,没有任何目的,很单纯的为了好玩,记得很早之前学flash,没有想法,就是觉得好玩,纯娱乐爱好而已.到后来玩视频剪辑也是出于同样的原因,不图钱财名利,只是图自己个人爱好娱乐. 但是,学习,有时候则是需要有明确目的,特别是关系到自己吃饭问题的时候,你就需要非常有目的去学习,并且还需要制定好学习的计划与目标,希望

微信小程序-整理各种小程序源码和资料免费下载

微信小程序整理下载 [小程序源码]微信小程序-车源宝微信版 [小程序源码]小程序-微赞社区(论坛demo) [小程序源码]微信小程序-收支账单 [小程序工具]微信小程序-日历 [小程序源码]小程序-在线聊天功能 [小程序源码]微信小程序-大好商城(新增功能天气查询和2048游戏) [小程序源码]微信小程序-查询号码归属地 [小程序源码]微信小程序-备忘录2 [小程序源码]微信小程序-QQ音乐 [小程序源码]小程序-货币汇率 [小程序源码]微信小程序-大学图书馆 [小程序源码]小程序-积分商城 [

C#实现联通短信Sgip协议程序源码

此程序为中国联通Sgip协议程序接口,适合在中国联通申请了短信发送端口的公司使用. 短信群发已经成为现在软件系统.网络营销等必不可少的应用工具.可应用在短信验证.信息群发.游戏虚拟商品购买.事件提醒.送祝福等方面. 本程序功能包括: 1.支持中国联通Sgip1.2协议: 2.支持一般的短信发送.上行短信接收.状态报告: 3.支持长短信: 4.支持普通手机号和物联网卡: 5.全套C#.Net源码 程序适用于Sgip 1.2协议,可用.Net任何版本编译. DLL版:短信接口程序DLL源码,调用此D

c# Ftp下载程序源码解析

using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.IO; using System.Net; using System.Threading.Tasks; using System.Windows.Forms; n

查看Chrome浏览器扩展程序源码的两种方法

注意:仅在当前最新的版本 55.0.2883.87 m (64-bit)上测试有效 首先获取extensionId: chrome 打开扩展程序页面 chrome://extensions/ 比如我想查看Adblock的源码:就先复制他的Id:gighmmpiobklfepjocnamgkkbiglidom 方法1: 打开目录 C:\Users\{YOUR_NAME}\AppData\Local\Google\Chrome\User Data\Profile 2\Extensions {YOUR

基于TCP网络通信的自动升级程序源码分析-客户端请求服务器上的升级信息

每次升级,客户端都会获取服务器端存放在upgradefile文件夹下的需要升级的文件和升级信息配置文件(即upgradeconfig.xml文件) 我们来看一下代码 //升级信息配置文件相对应的类 ( 升级信息配置文件是由这个类转化成的) private UpgradeConfig upgradeConfig = null; //客户端存储升级配置文件的地址 是放在客户端根目录下的 (就是把服务器 upgradefile/upgradeconfig.xml下载到客户端存放的位置) string