Official link:
http://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html
What does it mean for a message to be “fully processed”?
A tuple coming off a spout can trigger thousands of tuples to be created based on it. Consider, for example, the streaming word count topology:
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22133, "sentence_queue", new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));
```
This topology reads sentences off of a Kestrel queue, splits the sentences into its constituent words, and then emits for each word the number of times it has seen that word before. A tuple coming off the spout triggers many tuples being created based on it: a tuple for each word in the sentence and a tuple for the updated count for each word. The tree of messages looks something like this:
Storm considers a tuple coming off a spout “fully processed” when the tuple tree has been exhausted and every message in the tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout. This timeout can be configured on a topology-specific basis using the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
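For example, the timeout can be raised when the topology is configured and submitted. This is a minimal sketch reusing the builder from the word-count example above; the topology name and the value 60 are arbitrary.

```java
Config conf = new Config();
// Sets Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: a tuple tree that is not fully
// processed within this window is considered failed and will be replayed.
conf.setMessageTimeoutSecs(60);
StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
```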
What happens if a message is fully processed or fails to be fully processed?
To understand this question, let’s take a look at the lifecycle of a tuple coming off of a spout. For reference, here is the interface that spouts implement (see the Javadoc for more information):
```java
public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}
```
First, Storm requests a tuple from the Spout by calling the nextTuple method on the Spout. The Spout uses the SpoutOutputCollector provided in the open method to emit a tuple to one of its output streams. When emitting a tuple, the Spout provides a “message id” that will be used to identify the tuple later. For example, the KestrelSpout reads a message off of the kestrel queue and emits as the “message id” the id provided by Kestrel for the message. Emitting a message to the SpoutOutputCollector looks like this:
java _collector.emit(new Values("field1", "field2", 3) , msgId);
Next, the tuple gets sent to consuming bolts and Storm takes care of tracking the tree of messages that is created. If Storm detects that a tuple is fully processed, Storm will call the ack method on the originating Spout task with the message id that the Spout provided to Storm. Likewise, if the tuple times-out Storm will call the fail method on the Spout. Note that a tuple will be acked or failed by the exact same Spout task that created it. So if a Spout is executing as many tasks across the cluster, a tuple won’t be acked or failed by a different task than the one that created it.
Let’s use KestrelSpout again to see what a Spout needs to do to guarantee message processing. When KestrelSpout takes a message off the Kestrel queue, it “opens” the message. This means the message is not actually taken off the queue yet, but instead placed in a “pending” state waiting for acknowledgement that the message is completed. While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue. When a message is opened, Kestrel provides the client with the data for the message as well as a unique id for the message. The KestrelSpout uses that exact id as the “message id” for the tuple when emitting the tuple to the SpoutOutputCollector. Sometime later on, when ack or fail are called on the KestrelSpout, the KestrelSpout sends an ack or fail message to Kestrel with the message id to take the message off the queue or have it put back on.
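To make this lifecycle concrete, below is a minimal sketch of the same pending-message pattern for a generic queue. It is not the real KestrelSpout; QueueClient and QueueMessage are hypothetical stand-ins for whatever source the spout reads from.

```java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// A sketch only: QueueClient and QueueMessage are hypothetical types.
public class ReliableQueueSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private QueueClient _client;                              // hypothetical queue client
    private Map<Long, String> _pending = new HashMap<Long, String>();

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _client = new QueueClient("queue-host", 22133);       // hypothetical connection
    }

    public void nextTuple() {
        QueueMessage msg = _client.openMessage();             // "opens" but does not delete
        if (msg == null) {
            return;
        }
        _pending.put(msg.getId(), msg.getBody());
        // The id the queue assigned to the message becomes the tuple's "message id".
        _collector.emit(new Values(msg.getBody()), msg.getId());
    }

    public void ack(Object msgId) {
        _pending.remove(msgId);
        _client.delete((Long) msgId);                         // fully processed: remove from queue
    }

    public void fail(Object msgId) {
        _pending.remove(msgId);
        _client.requeue((Long) msgId);                        // failed or timed out: put it back
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
```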
What is Storm’s reliability API?
There’s two things you have to do as a user to benefit from Storm’s reliability capabilities. First, you need to tell Storm whenever you’re creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm’s API provides a concise way of doing both of these tasks.
Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple. Let’s use the following bolt as an example. This bolt splits a tuple containing a sentence into a tuple for each word:
```java
public class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));
        }
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
Each word tuple is anchored by specifying the input tuple as the first argument to emit. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple failed to be processed downstream. In contrast, let’s look at what happens if the word tuple is emitted like this:
```java
_collector.emit(new Values(word));
```
Emitting the word tuple this way causes it to be unanchored. If the tuple fails to be processed downstream, the root tuple will not be replayed. Depending on the fault-tolerance guarantees you need in your topology, sometimes it’s appropriate to emit an unanchored tuple.
An output tuple can be anchored to more than one input tuple. This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts. Multi-anchoring is done by specifying a list of tuples rather than just a single tuple. For example:
```java
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
```
Multi-anchoring adds the output tuple into multiple tuple trees. Note that it’s also possible for multi-anchoring to break the tree structure and create tuple DAGs, like so:
Storm’s implementation works for DAGs as well as trees (pre-release it only worked for trees, and the name “tuple tree” stuck).
Anchoring is how you specify the tuple tree – the next and final piece to Storm’s reliability API is specifying when you’ve finished processing an individual tuple in the tuple tree. This is done by using the ack and fail methods on the OutputCollector. If you look back at the SplitSentence example, you can see that the input tuple is acked after all the word tuples are emitted.
You can use the fail method on the OutputCollector to immediately fail the spout tuple at the root of the tuple tree. For example, your application may choose to catch an exception from a database client and explicitly fail the input tuple. By failing the tuple explicitly, the spout tuple can be replayed faster than if you waited for the tuple to time-out.
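A bolt’s execute method might then look roughly like the sketch below; _db stands in for a hypothetical database client, and the ack/fail calls use the OutputCollector saved in prepare, as in the earlier SplitSentence example.

```java
public void execute(Tuple tuple) {
    try {
        _db.insert(tuple.getString(0));   // _db is a hypothetical database client
        _collector.ack(tuple);            // stored successfully: ack the input tuple
    } catch (Exception e) {
        // Fail explicitly so the spout tuple at the root of the tree is replayed
        // right away instead of waiting for the message timeout.
        _collector.fail(tuple);
    }
}
```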
Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don’t ack/fail every tuple, the task will eventually run out of memory.
A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called BasicBolt that encapsulates this pattern for you. The SplitSentence example can be written as a BasicBolt like follows:
```java
public class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.
In contrast, bolts that do aggregations or joins may delay acking a tuple until after it has computed a result based on a bunch of tuples. Aggregations and joins will commonly multi-anchor their output tuples as well. These things fall outside the simpler pattern of IBasicBolt.
How do I make my applications work correctly given that tuples can be replayed?
As always in software design, the answer is “it depends.” Storm 0.7.0 introduced the “transactional topologies” feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.
How does Storm implement reliability in an efficient way?
A Storm topology has a set of special “acker” tasks that track the DAG of tuples for every spout tuple. When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task – you will need to increase this number for topologies processing large amounts of messages.
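For example, the acker parallelism could be raised in the topology configuration like this (a sketch; the value 4 is arbitrary):

```java
Config conf = new Config();
conf.setNumAckers(4);   // sets Config.TOPOLOGY_ACKERS, which defaults to 1
```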
The best way to understand Storm’s reliability implementation is to look at the lifecycle of tuples and tuple DAGs. When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.
Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple’s anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker “I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me”.
For example, if tuples “D” and “E” were created based on tuple “C”, here’s how the tuple tree changes when “C” is acked:
Since “C” is removed from the tree at the same time that “D” and “E” are added to it, the tree can never be prematurely completed.
There are a few more details to how Storm tracks tuple trees. As mentioned already, you can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information?
Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.
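A toy illustration of the idea, not Storm’s actual internal code: the 64-bit spout tuple id is reduced modulo the number of acker tasks to choose which acker tracks that tree.

```java
int numAckers = 3;                                      // Config.TOPOLOGY_ACKERS
long spoutTupleId = 0x7f3a9c2d5e1b4a68L;                // example random 64-bit id
int ackerIndex = (int) Math.floorMod(spoutTupleId, (long) numAckers);
System.out.println("tracked by acker task #" + ackerIndex);
```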
Another detail of Storm is how the acker tasks track which spout tasks are responsible for each spout tuple they’re tracking. When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.
Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs.
An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the “ack val”. The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree.
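A toy sketch of that state, again as an illustration rather than Storm’s real acker code:

```java
// Roughly this much state is kept per spout tuple, no matter how large the tree grows.
class AckerEntry {
    int spoutTaskId;   // which spout task to notify when the tree completes
    long ackVal;       // XOR of every tuple id created and/or acked in the tree
}

Map<Long, AckerEntry> pending = new HashMap<Long, AckerEntry>();  // spout tuple id -> entry
```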
When an acker task sees that an “ack val” has become 0, then it knows that the tuple tree is completed. Since tuple ids are random 64 bit numbers, the chances of an “ack val” accidentally becoming 0 is extremely small. If you work the math, at 10K acks per second, it will take 50,000,000 years until a mistake is made. And even then, it will only cause data loss if that tuple happens to fail in the topology.
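The XOR trick can be seen in a tiny worked example that mirrors the “C”, “D”, “E” scenario above; the ids are made up, and this follows the conceptual description rather than Storm’s internal message format.

```java
long ackVal = 0L;
long s = 0x9e3779b97f4a7c15L;  // spout tuple id
long c = 0x2545f4914f6cdd1dL;  // "C"
long d = 0x6a09e667f3bcc909L;  // "D"
long e = 0xbb67ae8584caa73bL;  // "E"

ackVal ^= s;          // spout tuple emitted: acker starts tracking its id
ackVal ^= s ^ c;      // spout tuple acked by a bolt that created "C"
ackVal ^= c ^ d ^ e;  // "C" acked by a bolt that created "D" and "E"
ackVal ^= d;          // "D" acked with no children
ackVal ^= e;          // "E" acked with no children
System.out.println(ackVal == 0);  // true: every id appeared exactly twice, so the tree is complete
```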
Now that you understand the reliability algorithm, let’s go over all the failure cases and see how in each case Storm avoids data loss:
- A tuple isn’t acked because the task died: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
- Acker task dies: In this case all the spout tuples the acker was tracking will time out and be replayed.
- Spout task dies: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.
As you have seen, Storm’s reliability mechanisms are completely distributed, scalable, and fault-tolerant.
Tuning reliability
Acker tasks are lightweight, so you don’t need very many of them in a topology. You can track their performance through the Storm UI (component id “__acker”). If the throughput doesn’t look right, you’ll need to add more acker tasks.
If reliability isn’t important to you – that is, you don’t care about losing tuples in failure situations – then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there’s an ack message for every tuple in the tuple tree. Additionally, it requires fewer ids to be kept in each downstream tuple, reducing bandwidth usage.
There are three ways to remove reliability. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack method on the spout immediately after the spout emits a tuple. The tuple tree won’t be tracked.
The second way is to remove reliability on a message by message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the SpoutOutputCollector.emit method.
Finally, if you don’t care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they’re not anchored to any spout tuples, they won’t cause any spout tuples to fail if they aren’t acked.
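Putting the three options side by side, as a sketch; the variables mirror the earlier examples:

```java
// 1. Turn ackers off for the whole topology: tuple trees are never tracked and the
//    spout's ack method is called immediately after each emit.
Config conf = new Config();
conf.setNumAckers(0);                    // Config.TOPOLOGY_ACKERS = 0

// 2. In a spout: emit without a message id, so this particular tuple is not tracked.
_collector.emit(new Values(sentence));   // SpoutOutputCollector, no msgId argument

// 3. In a bolt: emit unanchored, so a downstream failure cannot fail any spout tuple.
_collector.emit(new Values(word));       // OutputCollector, no input tuple as anchor
```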
Now that you understand Storm's reliability mechanism and how to choose the reliability level you need, let's go over how Storm avoids losing data in each class of failure.
1. Task-level failures
- A tuple is not acked because a bolt task crashed. In this case the spout tuples at the root of the trees the failed tuple belongs to will time out, and the spout's fail method will be called.
- An acker task dies. All the spout tuples the acker was tracking will time out and fail, and the spout's fail method will be called.
- A spout task dies. In this case the external source the spout reads from (such as a message queue) is responsible for replaying the messages. For example, the Kestrel queue puts all of a client's pending messages back on the queue when that client disconnects.
2. Task slot failures
- A worker dies. Each worker runs several bolt (or spout) tasks. The supervisor monitors these processes and tries to restart a failed worker on the same machine.
- A supervisor dies. The supervisor is stateless, so its failure does not affect the tasks that are already running; it only needs to be restarted in time. The supervisor does not restart itself, so external monitoring is required.
- Nimbus dies. Nimbus is also stateless, so its failure does not affect running tasks (although new topologies cannot be submitted while it is down); it only needs to be restarted in time. Nimbus does not restart itself, so external monitoring is required.
3. Cluster node (machine) failures
- A node in the Storm cluster fails. Nimbus reassigns all the tasks that were running on that machine to other available machines.
- A node in the ZooKeeper cluster fails. ZooKeeper keeps working as long as fewer than half of its machines are down; the failed machine just needs to be repaired in time.
Reference: http://os.51cto.com/art/201312/422572_1.htm