一、Bug背景
业务上线后,发现Kafka的消费者一直在重复拉取同一批数据。被消费的topic配置了10个分区,只有每个分区的第一批数据能够出队,并且无限循环。
因测试环境数据量比较小,一直无法复现问题。只能查生产环境的日志排查。
二、解决问题的思路
初步猜测数据被消费之后,没有正常commit到Kafka服务端,导致Topic分区offset再消费完毕后未进行更新,下次取数据时还是从老offset开始取数据。
- 检查客户端配置
自动提交已开启(enable.auto.commit的默认配置为true), 自动提交时间为5s(offsets.commit.timeout.ms的默认配置为5000)。
既然默认已开启自动提交,按道理offset应该会被更新吖。然而并没有,Why? - 添加手动提交代码
每批数据处理完毕后,执行 consumer.commitSync();
然并卵,why? - 只能添加日志埋点。
发现消费者程序每批取出了6000多条数据,每批处理时间长达5分钟。
另外一条关键日志,info级别,在每批数据处理完毕后打印出来:o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483646 dead.
可以看出,等到每批数据处理完毕时,消费者已经被标记为dead。可以推断出处理超时了! - 查看客户端超时配置
会话超时时间为5s(session.timeout.ms的默认值为5000),而消费者程序处理一批数据竟然要5分钟!
先尝试修改会话超时时间为30分钟,结果提示其余几个超时时间也必须同步修改为合理值。
仔细一想,Kafka作为流式处理系统,目的就是快速响应,把会话时间改为30分钟明显是不合理的。 - 优化消费者程序的性能
尝试优化程序性能,每条数据逐条处理改为成批处理。速度明显提升,但是要6000+条数据在30秒内处理完毕,臣妾还是办不到啊!! - 修改每次拉取的字节数
查看文档,发现消费者每次拉取数据的最大字节数(max.partition.fetch.bytes)为 1048576 Byte,即:1 MB。
1MB可取出6000+条数据,那我改成100KB,岂不是只取出600+条数据?
consumerConfig.put("max.partition.fetch.bytes", 100 * 1024); //100kb
果然奏效!每次只取出600+条数据,加上原先的性能优化,每批数据都控制在10秒内处理完毕。
至此,不再出现日志 Marking the coordinator 2147483646 dead. 数据也不再死循环了。问题至此解决,可以安心下班了。(*^_^*)
时间: 2024-10-13 08:07:37