关于storm的Spout、Bolt结构图 .

关于storm的Spout、Bolt结构图

  绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的,在以后的文章会具体讲解。

  BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。


  通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout

  通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple)。


补充——RichBolt vs BasicBolt

直接用BasicBolt,会在execute()后自动ack/fail Tuple,而RichBolt则需要自行调用ack/fail。

那什么时候使用RichBolt? Bolt不是在每次execute()时立刻产生新消息,需要异步的发送新消息(比如聚合一段时间的数据再发送)时,又或者想异步的ack/fail原消息时就需要。

BasicBolt的prepare()里并没有collector参数,只在每次execute()时传入collector。而RichBolt刚好相反,你可以在初始化时就把collector保存起来,用它在任意时候发送消息。

另外,如果用RichBolt的collector,还要考虑在发送消息时是否带上传入的Tuple,如果不带,则下游的处理节点出错也不会回溯到Spout重发。用BasicBolt则已默认带上。

时间: 2024-10-28 22:26:22

关于storm的Spout、Bolt结构图 .的相关文章

Storm之spout,bolt编写

Storm,核心代码使用clojure书写,实用程序使用python开发,使用java开发拓扑. Nimbus节点接收到请求,对提交的拓扑进行分片,分成一个个的task,并将task和supervisor相关的信息提交到zookeeper集群上,supervisor会去zookeeper集群上领自己的task,通知自己的worker进程进行Task的处理. Spout的主要方法: open(Map conf,TopologyContext context,SpoutOutputCollector

Storm中Spout使用注意事项小结

Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待.这里罗列几点个人觉得编写Spout代码时需要特别注意的地方: 1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据:另外的一个或多个线程负责从数据源(如各种消息中间件.db等)读取数据并放入queue中. 2. 如果不关心数据是否丢

转:storm中一个Bolt发emit多次相同类型消息

在storm中的Bolt中可以处理完成逻辑后,向后面的Blot继续发送消息. 可以发送多个不同的消息,如: collector.emit("update-delivered-status",new Values(emailDeliverStatus)); collector.emit("save-request",new Values(udsn)); 也可以同一个类型的消息发送多个不同内容如; for (int i = 0; i < emailParamVo.

Storm 基础 -- spout与bolt设置多重grouping

Topology的代码如下: TopologyBuilder builder = new TopologyBuilder(); //WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理 //当文件读取完成后,会发送一个global消息 builder.setSpout("word-reader",new WordReaderSpout()); builder.setBolt("word-normalizer", new

storm学习笔记

Storm学习笔记 一.简介 本文使用的Storm版本为1.0.1 Storm是一个免费开源的分布式实时计算系统,它使得可靠地处理无限的数据流更加容易,可以实时的处理Hadoop的批量任务.Storm简单易用,且支持各种主流的程序语言. Storm有很多适用场景:实时分析.在线机器学习.连续计算.分布式RPC.分布式ETL.易扩展.支持容错,可确保你的数据得到处理,易于构建和操控. 下图是Storm"流式数据处理"的概念图,即数据像水流一样从数据源头源源不断的流出,经过每个节点,每个节

storm问题记录(1) python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理

一.问题背景 Python 写的脚本,不断从txt文件中读取一行数据封装成消息,作为producer发给kafka, storm的spout从kafka中读取这些消息后做一些处理发送给bolt,bolt最后将数据按既定的格式写入到HBASE 二.问题描述 一共14000条左右的数据,加调试信息观察到spout把消息都读到处理并发射了,但是bolt中只处理了一部分(2000多条,还有一万条显然没有处理到),写入HBASE的也只有2000多条,即Bolt读到的那些 出问题时的最后的log: OLT

storm spout的速度抑制问题

转发请注明原文地址:http://www.cnblogs.com/dongxiao-yang/p/6031398.html 最近协助同事优化一个并发消费kafka数据用来计算的任务,压测过程中发现有两个spout对应的topic消费速度明显低于其他topic的指标,每个spout分配10个并发消费速度到了1w左右完全就上不去了,通过监控埋点分析出spout以及下游的bolt代码块里面的业务代码执行耗时完全不高于其余可以正常消费的topic对应的spout组件. 最后只能摘出有问题的代码新做一个d

storm 中的Python bolt的注意事项

Storm可支持多种语言,其中就有python . 首先需要创建一个类, public static class BasieCalculateBolt extends ShellBolt implements IRichBolt { public BasieCalculateBolt() { super("python", "bolt_base_calculate.py"); } @Override public void declareOutputFields(O

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl