storm的acker机制

一、简介:

storm中有一个很重要的特性:
保证发出的每个tuple都会被完整处理。一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所产生的所有的子tuple都被成功处理。如果任一个消息在timeout所指定的时间内没有完成处理,那这个tuple就失败了。

二、原理:

acker并不会为每个tuple都分配内存空间来完成跟踪,而是利用了一个非常巧妙的算法,这个算法只需使用恒定的20字节就可以完成整个tuple树的跟踪

具体原理:

acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下,
并且把得到的值更新为ack-val的新值。那么假设每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。

通俗理解:

1. 在spout产生一条tuple时,会向acker发送一条信息,让ack来进行跟踪

消息内容:{spout-tuple-id {:spout-task task-id :val ack-val}}
spout-tuple-id:这条tuple的id,每条tuple都会产生一个随机的MessageId
task-id:产生这条tuple的id,spout可能有多个task,每个task都会被分配一个唯一的taskId
ack-val:默认值为0,用来跟踪tuple

2. acker会在自己的map(类型为TimeCacheMap)里保存这条记录。 这就是acker对spout-tuple进行跟踪的核心数据结构, 对于每个spout-tuple所产生的tuple树的跟踪
都只需要保存上面这条记录。acker后面会检查:val什么时候变成0,变成0, 说明这个spout-tuple产生的tuple都处理完成了。

3. spout在发送完消息给acker后会将该tuple和MessageId发送到boltTask。boltTask在创建子tuple时并不会向acker发送消息让其跟踪,而是很巧妙的省略了这一步。
bolt在发射一个新的bolt的时候会把这个新tuple跟它的父tuple的关系保存起来(strom称之为anchoring)。然后在ack tuple的时候,storm会把要ack的tuple的id, 以及这个tuple新创建的所有的tuple的id的异或值发送给acker。消息格式是:(spout-tuple-id,tmp-ack-val)执行完这一步后,ack-val的值就变成了所有子tuple的id的异或值
ps:storm使用一致性哈希来把一个spout-tuple-id对应到acker, 因为每一个tuple知道它所有的祖宗的tuple-id, 所以它自然可以算出要通知哪个acker来ack。

4. 当所有子tuple都被ack之后,val会被异或成0,OK 整个tuple树执行跟踪完成。

场景分析:

1. 由于对应的task挂掉了,一个tuple没有被ack: storm的超时机制在超时之后会把这个tuple标记为失败,从而可以重新处理。

2. Acker挂掉了: 这种情况下由这个acker所跟踪的所有spout tuple都会超时,也就会被重新处理。

3. Spout挂掉了: 在这种情况下给spout发送消息的消息源负责重新发送这些消息。比如Kestrel和RabbitMQ在一个客户端断开之后会把所有”处理中“的消息放回队列。
由此可见storm的高度容错性。

时间: 2024-10-09 04:36:59

storm的acker机制的相关文章

storm的acker机制理解。

转载请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/6142356.html Storm 的拓扑有一些特殊的称为"acker"的任务,这些任务负责跟踪每个 Spout 发出的 tuple 的 DAG.开启storm tracker机制的前提有三个: 1. 在spout emit tuple的时候,要加上第3个参数messageid 2. 在配置中acker数目至少为1 3. 在bolt emit的时候,要加上第二个参数anchor tuple

Storm的ack机制

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调

storm(二) 事务机制

前言 为了保证tuple的强有序和exactly-once语义,storm提供了事务机制,为每个tuple提供一个id 设计方法1 为每个tuple设置一个事务id,在数据库保存事务id和当前处理的id做比较. 1.两个id不一样,由于事务的强有序特点,判断出该tuple没有出现过,所以更新id 2.id一样,重复出现,可以不用处理 问题: 这样做会导致新能很低,每个tuple都必须处理完后才能处理下一个tuple(否则会影响和下一个tuple的顺序),并且每个tuple还得至少访问一次数据库

Storm的ack机制在项目应用中的坑

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调

storm的并发机制

storm计算支持在多台机器上水平扩容,通过将计算切分为多个独立的tasks在集群上并发执行来实现. 一个task可以简单地理解:在集群某节点上运行的一个spout或者bolt实例. topology的组成部分:Nodes(服务器):配置在一个storm集群中的服务器,会执行topology的一部分运算.一个storm集群可以包括一个或者多个工作node; Workers(JVM虚拟机):一个NOde上相互独立运行的JVM进程.每个Node可以配置运行一个或者多个worke.一个topology

storm 消息确认机制及可靠性

worker进程死掉 在一个节点 kill work进程 比方 kill 2509  对work没有影响 由于会在其它节点又一次启动进程运行topology任务 supervisor进程死掉 supervisor进程kill掉 对work进程没有影响  由于他们是互相独立的! . nimbus进程死掉(存在HA的问题) nimbus假设死掉 整个任务挂掉 存在单点故障问题!(hadoop2有ha!.!!.! storm没有ha高可用) 节点宕机(和supervisor是一样的) ack/fail

Storm进程通信机制分析

本文主要分析storm的worker进程间消息传递机制,消息的接收和处理的大概流程见下图 在Storm中,worker进程内部的thread通信与worker进程间的通信有一些差别,worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架.worker进程内部通信或在同一个节点的不同worker的thread通信使用LMAX Disruptor来完成. 对于worker进程来说,为了管理流入和传出的消息,每个worker

Storm Ack容错机制

原文地址:https://www.cnblogs.com/tangzhe/p/9536808.html

Storm杂谈之Acker拾趣

Storm杂谈之Acker拾趣 本文所讲内容并非storm的acker机制,如果想看acker机制的让您失望了,不过在此奉上徐明明大牛的blog: Twitter Storm源代码分析之acker工作流程 Twitter Storm如何保证消息不丢失 或者查看<storm源码分析>(又给京狗打链接)第12章-storm的acker系统,里面会详细说明storm的acker机制,笔者在此就不多述(多述都是废话,还不一定有人家讲的好)了. 这篇主要讲一下,关于开acker和不开acker的区别.