理解Storm可靠性消息

看过一些别人写的, 感觉有些东西没太说清楚,个人主要以源代码跟踪,参考个人理解讲述,有错误请指正。

1基本名词

1.1 Tuple: 消息传递的基本单位。很多文章中介绍都是这么说的, 个人觉得应该更详细一点。

在spout发送的时候,函数原型

public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

这里的tuple, 实际上是List<Object> 对象,返回的是 List<Integer> 是要发送的tast的IdsList

在bolt接收的时候, 函数原型

public void execute(Tuple tuple)

变成了一个Tuple对象,  结构应该也是一个list, List<Field1, value1, Field2, value2..>这样的一个结构, FieldList ValueList, 我们根据对应的fieldname就可以取出对应的getIntegerByField方法

回到spout对象中来, 在spout有一个定义的输出字段

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

这里定义的一个字段,所以我们在emit的时候就只能发送一个包含一个value的tuple(spout部分), storm会将field, 和 发送的value下标对应, 变成一个Tuple对象,  也就是上面说的

List<Field1, value1, Field2, value2..>这样的一个结构,  在bolt 之间传递tuple, 发送又是List<Object> tuple, 根据组装bolt定义的fiels, 再组合成Tuple对象给下一个Bolt处理

在发射的最后 还有一个 void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);  因为上面emit的时候已经返回List<taskid>, 所以它就知道要发送给哪些taskid处理,然后将taskid 和 tuple放入队列LinkedBlockingQueue, 代码如下

; worker.clj

( defn mk-transfer- fn [ transfer-queue ]

( fn [ task ^Tuple tuple ]

(.put ^LinkedBlockingQueue

transfer-queue [ task tuple ] )

))

然后单独会开启一个叫async-loop的线程,取出每条记录(taskid, tuple), 然后worker会从当前task建立一个到目标task的zeromq连接, 通过zeromq将tuple发送给目标task

总结: 每次emit都是根据List<Object>和定义的输出Fields组合成一个Tuple对象,,每个接受对象接收的是Tuple对象,如果处理完再发送又再组合字段, 在emit的时候返回LIst<taskids>,所以就知道发送给哪些Task, 然后拿这些taskid和tuple再组合成一个任务队列,通过zeromq发送到目标task,目标task接收到tuple进程处理至于并发度控制, 参考

http://www.cnblogs.com/chengxin1982/p/4001275.html

TupleID Tuple对应的ID,  在创建的时候赋予一个64位的id,主要用来跟踪消息

MsgID  官方解释 Emits a new tuple to the default output stream with the given message ID. 如果不指定,acker不会跟踪。主要作用 , 在spout收到fail时候, 能够定位到是哪条消息出错,能够决定重发. 使用实例  _collector.emit(new Values(sentence),  new Integer(num));

acker 消息跟踪者. acker 存储一个Map<taskid, ack val> ,  taskid为祖宗tuple创建者的taskid ack_val 为消息传递过程中的 tupleid的xor值,如果为0则知道是哪个spout或者bolt已经处理完了, 为什么会有bolt, 因为bolt在发射的时候,如果非锚定,就是不带tuple发射,它会被认为是祖宗tuple, 上一个tuple会认为已经结束.
至于分配发射源分配到acker, storm采用一致性hash 祖宗tupleid来分配,因为在所有的tuple中都能知道祖宗tupleid,所以在子孙tuple处理时, 知道该发送给哪个acker跟踪

时间: 2024-10-26 05:15:01

理解Storm可靠性消息的相关文章

Storm 内部消息缓存

这篇文件翻译自 http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/ 当进行Storm调优时,理解Storm内部消息队列的配置十分有帮助.这篇文件将说明在Storm 0.8/0.9版本中一个Worker内部的消息通信. Storm Worker进程内部消息传输 这里所说的“内部消息”是指单台节点上的一个Worker进程内部的消息.这种通信依赖于Storm内部各种 LMAX

xor算法在storm可靠性中的应用

1.先看一下数学中的异或 异或xor是一个数学运算符.它应用于逻辑运算.异或符号为“^”. 异或也叫半加运算,其运算法则相当于不带进位的二进制加法:二进制下用1表示真,0表示假,则异或的运算法则为:0异或0=0,1异或0=1,0异或1=1,1异或1=0(同为0,异为1), 既然相同的对象XOR操作,结果是0,那么有这样一个公式, A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次. 2.storm可靠性的机制 storm中有一个系统级别的组件是acker,acker

ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据. 因此,通过Ack机制,很容易做到保证所有数据均被处理,一条都不漏. 另外需要注意的,当spout触发fail动作时,不会自动重发失败的tuple,需要spout自己重新获取数据,手动重新再发送一次 ack机制即, spout发送的每一条消

storm源码之理解Storm中Worker、Executor、Task关系【转】

[原]storm源码之理解Storm中Worker.Executor.Task关系 Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:1. Worker(进程)2. Executor(线程)3. Task 下图简要描述了这3者之间的关系:                                                    1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服

用实例理解Storm的Stream概念

原文首发在个人博客:http://zqhxuyuan.github.io/2016/06/30/Hello-Storm/ 如需转载,请注明出处,谢谢! 缘起 事情源于在看基于Storm的CEP引擎:flowmix 的FlowmixBuilder代码, 每个Bolt设置了这么多的Group, 而且declareStream也声明了这么多的stream-id, 对于只写过WordCountTopology的小白而言, 直接懵逼了,没见过这么用的啊,我承认一开始是拒绝的,每个Bolt都设置了这么多Gr

用实例的方式去理解storm的并行度

什么是storm的并发度 一个topology(拓扑)在storm集群上最总是以executor和task的形式运行在suppervisor管理的worker节点上.而worker进程都是运行在jvm虚拟机上面的,每个拓扑都会被拆开多个组件分布式的运行在worker节点上. 1.worker 2.executor 3.task 这三个简单关系图: 一个worker工作进程运行一个拓扑的子集(其实就是拓扑的组件),每个组件的都会以executor(线程)在worker进程上执行,一个worker进

[转载] 快速理解Kafka分布式消息队列框架

转载自http://blog.csdn.net/xiaolang85/article/details/18048631 ==是什么 == 简单的说,Kafka是由Linkedin开发的一个分布式的消息队列系统(Message Queue) 目标Scope(解决什么问题) kafka开发的主要初衷目标是构建一个用来处理海量日志,用户行为和网站运营统计等的数据处理框架.在结合了数据挖掘,行为分析,运营监控等需求的情况下,需要能够满足各种实时在线和批量离线处理应用场合对低延迟和批量吞吐性能的要求.从需

4. Storm可靠性

storm高可靠性: storm有一种机制可以保证从spout发出的每个tuple都会被完全处理 可靠性机制: 1.节点故障迁移 当一个节点上的worker出现问题是,会自动切到其他节点: 2.消息完整发送 一个消息(tuple)从spout发送出来,可能会导致成百上千的消息基于此消息被创建 "单词统计"的例子: storm任务从数据源每次读取一个完整的英文句子:将这个句子分解为独立的单词,最后,实时的输出每个单词以及它出现过的次数. 每个从spout发送出来的消息(每个英文句子)都会

Storm可靠性实例解析——ack机制

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性 很显然,要做到这个特性,必须要track每个data的去向和结果.Storm是如何做到的呢——acker机制. 先概括下acker所参与的工作流程: Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪: Bolt在处理Tuple成功或失败后,也会发一个消息通知acker: acker会找到发射该Tuple的Spout,回调其ack或fail方法. 我们说RichBolt和Basi