因需要重启了Kafka集群,重启后发现部分topic出现大量消息积压,检查consumer日志,发现消费的数据竟然是几天前的。
由于平时topic消息基本上无积压,consumer消费的数据都是最新的,明显是consumer在重新消费之前已经消费过的数据。
处理方法:将Kafka topic中consumer已经消费的offset值设置为最大值
步骤如下:
1、从Kafka查询出目前堵塞的topic消息队列中,最大的offset值(其实从Kafka的管理页面上也可以看到这值):
命令:./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 -topic topic_name--time -1
查询结果
topic_name:2:1024
topic_name:1:1024
topic_name:0:1024
2、登录zookeeper,将堵塞的topic中consumer已经消费的offset值设置为最大的offset值
./zkCli.sh -server 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
然后执行如下命令,设置offset值
set /kafka/consumers/instance/offsets/topic_name/0 1024
set /kafka/consumers/instance/offsets/topic_name/1 1024
set /kafka/consumers/instance/offsets/topic_name/2 1024
3、重启所有consumer应用,这样consumer就从最大offset值处开始消费
原因分析:由于在关闭Kafka和zookeeper进程时,使用的是kill -9 PID,导致部分topic offset值未来得及提交,出现消息重复消费的现象。
关闭Kafka和zookeeper,需要按正常方式关闭应用,不能直接使用kill进程的方式。
./kafka-server-stop.sh
./zkServer.sh stop
参考信息:
修改kafka topic的offset几种方法
http://blog.csdn.net/yxgxy270187133/article/details/53666760
Kafka重复消费和丢失数据研究
http://blog.csdn.net/zollty/article/details/53958641
原文地址:http://blog.51cto.com/tryagaintry/2059359