storm文档(9)----消息处理保证机制

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41577125

源地址:http://storm.apache.org/documentation/Guaranteeing-message-processing.html

Storm保证:每条离开spout的消息都可以得到"fullyprocessed"。本文描述了storm如何实现这种保证以及你如何能够从Storm这种可靠性能力中受益。

"fully processed"对消息意味着什么?

离开spout的一个tuple可能触发创建成百上千个基于它的tuples。例如,考虑一下streaming word count topology:

TopologyBuilder builder = newTopologyBuilder();

builder.setSpout( "sentence", newKestrelSpout("kestrel.backtype.com",

22133,

"sentence_queue",

newStringScheme()

)

);

builder.setBolt("split", newSplitSentence(), 10 )

.shufferGrouping("sententces");

builder.setBolt("count", newWordCount(), 20 )

.fieldsGrouping("split", newFields("word"));

上面的topology从Kestrel queue读取句子,将这些句子划分成词组,然后按照前面划分词组时统计的每个词的次数发送每个词。离开spout的某个tuple可能会触发创建很多基于它的tuples:句子中每个单词都会对应一个tuple,同时每个单词的次数也会对应一个tuple。消息的树状结构如下所示:

当tuple树状图产生并且树状图中每条消息都被处理过,Storm就认为离开spout的tuple已经被“fully processed”(完全处理)。可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置

当消息完整处理或者处理失败的时候都发生了什么?

要理解这个问题,需要看一下tuple在离开spout之后的生命周期。作为参考,下面是spout实现的接口(更多接口信息需要查看Javadoc)。

public interface ISpout extendsSerializable {

voidopen(Map conf, TopologyContext context,SpoutOutputCollector collector);

voidclose();

voidnextTuple();

voidack(Object msgId);

voidfail(Object msgId);

}

首先,Storm会通过Spout的nextTuple方法从Spout申请一个tuple。在open方法中,Spout使用此方法提供的SpoutOutputCollector去发射一个tuple到输出streams中去。当发射一个tuple时,Spout会提供一个“message id”,用来后面区分不同的tuple。例如, KestrelSpout从kestrel队列中读取消息,然后在发射时会将Kestrel为消息提供的id作为“message id”。发射一条消息到SpoutOutputCollector,如下所示:

_collector.emit(newValues("field1", "field2", 3), msgId);

然后,这个tuple会发送到消费bolts,同时Storm会跟踪已被创建的消息树状图。如果Storm检测到一个tuple已被“fully processed”, Storm将会原始的Spout task(即发射这个tuple的Spout)上调用ack方法,参数msgId就是这个Spout提供给Storm的“message id”。类似的,如果这个tuple超时了, Storm会在原始的Spout task上调用fail方法。注意, 一个tuple只能被创建它的Spouttask进行acked或者failed。因此,即使一个Spout在集群上正在执行很多tasks,一个tuple也只能被创建它的task进行acked或failed,而其他的task则不行。

再次使用KestrelSpout作为例子,看一下Spout是怎样保证消息处理的。当KestrleSpout从Kestrel 队列中拿出消息后,它将“opens”这个消息。这就意味着消息并不会真正被拿出队列,而是处于等待状态,即需要确认消息已被完整处理。当处于等待状态时,消息不会发送给队列的其他消费者。另外, 如果某个客户端与所有处于等待状态的消息(当然这些消息是提供给这个客户端的)断开连接,那这些消息会被放回到队列中。当消息一旦被打开, Kestrle会为客户端提供消息的数据内容以及一条唯一的id。 KestrelSpout在想SpoutOutputCollector发射tuple时,就需要使用上述id作为“message
id”。稍后, 当KestrleSpout调用ack或者fail方法时,KestrelSpout会发送一条ack或者fail的消息给Kestrel,当然,消息内容需要使用上面提到的“message id”作为区别,等Kestrel接到这条消息后才能确定是将消息真正拿出队列还是将它放回去。

Storm 可靠性API是哪些?

作为用户,想要从Storm的可靠能力中受益,你需要做两件事情。首先, 你需要告诉Storm,在tuples的树状图中,何时创建的新连接。其次, 你需要告诉storm,何时完成的单个tuple的 处理。 做完这些事情, Storm就可以检测tuple的树状图是否已被“fullyprocessed”,然后才可以对相应的spout tuple调用ack或者fail方法。 Storm AP提供可以同时做这两件事的简洁方法。

指定tuple树状图中新连接的方法称为anchoring。 anchoring在你发射新tuple时可以完成。下面将以一个bolt作为例子,这个bolt会将包含一个句子的tuple划分成包含每个词的tuple。

