kafka一直rebalance故障,重复消费

今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重。错误日志如下

08-09 11:01:11 131 pool-7-thread-3 ERROR [] -
commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?

分析问题

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms

该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

image.png

如上图,在while循环里,我们会循环调用poll拉取broker中的最新消息。每次拉取后,会有一段处理时长,处理完成后,会进行下一轮poll。引入该配置的用途是,限制两次poll之间的间隔,消息处理逻辑太重,每一条消息处理时间较长,但是在这次poll()到下一轮poll()时间不能超过该配置间隔,协调器会明确地让使用者离开组,并触发新一轮的再平衡。

max.poll.interval.ms默认间隔时间为300s

分析日志

从日志中我们能看到poll量有时能够达到250多条

image.png

一次性拉取250多条消息进行消费,而由于每一条消息都有一定的处理逻辑,根据以往的日志分析,每条消息平均在500ms内就能处理完成。然而,我们今天查到有两条消息处理时间超过了1分钟。

消息处理日志1

08-09 08:50:05 430 pool-7-thread-3 INFO [] - [RestKafkaConsumer] receive message (收到消息,准备过滤,然后处理), topic: member_1.0.0_event ,partition: 0 ,offset: 1504617
08-09 08:50:05 431 pool-7-thread-3 INFO [] - [RestKafkaConsumer]:解析消息成功,准备请求调用!
08-09 08:51:05 801 pool-7-thread-3 INFO [] - [HttpClient]:response code: {"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送没有可用的营销活动--老pos机"},"fullAmountSendRes":{"status":400,"info":"满额送没有可用的营销活动--老pos机"}},"info":"发券流程执
行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event内
容:{"id":36305914,"score":16,"orderPrice":15.9,"payTime":1533775401000,"thirdTransId":"4200000160201808

消息处理日志2

08-09 08:51:32 450 pool-7-thread-3 INFO [] - [RestKafkaConsumer] receive message (收到消息,准备过滤,然后处理), topic: member_1.0.0_event ,partition: 0 ,offset: 1504674
08-09 08:51:32 450 pool-7-thread-3 INFO [] - [RestKafkaConsumer]:解析消息成功,准备请求调用!
08-09 08:52:32 843 pool-7-thread-3 INFO [] - [HttpClient]:response code: {"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送没有可用的营销活动--老pos机"},"fullAmountSendRes":{"status":400,"info":"满额送没有可用的营销活动--老pos机"}},"info":"发券流程执
行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event内
容:{"id":36306061,"score":3,"orderPrice":3.0,"payTime":1533775482000,"thirdTransId":"420000016320180809

我们看到消息消费时间都超过了1分钟。

分析原因

如下是我们消费者处理逻辑(省略部分代码)

 while (isRunning) {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) {

            for (ConsumerRecord<KEY, VALUE> record : records) {
                dealMessage(bizConsumer, record.value());
                try {
                    //records记录全部完成后,才提交
                      consumer.commitSync();
                } catch (CommitFailedException e) {
                      logger.error("commit failed,will break this for loop", e);
                        break;
                }
            }
}

poll()方法该方法轮询返回消息集,调用一次可以获取一批消息。

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结构没办法及时处理完成,会有什么后果呢?服务端约定了和客户端max.poll.interval.ms,两次poll最大间隔。如果客户端处理一批消息花费的时间超过了这个限制时间,服务端可能就会把消费者客户端移除掉,并触发rebalance

拉取偏移量与提交偏移量

kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。

如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。

所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。

解决方案

1.增加max.poll.interval.ms处理时长

kafka消费者默认此间隔时长为300s

max.poll.interval.ms=300

2.设置分区拉取阈值

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用轮询方法poll()。每次轮询后,在处理完这一批消息后,才会继续下一次的轮询。

max.poll.records = 50

3.poll到的消息,处理完一条就提交一条,当出现提交失败时,马上跳出循环,这时候kafka就会进行rebalance,下一次会继续从当前offset进行消费。

 while (isRunning) {
            ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
            if (records != null && records.count() > 0) {

            for (ConsumerRecord<KEY, VALUE> record : records) {
                dealMessage(bizConsumer, record.value());
                try {
                    //records记录全部完成后,才提交
                      consumer.commitSync();
                } catch (CommitFailedException e) {
                      logger.error("commit failed,will break this for loop", e);
                        break;
                }
            }
}

附录 查询日志 某个topic的 partition 的rebalance过程

member_1分区

时间 revoked position revoked committed 时间 assigned
08:53:21 1508667 1508509 08:57:17 1508509
09:16:31 1509187 1508509 09:21:02 1508509
09:23:18 1509323 1508509 09:26:02 1508509
09:35:16 1508509 1508509 09:36:03 1508509
09:36:21 1508509 1508509 09:41:03 1508509
09:42:15 1509323 1508509 09:46:03 1508509
09:47:19 1508509 1508509 09:51:03 1508509
09:55:04 1509323 1509323 09:56:03 1509323
多余消费 被回滚 重复消费 10:01:03 1509323
10:02:20 1510205 1509323 10:06:03 1509323
10:07:29 1509323 1509323 10:08:35 1509323
10:24:43 1509693 1509693 10:25:18 1509693
10:28:38 1510604 1510604 10:35:18 1510604
10:36:37 1511556 1510604 10:40:18 1510604
10:54:26 1511592 1511592 10:54:32 1511592
- - - 10:59:32 1511979
11:01:11 1512178 1512178 11:03:40 1512178
11:04:35 1512245 1512245 11:08:49 1512245
11:12:47 1512407 1512407 11:12:49 1512407

作者:枫叶lhz
链接:https://www.jianshu.com/p/271f88f06eb3
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

原文地址:https://www.cnblogs.com/leochenliang/p/10069477.html

时间: 2024-08-01 14:45:33

kafka一直rebalance故障,重复消费的相关文章

Kafka重复消费和丢失数据研究

Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交. 原因1:强行kill线程,导致消费后的数据,offset没有提交. 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费.例如: try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); }

