Storm消息可靠处理机制

在很多应用场景中,分布式系统的可靠性保障尤其重要。比如电商平台中,客户的购买请求需要可靠处理,不能因为节点故障等原因丢失请求;比如告警系统中,产生的核心告警必须及时完整的知会监控人员,不能因为网络故障而丢失数据。

Storm消息可靠性保障是Storm核心特性之一,其中消息树的跟踪管理机制是Storm核心算法之一,本文将详细介绍Storm消息可靠处理机制。我们从Storm初探中的例子入手。

一、消息处理流程

1、 Spout节点

(1) Spout接收到一个文本消息;

msg1

刘备 关羽 张飞

曹操 郭嘉 荀彧

(2) Spout把文本消息拆分为2个行字符串消息,并把2个消息发送给NamesSplit Bolt节点。

2、 NamesSplit Bolt节点

(1) NamesSplit Bolt接收到两个行字符串消息;

msg2 刘备 关羽 张飞

msg3 曹操 郭嘉 荀彧

(2) NamesSplit Bolt把2个行字符串消息拆分为6个名字消息,发送给HelloWorld Bolt节点;

(3) NamesSplit Bolt确认,msg2、msg3处理完成。

3、 HelloWorld Bolt节点

(1) HelloWorld Bolt接收到6个名字消息;

msg4 刘备

msg5 关羽

msg6 张飞

msg7 曹操

msg8 郭嘉

msg9 荀彧

(2) HelloWorld Bolt SayHello;

(3) HelloWorld Bolt确认,msg4、msg5、msg6、msg7、msg8、msg9处理完成。

 二、关键代码

 1、 Spout

下面代码表示Spout节点发送消息,消息绑定到messageId上,这里的messageId可以看做上述例子中的msg1,tuple可以看做上述例子中的msg2或msg3。

public void nextTuple()
{
    this.collector.emit(List<Object> tuple, Object messageId);
}

下面代码会在消息处理成功或失败后调用。

public void ack(Object msgId)
{
}

public void fail(Object msgId)
{
}

2、 Bolt

这段代码是Bolt消息处理发送代码,我们详细看一下标红代码。

public void execute(Tuple input)
{
    String[] nameArray = names.split(" ");
    for(String name : nameArray)
    {
        List<Object> splitList = new ArrayList<Object>();
        splitList.add(name);
        collector.emit(inputList, splitList);
    }
    collector.ack(input);
}
OutputCollector.emit(Collection<Tuple> anchors, List<Object> tuple) 中tuple表示发送的子消息,anchors表示子消息的父节点。这段代码既发送了子消息,有把子消息锚定到了消息树上。上述例子中,相当于把消息msg4 刘备锚定到消息msg2 刘备 关羽 张飞上。OutputCollector.ack(Tuple input)表示回答消息处理完成。上述例子中,相当于确认msg4 刘备处理完成。
下面代码会在消息处理成功或失败后调用。
public void ack(Object msgId)
{
}

public void fail(Object msgId)
{
}

三、消息重发机制

可以看到,一条消息从Spout发送后,会产生一棵消息树,只有当消息树中的所有消息都被确认后(ack),Storm才认为消息处理完成。

代码上可以轻易看出,我们只需要指定根节点消息ID(即Spout接收到的消息ID),其他消息ID系统会自动生成。同时,我们只需要确认非根节点消息处理完成。

实际上,Spout或者Bolt没发送一条消息,消息便会存储到kestrel队列中,Bolt每接收到一条消息,kestrel便会标记这条消息在处理中(pengding),知道该条消息被确认处理完成,kestrel才把它移除出队列。

Bolt消息处理过程中,发生异常或者超时,kestrel会把该条消息从处理中状态重新置为待处理状态,等待Storm下一次调度处理。

四、消息树管理算法

可以看到,Spout每处理一个消息,就会生成一棵消息树,如果Storm存储每个消息树每个节点的状态,内存很快便会耗尽,显然是不可取的。

实际上,Storm仅仅采用20字节管理一棵消息树,数据结构如下:

treeId|{64bit}

treeId用于区分不同的消息树(实际上和代码中指定的根节点ID一一对应),{64bit}则用于消息树节非根点异或计算。

每生成一个64位msgid,则与{64bit}异或计算一次,直到该消息确认处理完成后,再与{64bit}异或计算一次。(异或计算结果为新的{64bit}值)

