Storm 系列(五)—— Storm 编程模型详解

一、简介

下图为 Strom 的运行流程图,在开发 Storm 流处理程序时,我们需要采用内置或自定义实现 spout(数据源) 和 bolt(处理单元),并通过 TopologyBuilder 将它们之间进行关联,形成 Topology

二、IComponent接口

IComponent 接口定义了 Topology 中所有组件 (spout/bolt) 的公共方法,自定义的 spout 或 bolt 必须直接或间接实现这个接口。

public interface IComponent extends Serializable {

    /**
     * 声明此拓扑的所有流的输出模式。
     * @param declarer 这用于声明输出流 id,输出字段以及每个输出流是否是直接流(direct stream)
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * 声明此组件的配置。
     *
     */
    Map<String, Object> getComponentConfiguration();

}

三、Spout

3.1 ISpout接口

自定义的 spout 需要实现 ISpout 接口,它定义了 spout 的所有可用方法:

public interface ISpout extends Serializable {
    /**
     * 组件初始化时候被调用
     *
     * @param conf ISpout 的配置
     * @param context 应用上下文,可以通过其获取任务 ID 和组件 ID,输入和输出信息等。
     * @param collector  用来发送 spout 中的 tuples,它是线程安全的,建议保存为此 spout 对象的实例变量
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * ISpout 将要被关闭的时候调用。但是其不一定会被执行,如果在集群环境中通过 kill -9 杀死进程时其就无法被执行。
     */
    void close();

    /**
     * 当 ISpout 从停用状态激活时被调用
     */
    void activate();

    /**
     * 当 ISpout 停用时候被调用
     */
    void deactivate();

    /**
     * 这是一个核心方法,主要通过在此方法中调用 collector 将 tuples 发送给下一个接收器,这个方法必须是非阻塞的。
     * nextTuple/ack/fail/是在同一个线程中执行的,所以不用考虑线程安全方面。当没有 tuples 发出时应该让
     * nextTuple 休眠 (sleep) 一下,以免浪费 CPU。
     */
    void nextTuple();

    /**
     * 通过 msgId 进行 tuples 处理成功的确认,被确认后的 tuples 不会再次被发送
     */
    void ack(Object msgId);

    /**
     * 通过 msgId 进行 tuples 处理失败的确认,被确认后的 tuples 会再次被发送进行处理
     */
    void fail(Object msgId);
}

3.2 BaseRichSpout抽象类

通常情况下,我们实现自定义的 Spout 时不会直接去实现 ISpout 接口,而是继承 BaseRichSpoutBaseRichSpout 继承自 BaseCompont,同时实现了 IRichSpout 接口。

IRichSpout 接口继承自 ISpoutIComponent,自身并没有定义任何方法:

public interface IRichSpout extends ISpout, IComponent {

}

BaseComponent 抽象类空实现了 IComponentgetComponentConfiguration 方法:

public abstract class BaseComponent implements IComponent {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

BaseRichSpout 继承自 BaseCompont 类并实现了 IRichSpout 接口,并且空实现了其中部分方法:

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}
}

通过这样的设计,我们在继承 BaseRichSpout 实现自定义 spout 时,就只有三个方法必须实现:

  • open : 来源于 ISpout,可以通过此方法获取用来发送 tuples 的 SpoutOutputCollector
  • nextTuple :来源于 ISpout,必须在此方法内部发送 tuples;
  • declareOutputFields :来源于 IComponent,声明发送的 tuples 的名称,这样下一个组件才能知道如何接受。

四、Bolt

bolt 接口的设计与 spout 的类似:

4.1 IBolt 接口

 /**
  * 在客户端计算机上创建的 IBolt 对象。会被被序列化到 topology 中(使用 Java 序列化),并提交给集群的主机(Nimbus)。
  * Nimbus 启动 workers 反序列化对象,调用 prepare,然后开始处理 tuples。
 */

public interface IBolt extends Serializable {
    /**
     * 组件初始化时候被调用
     *
     * @param conf storm 中定义的此 bolt 的配置
     * @param context 应用上下文,可以通过其获取任务 ID 和组件 ID,输入和输出信息等。
     * @param collector  用来发送 spout 中的 tuples,它是线程安全的,建议保存为此 spout 对象的实例变量
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * 处理单个 tuple 输入。
     *
     * @param Tuple 对象包含关于它的元数据(如来自哪个组件/流/任务)
     */
    void execute(Tuple input);

    /**
     * IBolt 将要被关闭的时候调用。但是其不一定会被执行,如果在集群环境中通过 kill -9 杀死进程时其就无法被执行。
     */
    void cleanup();

4.2 BaseRichBolt抽象类

同样的,在实现自定义 bolt 时,通常是继承 BaseRichBolt 抽象类来实现。BaseRichBolt 继承自 BaseComponent 抽象类并实现了 IRichBolt 接口。

IRichBolt 接口继承自 IBoltIComponent,自身并没有定义任何方法:

public interface IRichBolt extends IBolt, IComponent {

}

通过这样的设计,在继承 BaseRichBolt 实现自定义 bolt 时,就只需要实现三个必须的方法:

  • prepare: 来源于 IBolt,可以通过此方法获取用来发送 tuples 的 OutputCollector
  • execute:来源于 IBolt,处理 tuples 和发送处理完成的 tuples;
  • declareOutputFields :来源于 IComponent,声明发送的 tuples 的名称,这样下一个组件才能知道如何接收。

五、词频统计案例

5.1 案例简介

这里我们使用自定义的 DataSourceSpout 产生词频数据,然后使用自定义的 SplitBoltCountBolt 来进行词频统计。

案例源码下载地址:storm-word-count

5.2 代码实现

1. 项目依赖

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
</dependency>

2. DataSourceSpout

public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模拟产生数据
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * 模拟数据
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

上面类使用 productData 方法来产生模拟数据,产生数据的格式如下:

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm

3. SplitBolt

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

4. CountBolt

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // 输出
        System.out.print("当前实时统计结果:");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

5. LocalWordCountApp

通过 TopologyBuilder 将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。通常在开发中,可先用本地模式进行测试,测试完成后再提交到服务器集群运行。

public class LocalWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("DataSourceSpout", new DataSourceSpout());

        // 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

        //  指明将 SplitBolt 的数据发送到 CountBolt 中 处理
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // 创建本地集群用于测试 这种模式不需要本机安装 storm,直接运行该 Main 方法即可
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountApp",
                new Config(), builder.createTopology());
    }

}

6. 运行结果

启动 WordCountApp 的 main 方法即可运行,采用本地模式 Storm 会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。

六、提交到服务器集群运行

6.1 代码更改

提交到服务器的代码和本地代码略有不同,提交到服务器集群时需要使用 StormSubmitter 进行提交。主要代码如下:

为了结构清晰,这里新建 ClusterWordCountApp 类来演示集群模式的提交。实际开发中可以将两种模式的代码写在同一个类中,通过外部传参来决定启动何种模式。

public class ClusterWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("DataSourceSpout", new DataSourceSpout());

        // 指明将 DataSourceSpout 的数据发送到 SplitBolt 中处理
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");

        //  指明将 SplitBolt 的数据发送到 CountBolt 中 处理
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // 使用 StormSubmitter 提交 Topology 到服务器集群
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp",  new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }

}

6.2 打包上传

打包后上传到服务器任意位置,这里我打包后的名称为 storm-word-count-1.0.jar

# mvn clean package -Dmaven.test.skip=true

6.3 提交Topology

使用以下命令提交 Topology 到集群:

# 命令格式: storm jar jar包位置 主类的全路径 ...可选传参
storm jar /usr/appjar/storm-word-count-1.0.jar  com.heibaiying.wordcount.ClusterWordCountApp

出现 successfully 则代表提交成功:

6.4 查看Topology与停止Topology(命令行方式)

# 查看所有Topology
storm list

# 停止  storm kill topology-name [-w wait-time-secs]
storm kill ClusterWordCountApp -w 3

6.5 查看Topology与停止Topology(界面方式)

使用 UI 界面同样也可进行停止操作,进入 WEB UI 界面(8080 端口),在 Topology Summary 中点击对应 Topology 即可进入详情页面进行操作。

七、关于项目打包的扩展说明

mvn package的局限性

在上面的步骤中,我们没有在 POM 中配置任何插件,就直接使用 mvn package 进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方 JAR 包,就会出现问题,因为 package 打包后的 JAR 中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。

这时候可能大家会有疑惑,在我们的项目中不是使用了 storm-core 这个依赖吗?其实上面之所以我们能运行成功,是因为在 Storm 的集群环境中提供了这个 JAR 包,在安装目录的 lib 目录下:

为了说明这个问题我在 Maven 中引入了一个第三方的 JAR 包,并修改产生数据的方法:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>

StringUtils.join() 这个方法在 commons.lang3storm-core 中都有,原来的代码无需任何更改,只需要在 import 时指明使用 commons.lang3

import org.apache.commons.lang3.StringUtils;

private String productData() {
    Collections.shuffle(list);
    Random random = new Random();
    int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
    return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}

此时直接使用 mvn clean package 打包运行,就会抛出下图的异常。因此这种直接打包的方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的 JAR 包。

想把依赖包一并打入最后的 JAR 中,maven 提供了两个插件来实现,分别是 maven-assembly-pluginmaven-shade-plugin。鉴于本篇文章篇幅已经比较长,且关于 Storm 打包还有很多需要说明的地方,所以关于 Storm 的打包方式单独整理至下一篇文章:

Storm 三种打包方式对比分析

参考资料

  1. Running Topologies on a Production Cluster
  2. Pre-defined Descriptor Files

更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南

原文地址:https://www.cnblogs.com/heibaiying/p/11453444.html

时间: 2024-11-05 07:09:54

Storm 系列(五)—— Storm 编程模型详解的相关文章

CUDA C 编程指导(二):CUDA编程模型详解

