Storm 基础 -- spout与bolt设置多重grouping

Topology的代码如下:

TopologyBuilder builder = new TopologyBuilder();
//WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理
//当文件读取完成后,会发送一个global消息
builder.setSpout("word-reader",new WordReaderSpout());
builder.setBolt("word-normalizer", new WordNormalizerBolt())
        .shuffleGrouping("word-reader")
        .globalGrouping("word-reader", "FINISH");

builder.setBolt("word-counter", new WordCounterBolt(),1)
     .fieldsGrouping("word-normalizer", new Fields("word"))
     .globalGrouping("word-normalizer", "CLOSE");

以globalGrouping为例:

globalGrouping(“word-reader”, “FINISH”); 两个参数的含义

第一个参数: “word-reader” 为componet id, 这个值 与我们代码的 中的word-reader一致。

builder.setSpout("word-reader",new WordReaderSpout());

第二个参数: “FINISH”为stream id

这个值是在WordReaderSpout中定义的

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
    declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
}

下面以我们熟知的HashMap为例,解释这两个参数

1) 我们需要定义一个word-reader 的HashMap对象

Map<String, Object> word-reader = new HashMap<String, Object>();

2) 我们调用put两个对象进去,这两个对象对应的Key值分别为default与FINISH

word-reader.put("default", new Object());
word-reader.put("FINISH", new Object());

3) 有其它的对象需要获取我们put进去的两个对象,首先就需要先获取word-reader的Handler。

WordNormalizerBolt  word-normalizer = new WordNormalizerBolt();
word-normalizer.setMap(word-reader);

对应到Storm的代码中,就如下:

1) 首先我们需要创建一个word-reader的对象, —这是bolt代码

builder.setSpout("word-reader",new WordReaderSpout());

2) 我们需要put两个对象,这里与Map可能有点儿差别,因为需要先声明。 — 这是spout的代码

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("line")); //declare()相当于declarer.declareStream(“defalut”, new Fields("line"));
        declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
    }

而put对象则是在nextTuple()中进行的

    public void nextTuple() {
                 ... ...
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                               //put一个key为"default"的对象
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
                      ... ...
        }
         //put一个key为"FINISH"的对象
        this.collector.emit("FINISH", new Values("Finish"));
    }

3) bolt获取Map对象 —- 在TopologyBuilder代码中实现

builder.setBolt("word-normalizer", new WordNormalizerBolt())
        .shuffleGrouping("word-reader")
        .globalGrouping("word-reader", "FINISH");

最后附上源码:http://download.csdn.net/detail/eyoulc123/9514466

时间: 2024-08-29 04:34:54

Storm 基础 -- spout与bolt设置多重grouping的相关文章

关于storm的Spout、Bolt结构图 .

关于storm的Spout.Bolt结构图 绿色部分是我们最常用.比较简单的部分.红色部分是与事务相关的,在以后的文章会具体讲解. BaseComponent 是Storm提供的“偷懒”的类.为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法.这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法.但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null. 通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRic

Storm基础

Storm基本概念 Storm是一个开源的实时计算系统,它提供了一系列的基本元素用于进行计算:Topology.Stream.Spout.Bolt等等. 在Storm中,一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似.但是有一点不同的是:在Hadoop中,MapReduce任务最终会执行完成后结束:而在Storm中,Topology任务一旦提交后永远不会结束,除非你显示去停止任务. 计算任务Topology是由不同的Spouts和Bolts,通

storm - 基础概念整理

理论 Hadoop的出现虽然为大数据计算提供了一条捷径,但其仍然存在自身难以克服的缺点:实时性不足.Hadoop的一轮计算的启动需要较长时间,因此其满足不了对实时性有较高要求的场景. Storm由此应运而生,提供了可扩展的,可靠的,易于使用,而且是编程语言无关的实时大数据处理框架. 使用 Components of a storm cluster Storm集群类似于Hadoop集群,storm运行与topo之上. Storm集群中存在两类节点:master节点和worker节点.master运

storm基础框架分析

背景 前期收到的问题: 1.在Topology中我们可以指定spout.bolt的并行度,在提交Topology时Storm如何将spout.bolt自动发布到每个服务器并且控制服务的CPU.磁盘等资源的? 2.Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息.如何保证消息不丢失以及如何实现重发消息机制? 上篇:storm是如何保证at least once语义的 回答了第2个问题. 本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架

Storm 基础知识

分布式的实时计算框架,storm对于实时计算的意义类似于hadoop对于批处理的意义. Storm的适用场景: 1.流数据处理:storm可以用来处理流式数据,处理之后将结果写到某个存入中去. 2.持续计算:连续发送数据到客户端,使它们能够实时更新并显示结果,如网站指标 3.分布式RPC:由于storm的处理组件是分布式的,而且处理延迟极低,所以可以作为一个通用的分布式rpc框架来使用. 我们的搜索引擎本身也是一个分布式rpc系统. storm关注的是数据的一次写入多次处理,storm的job运

Storm中Spout使用注意事项小结

Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待.这里罗列几点个人觉得编写Spout代码时需要特别注意的地方: 1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据:另外的一个或多个线程负责从数据源(如各种消息中间件.db等)读取数据并放入queue中. 2. 如果不关心数据是否丢

用addOnload()函数设置多重onload属性

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-

网络编程基础API、属性设置

网络中Socket数据传输是一种特别的I/O,Socket也是一种文件描述符.Socket也具备一个类似于打开文档的函数调用Socket(),该函数返回一个整型的Socket描述符,随后的连接建立.数据传输等操作都是通过该Socket实现的. 常用的Socket类型有两种:流式Socket (SOCK_STREAM)和数据报式Socket(SOCK_DGRAM).流式是一种面向连接的Socket,针对于面向连接的TCP服务应用:数据报式Socket是一种无连的Socket,对应于无连接的UDP服

Storm之spout,bolt编写

Storm,核心代码使用clojure书写,实用程序使用python开发,使用java开发拓扑. Nimbus节点接收到请求,对提交的拓扑进行分片,分成一个个的task,并将task和supervisor相关的信息提交到zookeeper集群上,supervisor会去zookeeper集群上领自己的task,通知自己的worker进程进行Task的处理. Spout的主要方法: open(Map conf,TopologyContext context,SpoutOutputCollector