public class SplitSentence extendsBaseRichBolt {

OutputCollector_collector;

publicvoid prepare(Map conf, TopologyContext context, OutputCollector collectors) {

_collector= collector;

}

publicvoid execute(Tuple tuple){

Stringsentence = tuple.getString(0);

for(Stringword:  sentence.split(" ")){

_collector.emit(tuple, new Values(word));

}

_collector.ack(tuple);

}

publicvoid declareOutputFields( OutputFieldDeclarer declarer){

declarer.declare(new Fields(“word”));

}

}

每个单词tuple在调用emit方法时,通过指定输入tuple作为emit方法的第一个参数这种方式,这个单词tuple就被anchored。一旦单词tuple被anchord,如果此tuple在后面处理时failed,那么树状图中根部的spout tuple稍后就会重新进行一次完整的处理。作为对比,可以看一下当单词tuple如下发射时会发生什么:

_collector.emit(new Values(word));

通过这种方式发射单词tuple会使它被anchored。如果tuple在后面处理时failed,根部tuple就不会重新处理。这取决于在topology中所使用容错性保证机制,某些时候可能发射unanchored的tuple更加合适。

一个输出tuple可能会被anchored到多个输入tuple。这在处理流合并或者流聚合时非常有用。 multi-anchored的tuple如果处理失败,则会引起多个tuples在spouts重新处理。multi-anchoring可以通过指定一系列tuples实现,而不是仅仅一个单独的tuple。例如:

List<Tuple> anchors = newArrayList<Tuple>();

anchors.add(tuple1);

anchors.add(tuple2);

_collector.emit( anchors. new Values(1, 2,3));

multi-anchoring会将输出tuple加到多个tuple树状图中。注意,multi-anchoring也可能会打破树状图,并且创建tuple DAGs,就像下面

就像实现树状图一样(前面说的是只能实现树状图,即“tuple tress”主干),Storm的同样实现DAGs。

Anchoring就是如何制定tuple tree----下一个也是最后一个有关Storm可靠性API,就是指定合适完成tuple树中某个tuple的处理。 这通过调用OutputCollector类的ack和fail方法完成。 如果回看SplitSentence例子, 可以看到, 输入tuple是在所有wordtuple发射之后才能被acked。

可以使用OutputCollector类的fail方法直接fail掉tuple树中根部的spout tuple。例如, 可以应用于选择捕捉数据库客户端异常并且显式的fail 输入tuple。通过显式的fail的方式,spout tuple可以在tuple超时之前就重新处理。

每个你处理的tuple都必须acked或者failed。 Storm使用内存跟踪每个tuple,因此如果你没有ack/fail 每个tuple, task 最终会溢出。

大量的bolts采用通用方式读取输入tuple,并发射输入tuple,然后在execute方法末尾会ack这个tuple。这些bolts按类别可以分为过滤器和执行简单的功能。Storm有一个称为BasicBolt的接口,它用来封装这些方式。 SplitSentence的例子可以如下:

