storm Ack框架笔记

  Storm利用Acker Bolt节点跟踪消息,当Spout发送出去的消息以及这些消息所衍生出来的消息均被处理后,Spout将受到对应于该消息的Ack。实现要点:

  1、Storm中每条发送出去的消息都会对应一个随机的消息ID

  2、Spout发送消息后,将向Acker Bolt发送一条消息,该消息内容为<RootId,消息ID>,Acker bolt将为该消息创建一条跟踪项。

  3、Bolt产生要发送的消息时,会计算每条新消息的消息ID,并将消息ID发送至Acker Bolt,Acker Bolt对消息ID进行异或后存储。于是,Storm对新发送的消息进行了跟踪。

  4、Blot对输入的消息进行Ack时,也会将该消息ID发送到Acker Bolt,Acker Bolt对每条消息ID进行异或存储,由于该消息在被发送时,已经向Acker Bolt发送过消息ID,之后再被Acker时又再次发送该消息ID。根据异或的语义,这相当于对该消息的跟踪结束

  5、Acker Bolt在更新某一个消息的跟踪值时,若发现其跟踪值变为零,则向Spout节点发送消息,表明Spout发送的这条消息已经被成功处理。

  6、若Spout在发送消息时未指定用于消息跟踪的ID,系统则不对消息进行跟踪,Blot新产生的消息并不会被单独跟踪。

  7、Spout的每条消息以及由该消息演化而来的所有消息的跟踪负载为16个字节,8个字节的根消息ID以及8个字节的消息跟踪值AckValue.但是,由于Storm中采用HashMap对其进行存储,在32位的JVM中,每条消息至少需要20个字节的额外负载,故一条消息的跟踪需要40个字节左右的负载。

(一)1与2中,Spout发送T1到Bolt1,发送T2到Bolt2.T1和T2具有相同的内容,但表示不同的备份,每条消息都会对应一个ID,消息T1的anchors为<RootId,T1>,消息T2的anchors为<RootId,T2>

(二)3中,Spout在Acker Bolt中注册了一条记录RootId=T1^T2。

(三)4与6中,Bolt1发送新的消息T3、T4、T5到Blot3,同时对输入的消息进行Ack操作,消息内容为<RootId,T1^T3^T4^T5>,此时,Acker Bolt中的跟踪项为<RootId,T1^T2^T1^T3^T4^T5=>T2^T3^T4^T5>

(四)在5中,Bolt2对输入的消息T2进行Ack操作,它没有产生新的消息,发送到Acker Bolt的消息为<RootId,T2>,T2异或后消失。

(五)在7中,Blot3对输入的消息进行Ack操作,发送的消息为<RootId,T3^T4^T5>,此时Acker Bolt中的跟踪项为<RootId,T3^T4^T5^T3^T4^T5 =>0>

(六)Acker Bolt发现RootId对应的值为零,它认为该RootId对应的消息以及所有衍生出来的消息均已经被成功处理,于是它向Spout发送消息,而Spout将调用Ack回调方法。

那么每条被处理的消息必须进行Ack或者Fail操作,否则,虽然有超时机制可以对过期消息进行清空,但可能导致消息不断重传。(所以项目中每次进入bolt都有唯一性过滤?)

参考:《Storm 源码分析》

时间: 2024-10-26 17:53:30

storm Ack框架笔记的相关文章

Storm ack和fail机制再论

之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下   首先开启storm tracker机制的前提是, 1. 在spout emit tuple的时候,要加上第3个参数messageid 2. 在配置中acker数目至少为1 3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路   流程, 1. 当tuple具有messageid时,spout会把该tuple加到pending list里面    并发消息给acker,通知acker开

深度学习Keras框架笔记之AutoEncoder类

深度学习Keras框架笔记之AutoEncoder类使用笔记 keras.layers.core.AutoEncoder(encoder, decoder,output_reconstruction=True, weights=None) 这是一个用于构建很常见的自动编码模型.如果参数output_reconstruction=True,那么dim(input)=dim(output):否则dim(output)=dim(hidden). inputshape: 取决于encoder的定义 ou

深度学习Keras框架笔记之TimeDistributedDense类

深度学习Keras框架笔记之TimeDistributedDense类使用方法笔记 例: keras.layers.core.TimeDistributedDense(output_dim,init='glorot_uniform', activation='linear', weights=None W_regularizer=None, b_regularizer=None, activity_regularizer=None, W_constraint=None, b_constraint

Flume-ng+Kafka+storm的学习笔记

Flume-ng Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume的文档可以看http://flume.apache.org/FlumeUserGuide.html 官方的英文文档 介绍的比较全面. 不过这里写写自己的见解 这个是flume的架构图 从上图可以看到几个名词: Agent: 一个Agent包含Source.Channel.Sink和其他的组件.Flume就是一个或多个Agent构成的. Source:数据源.简单的说就是agent获取数据的入口

storm基础框架分析

背景 前期收到的问题: 1.在Topology中我们可以指定spout.bolt的并行度,在提交Topology时Storm如何将spout.bolt自动发布到每个服务器并且控制服务的CPU.磁盘等资源的? 2.Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息.如何保证消息不丢失以及如何实现重发消息机制? 上篇:storm是如何保证at least once语义的 回答了第2个问题. 本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架

storm入门(一):storm编程框架与举例

基础 http://os.51cto.com/art/201308/408739.htm 模型 http://www.cnblogs.com/linjiqin/archive/2013/05/28/3104016.html 一.Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies Streams Spouts Bolts Stream groupings Reliability Tasks Workers Configuration Storm集群和Hado

【Java】Java集合框架笔记

今天在QQ群上跟人家交流,错误得把List接口说成了ArrayList的父类,当时脸都红了,不行,一定要跪着回去复习Java,所以把Java的书拿出来,把集合框架的认真仔细的看了一遍,在EverNote上写了一篇笔记,所以搬到博客上来跟大家分享一下.以下就是了. Java集合框架支持以下两种类型的容器: 一种是为了存储一个元素的集合,简称为集合(collection) 另一种是为了存储键值对,成为图(map) 另外:Java集合框架内的所有实例类都实现了Clonable和Serializable

Python django框架笔记(三):django工作方式简单说明和创建用户界面

(一)  说明 简单说明下django的工作方式,并举2个例子. (二)  Django工作方式 假定我们有下面这些文件 ,这里在前2篇的基础上增加了 templates目录(存放html文件) 和static目录(存放图片JS.css等) (三)   实现一个完整的用户界面 #假设已经创建好了项目和应用.下面URLconf和视图函数没什么先后关系,根据自己喜好,你可以先写视图函数,也可以先定义URLconf. 有下面几个步骤 (1)   创建模型 (2)   定义项目的URLconf(mysi

[Java] SSH框架笔记_SSH三大框架的工作原理及流程

Hibernate工作原理及为什么要用? 原理:1.通过Configuration().configure();读取并解析hibernate.cfg.xml配置文件2.由hibernate.cfg.xml中的<mapping resource="com/xx/User.hbm.xml"/>读取并解析映射信息3.通过config.buildSessionFactory();//创建SessionFactory4.sessionFactory.openSession();//打