spout和bolt

Storm,核心代码使用clojure书写,实用程序使用python开发,使用java开发拓扑。

  • Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它)。
  • Storm集群有两种节点:控制(master)节点和工作者(worker)节点。
  • 控制节点运行一个称之为”Nimbus”的后台程序,它类似于Haddop的”JobTracker”。Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测。
  • 每个工作者节点运行一个称之”Supervisor”的后台程序。Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成
  • 一个Zookeeper集群负责Nimbus和多个Supervisor之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑并由多个supervisor完成)。

    此外,Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的;所有状态维持在Zookeeper或本地磁盘。这意味着你可以kill -9杀掉nimbus进程和supervisor进程,然后重启,它们将恢复状态并继续工作,就像什么也没发生。这种设计使storm极其稳定。这种设计中Master并没有直接和worker通信,而是借助一个中介Zookeeper,这样一来可以分离master和worker的依赖,将状态信息存放在zookeeper集群内以快速回复任何失败的一方。

nimbus为主节点,一般只有一个,supervisor为从节点,可以有多个;

Nimbus节点接收到请求,对提交的拓扑进行分片,分成一个个的task,并将task和supervisor相关的信息提交到zookeeper集群上,supervisor会去zookeeper集群上领自己的task,通知自己的worker进程进行Task的处理。

Spout的主要方法:

open(Map conf,TopologyContext context,SpoutOutputCollector collector)  close() nextTuple() ack(Object msgId) fail(Object msgId)

open() :初始化方法

close():spout将要关闭时调用,但是不保证其一定被调用,因为在集群中supervisor节点,可以使用kill -9来杀死worker进程,只用storm是在本地模式下运行,如果是发送停止命令,是可以保证close执行的。

declareOutputFields方法:

声明要输出的tuple的字段名称。

void ack(Object msgid)

成功处理tuple时回调的方法,通常情况下,此方法的实现是将消息队列中的消息移除,防止重发。

void fail(Object msgid)

处理tuple失败时的回调方法,通常情况下,此方法的实现是将消息放回消息队列中然后在稍后时间里重发。

nextTuple()

Storm框架会一直调用此方法,输出无级到outputcollector.这种方法应该是非阻塞的。nextTuple,ack and fail都在spout任务的同一个线程中被循环调用。

public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index >=sentences.length) {
index=0;
}
Utils.sleep(1);
}

通常情况下,实现一个spout,可以直接实现IRichSpout,或者直接继承BaseRichSpout,可以少写些许代码。

Bolt

prepare()此方法与spout中的open()或mapper/reducer中的setup方法类似,在Task初始化时调用,它提供了bolt的执行环境。

void cleanup()在关闭前调用,同样不保证其一定执行。

execute()方法 接收一个tuple并进行处理,并用prepare方法传入的outputcollector的ack方法或fail来反馈处理结果。

实现bolt,可以实现irichbolt接口或继承baseRichbolt,如果自己不想处理结果反馈,可以实现IbaseBolt接口或继承BaseBasicBolt,它实际上自动实现了collector.emit.ack(inputtuple).

时间: 2024-10-21 21:25:56

spout和bolt的相关文章

关于storm的Spout、Bolt结构图 .

关于storm的Spout.Bolt结构图 绿色部分是我们最常用.比较简单的部分.红色部分是与事务相关的,在以后的文章会具体讲解. BaseComponent 是Storm提供的“偷懒”的类.为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法.这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法.但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null. 通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRic

Storm 基础 -- spout与bolt设置多重grouping

Topology的代码如下: TopologyBuilder builder = new TopologyBuilder(); //WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理 //当文件读取完成后,会发送一个global消息 builder.setSpout("word-reader",new WordReaderSpout()); builder.setBolt("word-normalizer", new

Spout的实现步骤

Spout的实现步骤: ·        对文件的改变进行分开的监听,并监视目录下有无新日志文件添加. ·        在数据得到了字段的说明后,将其转换成tuple. ·        声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径. Spout的具体编码在Listing Three中显示. Listing Three:Spout中open.nextTuple和delcareOutputFields方法的逻辑. 1.  public void open( Map c

Part of defining a topology is specifying for each bolt which streams it should receive as input

http://storm.apache.org/ [doing for realtime processing what Hadoop did for batch processing ] Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing fo

JStorm Storm 上手demo

折线之间的内容整理自:http://blog.csdn.net/suifeng3051/article/details/38369689 -------------------------------------------------------------------------------------------------------------------------------------------- 在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是S

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl

Storm概念、原理详解及其应用(一)BaseStorm

本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出.写这篇文章,是想把一些官文和资料中基础.重点拿出来,能总结出便于大家理解的话语.与大多数"wordcount"代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:"知其然,并知其所以然". Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是基于数据流的实时处理系统,提供了大吞吐量的实

Storm学习中遇到的问题整理

在编写storm代码来进行实时分析的时候遇到了一些问题,有些的确令人比较头痛,现在稍微做一下整理.数据流向(本地-Spout-Bolt-Hdfs) 1数据的输入输出文件的路径选择 因为在此项目中数据是放在本地磁盘的,所有就有两种放数据的途径,一种是直接通过spout接受本地数据发送到任务中,另一种是先把文件数目进行整理(缩小)放到hdfs系统,让后通过spout读取hdfs里面的数据,后来实际表明前一种更加适合(后一种受到了mapreduce的影响),现在提一下后一种遇到的问题,后一种的话是sp

Storm基本概念以及Topology的并发度

Spouts,流的源头 Spout是Storm里面特有的名词,Stream的源头,通常是从外部数据源读取tuples,并emit到topology Spout可以同时emit多个tupic stream,通过OutputFieldsDeclarer中的declareStream,method来定义 Spout需要实现RichSpout端口,最重要的方法是nextTuple,storm会不断调用接口从spout中取数据,同时需要注意的是Spout分为reliable or unreliable两种