public class SplitSentence extends BaseBasicBolt{

publicvoid execute (Tuple tuple, BasicOutputCollector collector){

Stringsentence = tuple.getString(0);

for( Stringword: sentence.split(" ")){

collector.emit( newValues(word));

}

}

publicvoid declareOutputFields( outputFieldsDeclarer declarer) {

declarer.declare(new Fields("word");

}

}

这个实现要比前面的实现简单,并且语义上是相同的。发射给BasicOutputCollector 的tuple会自动anchored到输入tuple,同时当execute方法完成时,输入tuple将同时自动被acked。

相比之下,做聚合或者合并的bolts会延迟ack一个tuple,直到bolts已经从这个tuple所在tuples范围内获得聚合或者合并结果后,才能ack。聚合和合并将通常对它们输出的tuples使用multi-anchor的方式。上述操作均不属于IBasicBolt的简单方式。

怎样使这些应用(需要重新处理tuples)正常工作?

就像通常的软件设计,答案是“it depends”。 Storm 0.7.0 引入了“transactional topologies”特征,使得你在大多数计算下,可以获得一次具有容错性能的完整消息传递语义。 更多有关transactionaltopologies信息请阅读这里

Storm以一种有效的方式实现可靠性?

Storm topology 有一系列特定的“acker”任务,它们可以跟踪每个spouttuple的DAG。当acker确认DAG完整处理了,它会给spout task发送消息,而这个spout task就是创建tuple并等待ack消息的task。你可以在topology配置选项中设置topology中acker tasks数目,具体配置选项为Config.TOPOLOGY_ACKERS.  Storm默认TOPOLOGY_ACKERS是1个task-----一旦你需要处理大量的数据,你需要提高这个值。

理解Storm可靠性具体实现的最好方式就是查看tuples以及tuples DAGs的生命周期。当topology中创建tuple时,无论是在spout还是bolt中,它都会获得一个随机的64 bit 的id。这些ids由ackers用来跟踪tuple DAG中每个spout tuple。

每个tuple都知道它所在tuple树中所有spout tuples的ids。当bolt发射新tuple时,来自tuple‘s anchors中的spout tuple ids就会拷贝到新tuple中去。当一个tuple被acked时,它会发送有关tuple变化信息给相应的acker tasks。特别是,它会告诉acker:在当前spout tuple树中,一个tuple已经处理完毕,同时输出树中新的tuple,当然这个新tuple 是anchored处理完毕的那个tuple。

例如, tuples “D”和“E”是以tuple “C”为基础创建的,这就是当“C”被acked的时候,树状图发生的变化。

当“C”从树中移除时,同一时刻,“D”和“E”会加到树中。树永远不能过早的结束。

还有一些细节需要说一下,即有关Storm如何追踪tuple树的。就像上面已经提到的,你在topology中可能有任意数量的acker tasks。这会引起下面的问题:当topology中一个tuple被acked时候,它是如何知道应该向那个acker task发送信息。

Storm使用模运算hash来映射spouttuple id和acker task。因为每个tuple都会携带它的spout tuple id(无论它在哪个树中,都是一样的),这样tuple就知道了应该和哪个acker tasks通信。

上面是讲了Storm 的acker tasks是如何跟踪每一个spout tuple的,即通过tuple id 和acker task之间的hash映射。

Storm的另外一个细节时: acker tasks是如何跟踪spouttasks的。当spout task发射一个新tuple时,它会发送一个简单的消息通知相应acker,并告诉它负责发送这个spout tuple的task id。然后,当一个acker发现某个树已经结束了,它就会知道是哪个task id发送的结束消息。

acker tasks不是显式的跟踪tuples的树。 对于大规模的tuple 树,例如包含成千上万个节点甚至更多的树, 追踪所有的tuple树会耗尽内存。ackers会采取不同策略: 每个spout tuple只需要一个固定数量的空间(大约20个字节)。这种追踪算法是Storm如何工作的关键并且也是其主要技术突破之一。

acker task中存放了一个spout tuple id和一对值的映射。 第一个值就是创建这个spouttuple的task id,它稍后会用来发送结束消息。第二值就是称为“ack val”的64bit数字。 ack val代表了整个tuple树的状态,无论多大还是多小。它是树中所有tuple ids的简单异或,包括已被才创建或者已被acked的tuple。

acker task当发现一个“ack val”已经变为0, 它就会知道tuple树已经结束了。 因为tuple ids都是64bit的随机数, “ack val”意外变成0的概率极其小。 如果使用数学计算一下就会发现, 每秒钟10k acks, 那么每5000万年才可能出现一次失误。 即使出现失误, 如果tuple在topology中遇到fail时也仅仅会引起数据丢失。

现在你已经知道了可靠性算法,下面浏览一下各种失败情况下Storm怎样避免数据丢失:

l  a tuple isn‘t acked because task died:即因task死掉而造成tuple不能acked; 这种情况下, 引起failed的tuple所在树的根节点的spouttuple ids会因超时而重新处理。

l  acker task dies:因acker task 死掉造成tuple不能acked;这种情况下, 这个acker task跟踪的所有spouttuples都会因超时而重新处理。

l  Spout task dies:因spout task死掉造成tuple不能acked;这种情况下,spout获取数据的数据源应该负责重新处理数据。例如, 向Kestrel和RabbiMQ的队列将在客户端失联的情况下重新把所有等待的数据放回队列中。

如你所看,Storm的可靠性机制是完全分布式的、可伸缩的、并且是容错的。

调优可靠性

Acker task是轻量级的,因此在topology中并不需要很多。 可以使用Storm UI(即组件id“__acker")跟踪他们的性能。如果吞吐量看起来不大对, 可能需要增加ackertasks。

如果可靠性对你而言不是那么重要----即,你不必担心失败情况下会损失tuples-------那么你可以放弃追踪spout tuples的树,从而改善性能。不追踪tuple树的话,可以减少一半的消息传递,因为通常没法送一个tuple就会发送一条ack消息;另外,不追踪tuple树的话,还可以减少每个下游tuple处理需要保存ids,并减少带宽消耗。

三种方式可以放弃使用可靠性:

第一种方式是设置Config.TOPOLOGY_ACKER为0;这种情况下, Storm在spout发送一个tuple之后直接调用ack方法,这样一来,就不会最终tuple树了。

第二种方式是使用消息的基本设置。SpoutOutputCollector.emit方法会发射单个tuple,你可以通过这种方法的设置关闭对tuple的追踪。

第三种方式是,如果topology中tuples下游中某个特定子集处理失败,而你又不关心这个,那你可以将这些特定子集作为unanchored的tuples发送。因为他们没有anchored到任何spout tuples,如果这些tuples没有被acked,也就不会引起任何spout tuples作为失败处理的。

时间: 2024-10-05 04:09:37

storm文档(9)----消息处理保证机制的相关文章

storm文档(7)----基本概念

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41546195 源地址:http://storm.apache.org/documentation/Concepts.html 本文介绍了storm的主要概念,并且给出相关链接供你查看更多信息.本文讨论的概念如下所示: 1.Topologies 2.Streams 3.Spouts 4.Bolts 5.Stream Grouping 6.Reliability 7.Tasks 8

理解Linux文档的默认安全机制、隐藏属性、特殊权限,妈妈在也不用担心你从删库到跑路!!!

写在前面 前面的章节 详解Linux文档属性.拥有者.群组.权限.差异,介绍了文档的基本权限,包括读写执行(r,w,x),还有若干的属性,包括是否为目录(d).文件(-).链接文件(l).拥有者.所属群组.容量大小(字节数).最后修改时间等等,可以通过chown.chgrp.chmod来变更这些属性和权限. 默认安全机制 首先,我们使用管理员root账户分别创建文件file001和目录dir001: 从上图可以看到, 文件file001的默认权限为rw-r--r--,即拥有者可读写,同群组下账户

storm文档(6)----storm手册目录

源地址:http://storm.apache.org/documentation/Documentation.html storm基础知识 l  Javadoc l  概念 l  配置 l  保证消息处理机制 l  容错性能 l  命令行客户端 l  理解storm topology并行机制 l  FAQ trident 对storm来说,trident是可选接口.它提供了准确的一次性处理.事务性数据存储保持以及一系列通用数据流分析操作. l  Trident指导-----基本概念及浏览 l 

storm 文档(3)----入门指导

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41517897 源地址:http://storm.apache.org/documentation/Tutorial.html 本文主要讲述了如何创建Storm topologies以及如何将它们部署在Storm集群中.Java是主要使用的语言,但是依然使用少量Python例子证明了Storm的多语言特性. 初步配置: 本文使用的例子源自storm-start项目.建议你复制这个

storm文档(10)----容错

源地址:http://storm.apache.org/documentation/Fault-tolerance.html 本文主要介绍Storm作为容错系统的设计细节. 当worker死掉时会发生什么? 当worker死掉时, supervisor将重启它. 如果worker启动总是失败,则worker就不能发送心跳消息给Nimbus, 那Nimbus就会重新在另一台machine上启动它. 当node死掉时会发生什么? 分配到这个节点的所有tasks都会超时,那Nimbus会将这些task

Storm文档详解

1.Storm基础概念 1.1.什么是storm? Apache Storm is a free and open source distributed realtime computation system. Storm是免费开源的分布式实时计算系统 实时和离线的区别: 1 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.***任务调度 2 流式计算:数据实时产生.

storm文档(12)----自己搭建storm集群

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41802543 ubuntu下  storm  安装步骤 安装storm之前首先需要安装一些依赖库: zookeeper.JDK 6.python2.6.6.jzmq.zeromq 这些库所需要的依赖库不再一一笔述. 以下为具体安装过程: 一.安装JDK zookeeper要求安装JDK 6或更高版本( 目前最新稳定版本为JDK8), 但是由于storm要求安装JDK 6, 因此

storm文档(5)----创建storm新项目

源地址:http://storm.apache.org/documentation/Creating-a-new-Storm-project.html 本文主要介绍如何配置开发的storm项目.步骤如下: 1.将storm jar包加到classpath中 2.如果使用多语言特性,将多语言实现的目录加到classpath中 下面跟着一块看一下在Eclipse环境中如何配置storm-starter项目. 将Storm jars包加到classpath中 你需要将storm jars包加到你的cl

storm文档(11)----搭建storm集群

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41684717 源地址:http://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html 本文叙述了storm集群搭建和运行步骤.如果你打算在AWS上进行的话,可以使用storm-deploy项目.storm-deploy在EC2上完全自动进行下载.配置.以及storm集群的安装等步骤.它也为你配置了Gan