kafkaspot在ack机制下如何保证内存不溢

新浪微博:intsmaze刘洋洋哥。

storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。

但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为key。

public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;
    // key:messageId,Data
    private HashMap<String, String> waitAck = new HashMap<String, String>();
    private SpoutOutputCollector collector;
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    public void nextTuple() {
        String sentence = "the cow jumped over the moon";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        //指定messageId,开启ackfail机制
        collector.emit(new Values(sentence), messageId);
    }
    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功:" + msgId);
        System.out.println("删除缓存中的数据...");
        waitAck.remove(msgId);
    }
    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败:" + msgId);
        System.out.println("重新发送失败的信息...");
        //重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游
        collector.emit(new Values(waitAck.get(msgId)),msgId);
    }
}

那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢?如果这样,如果消息处理不断失败,不断重发,消息不断积累在kafkaspout节点上,kafkaspout端会不就会出现内存溢出?

其实并没有,回想kafka的原理,Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。也就是说,kafkaspot在消费kafka的数据是,通过offset读取到消息并发送给bolt后,kafkaspot只是保存者当前的offset值。

当失败或成功根据msgId查询offset值,然后再去kafka消费该数据来确保消息的重新发送。

那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?

其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。

kafkaspot中发送数据的代码

collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

可以看到msgID里面包装了offset参数。

它不缓存已经发送出去的数据信息。

当他接收到来至bolt的响应后,会从接收到的msgId中得到offset。以下是从源码中折取的关键代码:

public void ack(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
          m.ack(id.offset);
     }
 }
 m.ack(id.offset);
 public void ack(Long offset) {
     _pending.remove(offset);//处理成功移除offset
     numberAcked++;
 }

 public void fail(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
         m.fail(id.offset);
      }
  }
  m.fail(id.offset);
  public void fail(Long offset) {
     failed.add(offset);//处理失败添加offset
        numberFailed++;
   }

    SortedSet<Long> _pending = new TreeSet<Long>();
    SortedSet<Long> failed = new TreeSet<Long>();

关于kafkaspot的源码解析大家可以看这边博客:http://www.cnblogs.com/cruze/p/4241181.html

源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。

时间: 2024-12-25 20:15:03

kafkaspot在ack机制下如何保证内存不溢的相关文章

Storm的ack机制

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调

Storm的ack机制在项目应用中的坑

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调

Linux下TCP延迟确认(Delayed Ack)机制导致的时延问题分析

版权声明:本文由潘安群原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/105 来源:腾云阁 https://www.qcloud.com/community 案例一:同事随手写个压力测试程序,其实现逻辑为:每秒钟先连续发N个132字节的包,然后连续收N个由后台服务回显回来的132字节包.其代码简化如下: char sndBuf[132]; char rcvBuf[132]; while (1) { for (int i

Linux下TCP延迟重庆时 时 彩源码下载确认(Delayed Ack)机制导致的时延问题分析

重庆时 时 彩下载联系方式:QQ:2747044651 网址在实际测试中发现,当N大于等于3的情况,第2秒之后,每次第三个recv调用,总会阻塞40毫秒左右,但在分析Server端日志时,发现所有请求在Server端处理时耗均在2ms以下. 当时的具体定位过程如下:先试图用strace跟踪客户端进程,但奇怪的是:一旦strace attach上进程,所有收发又都正常,不会有阻塞现象,一旦退出strace,问题重现.经同事提醒,很可能是strace改变了程序或系统的某些东西(这个问题现在也还没搞清

一文讲透微服务下如何保证事务的一致性

原文地址:梁桂钊的博客 博客地址:http://blog.720ui.com 欢迎关注公众号:「服务端思维」.一群同频者,一起成长,一起精进,打破认知的局限性. 从本地事务到分布式事务的演变 什么是事务?回答这个问题之前,我们先来看一个经典的场景:支付宝等交易平台的转账.假设小明需要用支付宝给小红转账 100000 元,此时,小明帐号会少 100000 元,而小红帐号会多 100000 元.如果在转账过程中系统崩溃了,小明帐号少 100000 元,而小红帐号金额不变,就会出大问题,因此这个时候我

Storm可靠性实例解析——ack机制

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性 很显然,要做到这个特性,必须要track每个data的去向和结果.Storm是如何做到的呢——acker机制. 先概括下acker所参与的工作流程: Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪: Bolt在处理Tuple成功或失败后,也会发一个消息通知acker: acker会找到发射该Tuple的Spout,回调其ack或fail方法. 我们说RichBolt和Basi

Linux 下增大tomcat内存

我的服务器的配置: # OS specific support.  $var _must_ be set to either true or false. JAVA_OPTS="-Xms1024m -Xmx4096m -Xss1024K -XX:PermSize=512m -XX:MaxPermSize=2048m" 正文: 常见的内存溢出有以下两种: java.lang.OutOfMemoryError: PermGen space java.lang.OutOfMemoryErro

Redis系列--内存淘汰机制(含单机版内存优化建议)

https://blog.csdn.net/Jack__Frost/article/details/72478400?locationNum=13&fps=1 每台redis的服务器的内存都是有限的,而且也不是所有的内存都用来存储信息.而且redis的实现并没有在内存这块做太多的优化,所以实现者为了防止内存过于饱和,采取了一些措施来管控内存. 文章结构:(1)内存策略:(2)内存释放机制原理:(3)项目中如何合理应用淘汰策略:(4)单机版Redis内存优化注意点. 一.内存策略:先来吃份官方文档

ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据. 因此,通过Ack机制,很容易做到保证所有数据均被处理,一条都不漏. 另外需要注意的,当spout触发fail动作时,不会自动重发失败的tuple,需要spout自己重新获取数据,手动重新再发送一次 ack机制即, spout发送的每一条消