storm 核心API之普通Topology

普通Topology

如果建立自己的Topology(非Transactional的),用户通常需要利用如下接口和对象:

IRichBolt

IRichSpout

TopologyBuilder

public interface ISpout extends Serializable {

void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

void close();

void activate();

void deactivate();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId);

}

public interface IBolt extends Serializable {

void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

void execute(Tuple input);

void cleanup();

}

IRichBolt和IRichSpout与IBolt和ISpout的不同在于多了两个接口:

declareOutputFields(OutputFieldsDeclarer declarer):声明输出字段

getComponentConfiguration()
:该接口是在0.7.0引入的,用于支持组件级的配置,即允许用户针对单个Spout或Bolt进行参数配置。

实现了这两个接口后,通过调用TopologyBuilder建立起Topology。TopologyBuilder实际上是封装了StormTopology的thrift接口,也就是说Topology实际上是通过thrift定义的一个struct,TopologyBuilder将这个对象建立起来,然后nimbus实际上会运行一个thrift服务器,用于接收用户提交的结构。由于是采用thrift实现,所以用户可以用其他语言建立Topology,这样就提供了比较方便的多语言操作支持。

对于用户来说,通常需要做的就是提供自己的ISpout和IBlot实现,然后利用TopologyBuilder建立起自己需要的拓扑结构。

Storm框架会拿到用户提供这个拓扑结构及Spout和Blot对象,驱动整个处理过程。简单介绍下ISpout的那些接口的调用时机,在创建Spout对象时,会调用open函数。对象销毁时调用close(),但是框架并不保证close函数一定会被调用,因为进程可能是通过kill
-9被杀死的。activate和deactivate是在spout被activate或deactivate时被调用,这两个动作是由用户从外部触发的,Strom的命令行提供两个命令activate和deactivate,允许用户activate和deactivate一个Topology,当用户执行deactivate时,对应Topology的spout会被deactivate,产生影响就是spout的nextTuple此后将不会被调用,直到用户再调用activate。Spout的核心功能是通过nextTuple实现的,用户通过该函数完成Tuple的发射。该函数会被框架周期性的调用。会有类似如下的一个循环:

While(true)

{

if(…)

spout.activate();

if(…)

sput.deactivate();

if(…)

spout.nextTupe();

}

首先这三个函数都是在一个线程中,因此不需要同步。其次,nextTuple()不能阻塞,如果没有Tuple可以发射需要立即返回,用户不能提供一个阻塞式的实现,否则可能阻塞整个后台循环。另外,后台可能会调节nextTuple()的调用频率,比如系统有一个配置参数可以控制当前被pending的Tuple最大数目,如果达到这个限制,可能就会做一些流控。

ack和fail则是两个回调函数。Spout在发射出一个tuple后,该tuple会通过acking机制被acker追踪,除了显式的fail和ack外,每个tuple有一个超时时间,如果超过这个时间还未确定该tuple的状态,那么acker会通知spout,这个tuple处理失败了,然后框架得到这个消息后,就会调用spout的fail函数,如果acker发现这个tuple处理成功了,也会通知spout,然后会调用spout的ack函数。所以通常来说用户在发射tuple时,要确保数据不丢失,都会将已经发射的tuple缓存起来,然后在ack函数中删除对应tuple,在fail函数中重发对应的tuple。

另外需要注意的一点是,Spout使用的collector是SpoutOutputCollector,Bolt使用的collector是OutputCollector。这两个虽然提供的功能类似,都是负责发送tuple的,但是由于一个是面向Spout,一个是面向Bolt的,它们的接口也略有不同。具体如下:

public interface ISpoutOutputCollector {

List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

void emitDirect(int taskId, String streamId, List<Object> tuple, Object
messageId);

void reportError(Throwable error);

}

Spout通过调用ISpoutOutputCollector的emit函数进行tuple的发射,当然实际上emit函数并未完成实际的发送,它主要是根据用户提供的streamId,计算出该tuple需要发送到的目标taskID。emitDirect函数,更裸一些,直接指定目标taskID。它们都只是将<tasked,tuple>组成的序列对放到一个队列中,然后会有另一个线程负责将tuple从队列中取出并发送到目标task。

public interface IOutputCollector extends IErrorReporter {

List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object>
tuple);

void ack(Tuple input);

void fail(Tuple input);

}

IOutputCollector是会被Bolt调用的,与ISpoutOutputCollector功能类似。但是区别也很明显,首先我们可以看到它的emit系列函数,多了一个参数Collection<Tuple>
anchors,增加这样一个anchors原因在于,对于spout来说,它产生的tuple就是root tuple,但是对于bolt来说,它是通过一个或多个输入tuple,进而产生输出tuple的,这样tuple之间是有一个父子关系的,anchors就是用于指定当前要emit的这个tuple的所有父亲,正是通过它,才建立起tuple树,如果用户给了一个空的anchors,那么这个要emit的tuple将不会被加入tuple树,也就不会被追踪,即使后面它丢失了,也不会被spout感知。

