Storm之spout,bolt编写

Storm,核心代码使用clojure书写,实用程序使用python开发,使用java开发拓扑。

Nimbus节点接收到请求,对提交的拓扑进行分片,分成一个个的task,并将task和supervisor相关的信息提交到zookeeper集群上,supervisor会去zookeeper集群上领自己的task,通知自己的worker进程进行Task的处理。

Spout的主要方法:

open(Map conf,TopologyContext context,SpoutOutputCollector collector)  close() nextTuple() ack(Object msgId) fail(Object msgId)

open() :初始化方法

close():spout将要关闭时调用,但是不保证其一定被调用,因为在集群中supervisor节点,可以使用kill -9来杀死worker进程,只用storm是在本地模式下运行,如果是发送停止命令,是可以保证close执行的。

declareOutputFields方法:

声明要输出的tuple的字段名称。

void ack(Object msgid)

成功处理tuple时回调的方法,通常情况下,此方法的实现是将消息队列中的消息移除,防止重发。

void fail(Object msgid)

处理tuple失败时的回调方法,通常情况下,此方法的实现是将消息放回消息队列中然后在稍后时间里重发。

nextTuple()

Storm框架会一直调用此方法,输出无级到outputcollector.这种方法应该是非阻塞的。nextTuple,ack and fail都在spout任务的同一个线程中被循环调用。

public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index >=sentences.length) {
index=0;
}
Utils.sleep(1);
}

通常情况下,实现一个spout,可以直接实现IRichSpout,或者直接继承BaseRichSpout,可以少写些许代码。

Bolt

prepare()此方法与spout中的open()或mapper/reducer中的setup方法类似,在Task初始化时调用,它提供了bolt的执行环境。

void cleanup()在关闭前调用,同样不保证其一定执行。

execute()方法 接收一个tuple并进行处理,并用prepare方法传入的outputcollector的ack方法或fail来反馈处理结果。

实现bolt,可以实现irichbolt接口或继承baseRichbolt,如果自己不想处理结果反馈,可以实现IbaseBolt接口或继承BaseBasicBolt,它实际上自动实现了collector.emit.ack(inputtuple).

时间: 2024-08-30 13:57:08

Storm之spout,bolt编写的相关文章

关于storm的Spout、Bolt结构图 .

关于storm的Spout.Bolt结构图 绿色部分是我们最常用.比较简单的部分.红色部分是与事务相关的,在以后的文章会具体讲解. BaseComponent 是Storm提供的“偷懒”的类.为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法.这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法.但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null. 通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRic

Storm中Spout使用注意事项小结

Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待.这里罗列几点个人觉得编写Spout代码时需要特别注意的地方: 1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据:另外的一个或多个线程负责从数据源(如各种消息中间件.db等)读取数据并放入queue中. 2. 如果不关心数据是否丢

转:storm中一个Bolt发emit多次相同类型消息

在storm中的Bolt中可以处理完成逻辑后,向后面的Blot继续发送消息. 可以发送多个不同的消息,如: collector.emit("update-delivered-status",new Values(emailDeliverStatus)); collector.emit("save-request",new Values(udsn)); 也可以同一个类型的消息发送多个不同内容如; for (int i = 0; i < emailParamVo.

Storm 基础 -- spout与bolt设置多重grouping

Topology的代码如下: TopologyBuilder builder = new TopologyBuilder(); //WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理 //当文件读取完成后,会发送一个global消息 builder.setSpout("word-reader",new WordReaderSpout()); builder.setBolt("word-normalizer", new

storm学习笔记

Storm学习笔记 一.简介 本文使用的Storm版本为1.0.1 Storm是一个免费开源的分布式实时计算系统,它使得可靠地处理无限的数据流更加容易,可以实时的处理Hadoop的批量任务.Storm简单易用,且支持各种主流的程序语言. Storm有很多适用场景:实时分析.在线机器学习.连续计算.分布式RPC.分布式ETL.易扩展.支持容错,可确保你的数据得到处理,易于构建和操控. 下图是Storm"流式数据处理"的概念图,即数据像水流一样从数据源头源源不断的流出,经过每个节点,每个节

storm问题记录(1) python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理

一.问题背景 Python 写的脚本,不断从txt文件中读取一行数据封装成消息,作为producer发给kafka, storm的spout从kafka中读取这些消息后做一些处理发送给bolt,bolt最后将数据按既定的格式写入到HBASE 二.问题描述 一共14000条左右的数据,加调试信息观察到spout把消息都读到处理并发射了,但是bolt中只处理了一部分(2000多条,还有一万条显然没有处理到),写入HBASE的也只有2000多条,即Bolt读到的那些 出问题时的最后的log: OLT

storm spout的速度抑制问题

转发请注明原文地址:http://www.cnblogs.com/dongxiao-yang/p/6031398.html 最近协助同事优化一个并发消费kafka数据用来计算的任务,压测过程中发现有两个spout对应的topic消费速度明显低于其他topic的指标,每个spout分配10个并发消费速度到了1w左右完全就上不去了,通过监控埋点分析出spout以及下游的bolt代码块里面的业务代码执行耗时完全不高于其余可以正常消费的topic对应的spout组件. 最后只能摘出有问题的代码新做一个d

storm 中的Python bolt的注意事项

Storm可支持多种语言,其中就有python . 首先需要创建一个类, public static class BasieCalculateBolt extends ShellBolt implements IRichBolt { public BasieCalculateBolt() { super("python", "bolt_base_calculate.py"); } @Override public void declareOutputFields(O

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl