Storm可靠性实例解析——ack机制

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性

很显然,要做到这个特性,必须要track每个data的去向和结果。Storm是如何做到的呢——acker机制

先概括下acker所参与的工作流程:

  1. Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪;
  2. Bolt在处理Tuple成功或失败后,也会发一个消息通知acker;
  3. acker会找到发射该Tuple的Spout,回调其ack或fail方法。

我们说RichBolt和BasicBolt的区别是后者会自动ack。那么是不是我们只要实现了Spout的ack或fail方法就能看到反馈了呢?

试试在RandomSpout(extends BaseRichSpout )中加入如下代码:

 1 public class RandomSpout extends BaseRichSpout {
 2
 3     private SpoutOutputCollector collector;
 4
 5     private Random rand;
 6
 7     private static String[] sentences = new String[] {"edi:I‘m happy", "marry:I‘m angry", "john:I‘m sad", "ted:I‘m excited", "laden:I‘m dangerous"};
 8
 9     @Override
10     public void open(Map conf, TopologyContext context,
11             SpoutOutputCollector collector) {
12         this.collector = collector;
13         this.rand = new Random();
14     }
15
16     @Override
17     public void nextTuple() {
18         String toSay = sentences[rand.nextInt(sentences.length)];
19         this.collector.emit(new Values(toSay));
20     }
21
22     @Override
23     public void declareOutputFields(OutputFieldsDeclarer declarer) {
24         declarer.declare(new Fields("sentence"));
25     }
26
27 }  

public class RandomSpout extends BaseRichSpout

 1 @Override
 2
 3     public void ack(Object msgId) {
 4
 5         System.err.println("ack " + msgId);
 6
 7     }
 8
 9     @Override
10
11     public void fail(Object msgId) {
12
13         System.err.println("fail " + msgId);
14
15     } 

疑问:重新运行ExclaimBasicTopo,看下结果。并没有任何的ack 和 fail 出现?

分析:原因是,Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!

探讨:我们改下Spout代码,为每个消息加入一个唯一Id。同时,为了方便看结果,加入更多的打印,并且靠sleep减慢发送速度。(只是为了演示!)

 1 public class RandomSpout extends BaseRichSpout {
 2
 3     private SpoutOutputCollector collector;
 4
 5     private Random rand;
 6
 7     private AtomicInteger counter;
 8
 9     private static String[] sentences = new String[] {"edi:I‘m happy", "marry:I‘m angry", "john:I‘m sad", "ted:I‘m excited", "laden:I‘m dangerous"};
10
11     @Override
12
13     public void open(Map conf, TopologyContext context,
14
15             SpoutOutputCollector collector) {
16
17         this.collector = collector;
18
19         this.rand = new Random();
20
21         counter = new AtomicInteger();
22
23     }
24
25     @Override
26
27     public void nextTuple() {
28
29         Utils.sleep(5000);
30
31         String toSay = sentences[rand.nextInt(sentences.length)];
32
33         int msgId = this.counter.getAndIncrement();
34
35         toSay = "["+ msgId + "]"+ toSay; 
36
37         PrintHelper.print("Send " + toSay );
38
39         this.collector.emit(new Values(toSay), msgId);
40
41     }
42
43     @Override
44
45     public void declareOutputFields(OutputFieldsDeclarer declarer) {
46
47         declarer.declare(new Fields("sentence"));
48
49     }
50
51     @Override
52
53     public void ack(Object msgId) {
54
55         PrintHelper.print("ack " + msgId);
56
57     }
58
59     @Override
60
61     public void fail(Object msgId) {
62
63         PrintHelper.print("fail " + msgId);
64
65     }
66
67 }  

PrintHelper类:

 1 public class PrintHelper {
 2
 3     private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");
 4
 5     public static void print(String out){
 6
 7         System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
 8
 9     }
10
11 }  

同时把PrintBolt里面打印也换成PrintHelper.print打印

看下打印结果:

 1 53:33:891 [Thread-26-spout] Send [0]ted:I‘m excited
 2 53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I‘m excited!
 3 53:38:895 [Thread-26-spout] Send [1]edi:I‘m happy
 4 53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I‘m happy!
 5 53:38:895 [Thread-26-spout] ack 0
 6 53:43:896 [Thread-26-spout] Send [2]edi:I‘m happy
 7 53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I‘m happy!
 8 53:43:896 [Thread-26-spout] ack 1
 9 53:48:896 [Thread-26-spout] Send [3]edi:I‘m happy
10 53:48:896 [Thread-26-spout] ack 2
11 53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I‘m happy!
12 53:53:896 [Thread-26-spout] Send [4]ted:I‘m excited
13 53:53:896 [Thread-26-spout] ack 3
14 53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I‘m excited!
15 53:58:897 [Thread-26-spout] Send [5]laden:I‘m dangerous
16 53:58:897 [Thread-26-spout] ack 4
17 53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I‘m dangerous! 

很明显看到:

  1. 并发度为1的Spout确实是一个线程,并发度为3的Bolt确实是三个线程;
  2. 消息完全处理完成后,确实回调了ack(Object msgId)方法,而且msgId的值,即为我们emit的msgId;
  3. 虽然我们在topology中定义了两个bolt,但实际上ack对于每个tuple只调用了一次;
  4. spout发出tuple后,Bolt很快就完成了,但是ack直到5秒后spout醒来才打印。

Tuple树

  对于Spout创建的Tuple,在topology定义的流水线中经过Bolt处理时,可能会产生一个或多个新的Tuple。源Tuple+新产生的Tuple构成了一个Tuple树。当整棵树被处理完成,才算一个Tuple被完全处理,其中任何一个节点的Tuple处理失败或超时,则整棵树失败。

  超时的值,可以通过定义topology时,conf.setMessageTimeoutSecs方法指定。


Anchor

在我们例子中ExclaimRichBolt用

附注:

 1 public class ExclaimBasicBolt extends BaseBasicBolt {
 2
 3     @Override
 4     public void execute(Tuple tuple, BasicOutputCollector collector) {
 5         //String sentence = tuple.getString(0);
 6         String sentence = (String) tuple.getValue(0);
 7         String out = sentence + "!";
 8         collector.emit(new Values(out));
 9     }
10
11     @Override
12     public void declareOutputFields(OutputFieldsDeclarer declarer) {
13         declarer.declare(new Fields("excl_sentence"));
14     }
15
16 }  

ExclaimBasicBolt 原实现方式

collector.emit(inputTule, new Values(newTupleValue));

发射一个新的tuple。

第一个参数是传入Bolt的tuple,第二个参数是新产生的tuple的value,这种emit的方式,在Storm中称为: "anchor"。


Tuple的ack

  前面我们一直提到acker,看到这里,你应该能猜出acker其实就是Storm里面track一个Tuple保证其一定被处理的功能。acker也是一个component

我们来看看acker的工作流程

1. Spout在初始化时会产生一个tasksId

2. Spout中创建新的Tuple,其id是一个64位的随机数;

3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:

  • Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail
  • 一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。

4. 一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;

5. 该Bolt调用OutputCollector.ack()时,Storm会做如下操作:

  • 将anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0XOR tuple-id-1
  • Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker

6. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。

fail的机制类似,在发现fail后直接回调Spout的fail方法。

——Storm就是通过这个acker的机制来保证数据不丢失。

  回头再看看上面的打印结果,b、c两条得到很好的解释了。那d是为什么呢?

  在最开始时,我曾经提到过,Storm的设计模型中,Spout是源源不断的产生数据的,所以其nextTuple()方法在任何时候不应该被打断。ack,fail 和 nextTuple是在同一个线程中完成的。

  所以,虽然acker发现一个Tuple已经完全处理完成,但是由于Spout线程在Sleep,无法回调。

  在设计中,我们应尽量避免在Spout、Bolt中去Sleep。如果确实需要控制,最好用异步线程来做,例如用异步线程读取数据到队列,再由Spout去取队列中数据。异步线程可以随意控制速度等。

另外,

Storm是否会自动重发失败的Tuple?

这里答案已经很明显了。fail方法如何实现取决于你自己。只有在fail中做了重发机制,才有重发。

注:Trident除外。这是Storm提供的特殊的事务性API,它确实会帮你自动重发的。


Unanchor

  如果我们在Bolt中用OutputCollector.emit()发射一个新的Tuple时,并没有指定输入的Tuple(IBasicBolt的实现类用的是BasicOutPutCollector,其emit方法实际上还是调用OutputCollector.emit(),只不过内部会帮你填上输入的Tuple),那么行为称之为“Unanchor”。

  是否用Unanchor方式取决于你的实现。


调整可靠性

  在某些特定的情况下,你或许想调整Storm的可靠性。例如,你并不关心数据是否丢失,或者你想看看后面是否有某个Bolt拖慢了Spout的速度?

那么,有三种方法可以实现:

  1. 在build topology时,设置acker数目为0,即conf.setNumAckers(0);
  2. 在Spout中,不指定messageId,使得Storm无法追踪;
  3. 在Bolt中,使用Unanchor方式发射新的Tuple。


本文转自Edison徐storm应用系列之——可靠性与ack机制

时间: 2024-10-25 20:12:57

Storm可靠性实例解析——ack机制的相关文章

Storm的BaseBasicBolt源码解析ack机制

我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt.在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack.在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTup

Storm的ack机制

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调

Storm的ack机制在项目应用中的坑

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调

ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据. 因此,通过Ack机制,很容易做到保证所有数据均被处理,一条都不漏. 另外需要注意的,当spout触发fail动作时,不会自动重发失败的tuple,需要spout自己重新获取数据,手动重新再发送一次 ack机制即, spout发送的每一条消

Android开发之UI更新交互机制与实例解析

android开发过程中,经常需要更新UI的状态和文案等.这是就需要对UI进行 更新.在android中更新UI一般有三种方法,handler机制.RunOnUiThread方法以及AsyncTask异步类方法等 本文下面就这三种方法进行了演示和代码实现. a.Handler机制通过使用消息机制来实现 b.RunOnUiThread方法是通过运行UI线程来达到更新UI的目的 c.AsyncTask是异步类,通过异步更新来更新UI 效果图如下:           (1)Java功能实现代码如下:

kafkaspot在ack机制下如何保证内存不溢

新浪微博:intsmaze刘洋洋哥. storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送:如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送. 但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为

xor算法在storm可靠性中的应用

1.先看一下数学中的异或 异或xor是一个数学运算符.它应用于逻辑运算.异或符号为“^”. 异或也叫半加运算,其运算法则相当于不带进位的二进制加法:二进制下用1表示真,0表示假,则异或的运算法则为:0异或0=0,1异或0=1,0异或1=1,1异或1=0(同为0,异为1), 既然相同的对象XOR操作,结果是0,那么有这样一个公式, A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次. 2.storm可靠性的机制 storm中有一个系统级别的组件是acker,acker

RabbitMq + Spring 实现ACK机制

摘要: 理解 Ack,设置为手动 Ack,如何在异常时,进行数据回返,我们再次不理解基础的发送和接受的功能,官网的实例已经很满足学习的要求了,其实在队列的配置中,最复杂的也就是消费者的逻辑,我这边讲解的适用于开发大型网站,对数据的处理要非常的谨慎的,如果是简单学习,不建议看. 概念性解读(Ack的灵活) 首先啊,有的人不是太理解这个Ack是什么,讲的接地气一点,其实就是一个通知,怎么说呢,当我监听消费者,正常情况下,不会出异常,但是如果是出现了异常,甚至是没有获取的异常,那是不是这条数据就会作废

ActiveMQ讯息传送机制以及ACK机制

http://blog.csdn.net/lulongzhou_llz/article/details/42270113 ActiveMQ消息传送机制以及ACK机制详解 AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的. 一. ActiveMQ消息传送机制 Producer客户端使用来发送消息的, Consumer客户端用来消费消息:它们的协同中心就是ActiveMQ br