除了anchors参数外,IOutputCollector还多了ack和fail两个接口。这两个接口,与Spout的ack和fail完全不同,对于Spout来说ack和fail是提供给Spout在tuple发送成功或失败时进行处理的一个机会。而IOutputCollector的ack和fail则是向acker汇报当前tuple的处理状态的,是需要Bolt在处理完tuple后主动调用的。

更多分享请关注:bbs.superwu.cn 关注超人学院官方微信:BJ-CRXY

时间: 2024-10-09 10:42:00

storm 核心API之普通Topology的相关文章

Storm笔记整理(四):Storm核心概念与验证——并行度与流式分组

[TOC] Storm核心概念之并行度 Work 1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务).1个worker进程会启动1个或多个executor线程来执行1个topology的(spout或bolt).因此,1个运行中的topology就是由集群中多台(可能是一台)物理机上的一个或者多个worker进程组成的. Executor executor是worker进程启动的一个单独线程. 每个executor只会运行1个topo

Storm 系列(二)—— Storm 核心概念详解

一.Storm核心概念 1.1 Topologies(拓扑) 一个完整的 Storm 流处理程序被称为 Storm topology(拓扑).它是一个是由 Spouts 和 Bolts 通过 Stream 连接起来的有向无环图,Storm 会保持每个提交到集群的 topology 持续地运行,从而处理源源不断的数据流,直到你将主动其杀死 (kill) 为止. 1.2 Streams(流) Stream 是 Storm 中的核心概念.一个 Stream 是一个无界的.以分布式方式并行创建和处理的

Unit02: JDBC核心API

Unit02: JDBC核心API db.properties 注意:如果使用连接池,可以在这个文件中增加对连接池的相关设置: 连接池参数,常用参数有: 初始连接数 最大连接数 最小连接数 每次增加的连接数 超时时间 最大空闲连接 最小空闲连接 # db connection parameters # key=value driver=oracle.jdbc.driver.OracleDriver url=jdbc:oracle:thin:@192.168.201.227:1521:orcl u

配置文件详解和核心api讲解

一.配置文件详解 1.映射文件详解 1.映射配置文件的位置和名称没有限制. -建议:位置:和实体类放在统一目录下.  名称:实体类名称.hbm.xml.    2.在映射配置文件中,标签内的name属性的值要和实体内的属性对应. (1)class标签内的name的值为实体类的全路径. (2)property标签内的name的值为实体类的属性. (3)id标签内的name的值为实体类的属性. (4)id和property标签内的column属性可以不写. (5)id和property标签内有一个t

Java核心API需要掌握的程度

分类: java技术2009-08-29 01:03 213人阅读 评论(0) 收藏 举报 javaapiswingxmlio Java的核心API是非常庞大的,这给开发者来说带来了很大的方便,经常人有评论,java让程序员变傻. 但是一些内容我认为是必须掌握的,否则不可以熟练运用java,也不会使用就很难办了. 1.java.lang包下的80%以上的类的功能的灵活运用. 2.java.util包下的80%以上的类的灵活运用,特别是集合类体系.正规表达式.时间.属性.和Timer. 3.jav

笔记-Node.js中的核心API之HTTP

最近正在学习Node,在图书馆借了基本关于Node的书,同时在网上查阅资料,颇有收获,但是整体感觉对Node的理解还是停留在一个很模棱两可的状态.比如Node中的模块,平时练习就接触到那么几个,其他的一些模块暂时只会在学习的时候接触到,不常用便就荒废了.正所谓好记心不如烂笔头,多做笔记还是更有利于理解和记忆.自己做的总结也方便回头复习,所以决定踏上漫长的修炼之旅-- Node提供了许多API,其中一些比较重要.这些核心的API是所有Node应用的支柱,你会不停的用到他们. HTTP服务器 Nod

笔记-Nodejs中的核心API之Events

最近正在学习Node,在图书馆借了基本关于Node的书,同时在网上查阅资料,颇有收获,但是整体感觉对Node的理解还是停留在一个很模棱两可的状态.比如Node中的模块,平时练习就接触到那么几个,其他的一些模块暂时只会在学习的时候接触到,不常用便就荒废了.正所谓好记心不如烂笔头,多做笔记还是更有利于理解和记忆.自己做的总结也方便回头复习,所以决定踏上漫长的修炼之旅-- Node提供了许多API,其中一些比较重要.这些核心的API是所有Node应用的支柱,你会不停的用到他们. Events 几乎所有

JDBC核心API

JDBC核心API在java.sql.*和javax.sql.* 1.Driver接口:表示Java驱动程序接口,具体的数据库厂商要实现其此接口 connect(url.propertis):连接数据库的方法 url:连接数据库的URL url语法格式:jdbc协议:数据库子协议://主机:端口/数据库 user:数据库用户名 password:数据库用户密码 2.DriverManager类:驱动管理器的类,管理所有的注册驱动程序 registerDriver(driver):注册驱动 Con

Hibernate核心API

五.核心API Configuration A) AnnotationConfiguration B) 进行配置信息的管理 C) 用来产生SessionFactory D) 可以在configure方法中指定hibernate配置文件 E) 只需要关注一个方法:buildSessionFactory() 1.configure()方法有一个重载的方法configure(String str),用于指定配置文件的路径. 2.SessionFactory可以用于产生session,如调用其getCu