CUDA编程模型详解 本文以vectorAdd为例,通过描述C在CUDA中的使用(vectorAdd这个例子可以在CUDA sample中找到.)来介绍CUDA编程模型的主要概念.CUDA C的进一步描述可以参考<Programming Interface>. 主要内容包括: 1.Kernels(核函数) 2.Thread Hierarchy(线程结构) 3.Memory Hierarchy(存储结构) 4.Heterogeneous Programming(异构编程) 5.Compute C

搜索引擎系列五:Lucene索引详解(IndexWriter详解、Document详解、索引更新)

一.IndexWriter详解 问题1:索引创建过程完成什么事? 分词.存储到反向索引中 1. 回顾Lucene架构图: 介绍我们编写的应用程序要完成数据的收集,再将数据以document的形式用lucene的索引API创建索引.存储. 这里重点要强调应用代码负责做什么,lucene负责做什么. 2. Lucene索引创建API 图示 通过该图介绍lucene创建索引的核心API:Document.IndexWriter Lucene中要索引的文档.数据记录以document表示,应用程序通过I

JVM系列(五) - JVM类加载机制详解

前言 本文将由浅及深,介绍Java类加载的过程和原理,进一步对类加载器的进行源码分析,完成一个自定义的类加载器. 正文 (一). 类加载器是什么 类加载器简言之,就是用于把.class文件中的字节码信息转化为具体的java.lang.Class对象的过程的工具. 具体过程: 在实际类加载过程中,JVM会将所有的.class字节码文件中的二进制数据读入内存中,导入运行时数据区的方法区中. 当一个类首次被主动加载或被动加载时,类加载器会对此类执行类加载的流程 – 加载.连接(验证.准备.解析).初始

Socket模型详解(转)

Socket模型详解(转) Socket模型详解 两种I/O模式 一.选择模型 二.异步选择 三.事件选择 四.重叠I/O模型 五.完成端口模型 五种I/O模型的比较 两种I/O模式 1. 两种I/O模式 阻塞模式:执行I/O操作完成前会一直进行等待,不会将控制权交给程序.套接字默认为阻塞模式.可以通过多线程技术进行处理. 非阻塞模式:执行I/O操作时,Winsock函数会返回并交出控制权.这种模式使用起来比较复杂,因为函数在没有运行完成就进行返回,会不断地返回 WSAEWOULDBLOCK错误

vSocket模型详解及select应用详解

1.I/O复用:selec函数 在介绍socket编程之前,首先要熟悉下I/O多路转接技术,尽管SOCKET通信编程有很多模型,但是,在UNIX环境下,使用I/O多路转接模型无疑是一种更好的选择,UNIX下有5种I/0模型,分别是阻塞式I/O.非阻塞式I/O.I/O复用(select和poll).信号驱动式I/O,异步I/O.这5种方式都可用SOCKET编程,这里只介绍阻塞式I/O和I/O复用,如果向详细了解I/O模型的,可以参考<UNIX网络编程卷一:套接字联网API>,或着查看 Socke

日志模型详解

日志模型详解 NET Core的日志模型主要由三个核心对象构成,它们分别是Logger.LoggerProvider和LoggerFactory.总的来说,LoggerProvider提供一个具体的Logger对象将格式化的日志消息写入相应的目的地,但是我们在编程过程中使用的Logger对象则由LoggerFactory创建,这个Logger利用注册到LoggerFactory的LoggerProvider来提供真正具有日志写入功能的Logger,并委托后者来记录日志. 目录一.Logger  

支持向量机(SVM)(五)-- SMO算法详解

一.我们先回顾下SVM问题. A.线性可分问题 1.SVM基本原理: SVM使用一种非线性映射,把原训练            数据映射到较高的维.在新的维上,搜索最佳分离超平面,两个类的数据总可以被超平面分开. 2.问题的提出: 3.如何选取最优的划分直线f(x)呢? 4.求解:凸二次规划 建立拉格朗日函数: 求偏导数: B.线性不可分问题 1.核函数 如下图:横轴上端点a和b之间红色部分里的所有点定为正类,两边的黑色部分里的点定为负类. 设: g(x)转化为f(y)=<a,y> g(x)=

PHP Socket 编程过程详解

PHP Socket 编程过程详解 Socket用于进程间通信.进程间通信通常基于客户端—服务端模型.此时,客户端—服务端是可以彼此交互的应用程序.客户端和服务端之间的交互需要连接.Socket编程负责的就是为应用程序之间建立可进行交互的连接. 在本文中,我们将学习如何用PHP创建一个简单的客户端—服务端.我们还将学习如何客户端应用程序如何发送消息到服务端,以及如何从服务端接受消息. 使用代码 目的:开发一个客户端用于发送string消息到服务端,服务端将相同的信息反转后返回给客户端. PHP服

I/O复用模型详解

一.httpd工作模型 prefork:进程模型,两级结构,主进程master负责生成子进程,每个子进程负责响应一个请求 worker:线程模型,三级结构,主进程master负责生成子进程,每个子进程负责生成多个线程,每个线程相应一个请求 event:线程模型,三级结构,主进程master负责生成子进程,每个子进程响应多个请求 二.I/O模型名词概念 同步/异步:关注的是消息通信机制 同步:synchronous,调用者等待被调用者返回信息,才能继续执行异步:asynchronous,被调用者通