55.storm 之 hello word(本地模式)

strom hello word

概述

然后卡一下代码怎么实现的:

  1. 编写数据源类:Spout。可以使用两种方式:

    继承BaseRichSpout类

    实现IRichSpout接口

    主要需要实现或重写几个方法:open、nextTuple、declareOutputFields

  2. 继续编写数据处理类:Bolt。可以使用两种方式:

    继承BaseBasicBolt类

    实现IRichBolt接口

    终点实现或重写几个方法:execute、declareOutputFields

  3. 最后编写主函数(Topology)去进行提交一个任务

    在使用Topology的时候,Storm框架为我们提供了两种模式:本地模式和集群模式

    本地模式:(无需Storm集群,直接在java中即可运行,一般用于测试和开发阶段)执行main函数即可

    集群模式:(需要Storm集群,把实现java程序打包,然后Topology进行提交)需要把应用打成jar,使用Storm命令吧Topology提交到集群中去。

实际操作

先来看一下代码结构:

就如上图所说,数据从PWSpout流到PrintBolt,最后到WriteBolt写到文件。具体看一下这几个类的代码:

先看一本地模式的:

PWTopology1.java 拓扑结构构建

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import bhz.bolt.PrintBolt;
import bhz.bolt.WriteBolt;
import bhz.spout.PWSpout;

public class PWTopology1 {

    public static void main(String[] args) throws Exception {
        //
        Config cfg = new Config();
        cfg.setNumWorkers(2);
        cfg.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new PWSpout());
        builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
        builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");

        //1 本地模式
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("top1", cfg, builder.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("top1");
        cluster.shutdown();

        //2 集群模式
//        StormSubmitter.submitTopology("top1", cfg, builder.createTopology());

    }
}

代码分析:

数据来源:

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PWSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;

    private static final Map<Integer, String> map = new HashMap<Integer, String>();

    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //对spout进行初始化
        this.collector = collector;
        //System.out.println(this.collector);
    }

    /**
     * <B>方法名称:</B>轮询tuple<BR>
     * <B>概要说明:</B><BR>
     * @see backtype.storm.spout.ISpout#nextTuple()
     */
    @Override
    public void nextTuple() {
        //随机发送一个单词
        final Random r = new Random();
        int num = r.nextInt(5);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * <B>方法名称:</B>declarer声明发送数据的field<BR>
     * <B>概要说明:</B><BR>
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //进行声明
        declarer.declare(new Fields("print"));
    }

}

代码解析:

整体结构

细入分析

---------------------------- open 方法---------------------------------------------------------

---------------------------------  nextTuple方法 --------------------------------------------------------------

---------------------------- declareOutputFields方法 ----------------------------------------------------

数据处理

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class PrintBolt extends BaseBasicBolt {

    private static final Log log = LogFactory.getLog(PrintBolt.class);

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //获取上一个组件所声明的Field
        String print = input.getStringByField("print");
        log.info("【print】: " + print);
        //System.out.println("Name of input word is : " + word);
        //进行传递给下一个bolt
        collector.emit(new Values(print));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("write"));
    }

}

代码分析