【troubleshooting】记一次Kafka集群重启导致消息重复消费问题处理记录

因需要重启了Kafka集群,重启后发现部分topic出现大量消息积压,检查consumer日志,发现消费的数据竟然是几天前的.由于平时topic消息基本上无积压,consumer消费的数据都是最新的,明显是consumer在重新消费之前已经消费过的数据. 处理方法:将Kafka topic中consumer已经消费的offset值设置为最大值步骤如下:1.从Kafka查询出目前堵塞的topic消息队列中,最大的offset值(其实从Kafka的管理页面上也可以看到这值):命令:./kafka-r

Kafka消息保证不丢失和重复消费问题

使用同步模式的时候,有3种状态保证消息被安全生产,在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失.还有一种情况可能会丢失消息,就是使用异步模式的时候,当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立即丢弃掉. 在数据生产时避免数据丢失的方法: 只要能避免上述两种情况,那么就可以保证消息不会被丢失.就是说在同步模式的时候,确认机制设置为-1,也就是让消息写入leader和所有的副本.还有,

关于MQ的几件小事(三)如何保证消息不重复消费

1.幂等性 幂等(idempotent.idempotence)是一个数学与计算机学概念,常见于抽象代数中. 在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同.幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数.这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变.例如,"setTrue()"函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现. 简单来说,幂等性就是一个数据

如何保证消息不被重复消费?(如何保证消息消费时的幂等性)

首先就是比如rabbitmq.rocketmq.kafka,都有可能会出现消费重复消费的问题,正常.因为这问题通常不是mq自己保证的,是给你保证的.然后我们挑一个kafka来举个例子,说说怎么重复消费吧. kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧. 但是凡事总

RocketMQ(消息重发、重复消费、事务、消息模式)

RocketMQ基础:https://github.com/apache/rocketmq/tree/rocketmq-all-4.5.1/docs/cn 分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.

程序重启RocketMQ消息重复消费

最近在调试RocketMQ消息发送与消费的Demo时,发现一个问题:只要重启程序,RocketMQ消息就会重复消费. 那么这是什么原因导致的,又该如何解决呢? 经过一番排查,发现程序使用的RocketMQ客户端版本是3.6.2,而测试环境安装的RocketMQ环境的版本是4.1.0.原来是客户端和服务器端版本不一样导致的,消息并没有最终被消费,即没有ACK消息确认,只要程序重启就会重复消费. 解决方案:RocketMQ客户端版本使用与服务器端的同一版本,即4.1.0版本. 划重点:使用Rocke

kafka Consumer分区数与多线程消费topic

单线程消费数据适合在本地跑. 参考文档: http://kafka.apache.org/documentation.html 对于一个topic,可以发送给若干个partitions. partition在创建topic的时候就指定分区的数目. 分区.Offset.消费线程.group.id的关系 1)一组(类)消息通常由某个topic来归类,我们可以把这组消息"分发"给若干个分区(partition),每个分区的消息各不相同: 2)每个分区都维护着他自己的偏移量(Offset),记

kafka生产、存储、消费消息

Kafka架构组成:由producer(消息生产者).consumer(消息消费者).borker(kafka集群的server,负责处理消息读.写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker).topic(消息队列/分类相当于队列,里面有生产者和消费者模型).zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成. kafka里面的消息是有topic来组织的,简单的我们可