{64bit}==0时,表示消息处理完毕。(一目了然,在次不再证明)

时间: 2024-10-15 15:53:41

Storm消息可靠处理机制的相关文章

Storm的并行度、Grouping策略以及消息可靠处理机制简介

概念: Workers (JVMs): 在一个节点上可以运行一个或多个独立的JVM 进程.一个Topology可以包含一个或多个worker(并行的跑在不同的machine上), 所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology Executors (threads): 在一个worker JVM进程中运行着多个Java线程.一个executor线程可以执行一个或多个tasks.但一般默认每个executor只执行一个task.

storm 消息的可靠处理机制——Ack整个tuple树异或

消息的可靠处理机制 Storm内部通过一种巧妙的异或算法判读每个tuple是否被正确完整的处理. Spout的一个Task创建一个Tuple时,即在Spout的nextTuple()方法中实现从特定数据源读取数据的处理逻辑中,会与Acker进行通信,向Acker发送消息,Acker保存该Tuple对应信息:{:spout-task task-id :val ack-val)}. Bolt在emit一个新的子Tuple时,会保存子Tuple与父Tuple的关系. 在Bolt中进行ack时,会计算出

Storm消息可靠性的保障机制

参考[并发编程网]的Storm官方教程翻译 以WordCountToPology为例: // 构造Topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID,new SentenceSpout(), 2)// 指定 Spout ,2 指的是使用2个executor来运行spout .setNumTasks(4);//指定tasks的数量 // 指定 SentenceSpout 向Split

ActiveMQ源码解析(四):聊聊消息的可靠传输机制和事务控制

在消息传递的过程中,某些情况下比如网络闪断.丢包等会导致消息永久性丢失,这时消费者是接收不到消息的,这样就会造成数据不一致的问题.那么我们怎么才能保证消息一定能发送给消费者呢?怎么才能避免数据不一致呢?又比如我们发送多条消息,有时候我们期望都发送成功但实际上其中一部分发送成功,另一部分发送失败了,没达到我们的预期效果,那么我们怎么解决这个问题呢? 前一种问题我们通过消息确认机制来解决,它分为几种模式,需要在创建session时指定是否开启事务和确认模式,像下面这样: <span style=&quo

Android消息推送机制

1.推送方式基础知识: 当我们开发需要和服务器交互的应用程序时,基本上都需要获取服务器端的数据,比如<地震应急通>就需要及时获取服务器上最新的地震信息.要获取服务器 上不定时更新的信息一般来说有两种方法,第一种是客户端使用Pull(拉)的方式,隔一段时间就去服务器上获取信息,看是否有更新的信息出现.第二种就是 服务器使用Push(推送)的方式,当服务器端有新信息了,则把最新的信息Push到客户端上.? 虽然Pull和Push两种方式都能实现获取服务器端更新信息的功能,但是明显来说Push is

Storm ack和fail机制再论

之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下   首先开启storm tracker机制的前提是, 1. 在spout emit tuple的时候,要加上第3个参数messageid 2. 在配置中acker数目至少为1 3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路   流程, 1. 当tuple具有messageid时,spout会把该tuple加到pending list里面    并发消息给acker,通知acker开

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

【RabbitMQ】如何进行消息可靠投递【上篇】

说明 前几天,突然发生线上报警,钉钉连发了好几条消息,一看是RabbitMQ相关的消息,心头一紧,难道翻车了? [橙色报警]?应用[xxx]在[08-15?16:36:04]发生[错误日志异常],alertId=[xxx].由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发. 应用xxx?可能原因如下 服务名为: ?异常为:org.springframework.amqp.rabbit.lis

IOS OS X 中集中消息的传递机制

1 KVO (key-value Observing) 是提供对象属性被改变是的通知机制.KVO的实现实在Foundation中,很多基于 Foundation 的框架都依赖与它.如果只对某一个对象的值的改变感兴趣的话.就可以使用KVO消息传递.满足KVO的前提条件:1接受者(接受对象改变的通知的对象)需要知道发送者(值会改变的对象):2,接受者需要知道发送者的生命周期,因为它需要在发送者被销毁前注销观察者身份.如果这两个要求都符合的话,这个消息传递机制可以一对多(多个观察者可以注册同一个对象的