import java.io.FileWriter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import clojure.main;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WriteBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    private static final Log log = LogFactory.getLog(WriteBolt.class);

    private FileWriter writer ;
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //获取上一个组件所声明的Field
        String text = input.getStringByField("write");
        try {
            if(writer == null){
                if(System.getProperty("os.name").equals("Windows 10")){
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if(System.getProperty("os.name").equals("Windows 8.1")){
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if(System.getProperty("os.name").equals("Windows 7")){
                    writer = new FileWriter("D:\\099_test\\" + this);
                } else if(System.getProperty("os.name").equals("Linux")){
                    System.out.println("----:" + System.getProperty("os.name"));
                    writer = new FileWriter("/usr/local/temp/" + this);
                }
            }
            log.info("【write】: 写入文件");
            writer.write(text);
            writer.write("\n");
            writer.flush();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

和PrintBolt 这个类很相似,都是在处理数据。不作过多解释

时间: 2024-08-08 10:40:09

55.storm 之 hello word(本地模式)的相关文章

storm入门——本地模式helloworld

创建maven项目,在pom.xml中加入以下配置: <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <type>jar</type> <version>0.9.3-rc1</version> </dependency> 创建SimpleSpout类用于获取数据流:

storm的本地模式demo

SimpleTopology.java package com.zgl.helloword; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; /**  * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bol

Storm集群上的开发 ,本地模式报错问题(插曲)

打包上传到集群上跑是没问题的,在本地模式跑,报客户端没有所需特权,此处客户端指的是MyEclipse,右击用管理员模式打开myclipse即可. 错误日志 : 4573 [SLOT_1027] ERROR o.a.s.d.s.Slot - Error when processing event java.nio.file.FileSystemException: C:\Users\ADMINI~1\AppData\Local\Temp\6d36a211-4aed-4485-ac2f-156088

02 使用Flink的本地模式完成词频统计

前面我们已经安装了flink,与storm一样,flink也有两种模式,一是本地模式,主要用于学习和测试,另一个是集群模式,实际生产中使用这种模式.本节将阐述如何使用本地模式的flink进行词频统计. 1 系统.软件以及前提约束 CentOS 7 64 工作站 作者的机子ip是192.168.100.200,请读者根据自己实际情况设置 idea 2018.1 在Win10中安装nc https://www.jianshu.com/p/4f6fb8834ad9 2 操作 1 在idea中创建一个m

Storm使用入门之本地开发环境搭建

本篇博文详细告诉你如何安装Storm的本地开发环境,总体分为两步,具体如下: 1.从官网上下载Storm的发布包,下载完成后将其解压,并将解压后的bin目录添加到环境变量(PATH)中,以方便后续执行Storm的相关命令 2.修改Storm的配置文件(storm.yaml),主要是按照实际情况更新配置文件中的集群信息,然后将修改后的配置文件添加到目录(~/.storm/)中,目的是为了后续能够远程启动和停止集群上的计算任务(即topology) 接下来,咱们来详细地介绍每一个操作步骤. 首先,何

Storm集群的DRPC模式

storm的DRPC模式的作用是实现从远程调用storm集群的计算资源,而不需要连接到集群的某一个节点.OK.那么storm实现DRPC主要是使用LinearDRPCTopologyBuilder这个类.下面就先来看看一个简单的例子,它的源码的github上. import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.stor

Linux下的Hadoop安装(本地模式)

系统为CentOS 6.9,Hadoop版本2.8.3,虚拟机VMware Workstation 主要介绍Linux虚拟机安装.环境配置和Hadoop本地模式的安装.伪分布式和Windows下的安装将另作补充. 网络上关于Hadoop安装的教程非常多,这里主要是归纳出安装过程的简要步骤和可能碰到的问题. Linux环境安装 NAT Linux 设置网络 HOST 其他环境设置 Java环境配置 Hadoop本地模式安装 Linux环境安装 NAT Linux 设置网络 HOST 其他环境设置

IntelliJ IDEA(Ultimate版本)的下载、安装和WordCount的初步使用(本地模式和集群模式)Ultimate

不多说,直接上干货! IntelliJ IDEA号称当前Java开发效率最高的IDE工具.IntelliJ IDEA有两个版本:社区版(Community)和旗舰版(Ultimate).社区版时免费的.开源的,但功能较少,旗舰版提供了较多的功能,是收费的,可以试用30天. 强烈推荐,新手刚入门,可以去用社区版,但是,立马还是用旗舰版,我可是走了弯路,当然,体会到其中的棘手还是很不错! IDEA Community(社区版)再谈之无奈之下还是去安装社区版 IntelliJ IDEA(Communi

Hive的三种安装方式(内嵌模式,本地模式远程模式)

一.安装模式介绍:     Hive官网上介绍了Hive的3种安装方式,分别对应不同的应用场景.     1.内嵌模式(元数据保村在内嵌的derby种,允许一个会话链接,尝试多个会话链接时会报错)     2.本地模式(本地安装mysql 替代derby存储元数据)     3.远程模式(远程安装mysql 替代derby存储元数据) 二.安装环境以及前提说明:     首先,Hive是依赖于hadoop系统的,因此在运行Hive之前需要保证已经搭建好hadoop集群环境.     本文中使用的