【Kafka踩坑系列之二】无限循环消费数据

一、Bug背景

业务上线后,发现Kafka的消费者一直在重复拉取同一批数据。被消费的topic配置了10个分区,只有每个分区的第一批数据能够出队,并且无限循环。

因测试环境数据量比较小,一直无法复现问题。只能查生产环境的日志排查。

二、解决问题的思路

      初步猜测数据被消费之后,没有正常commit到Kafka服务端,导致Topic分区offset再消费完毕后未进行更新,下次取数据时还是从老offset开始取数据。

    1. 检查客户端配置
             自动提交已开启(enable.auto.commit的默认配置为true), 自动提交时间为5s(offsets.commit.timeout.ms的默认配置为5000)。
             既然默认已开启自动提交,按道理offset应该会被更新吖。然而并没有,Why?
    2. 添加手动提交代码
             每批数据处理完毕后,执行 consumer.commitSync(); 
             然并卵,why?
    3. 只能添加日志埋点。
             发现消费者程序每批取出了6000多条数据,每批处理时间长达5分钟。
             另外一条关键日志,info级别,在每批数据处理完毕后打印出来:o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483646 dead.
             可以看出,等到每批数据处理完毕时,消费者已经被标记为dead。可以推断出处理超时了!
    4. 查看客户端超时配置
            会话超时时间为5s(session.timeout.ms的默认值为5000),而消费者程序处理一批数据竟然要5分钟!
            先尝试修改会话超时时间为30分钟,结果提示其余几个超时时间也必须同步修改为合理值。
            仔细一想,Kafka作为流式处理系统,目的就是快速响应,把会话时间改为30分钟明显是不合理的。
    5. 优化消费者程序的性能
            尝试优化程序性能,每条数据逐条处理改为成批处理。速度明显提升,但是要6000+条数据在30秒内处理完毕,臣妾还是办不到啊!!
    6. 修改每次拉取的字节数
            查看文档,发现消费者每次拉取数据的最大字节数(max.partition.fetch.bytes)为 1048576 Byte,即:1 MB。
            1MB可取出6000+条数据,那我改成100KB,岂不是只取出600+条数据?  
            consumerConfig.put("max.partition.fetch.bytes", 100 * 1024); //100kb
            果然奏效!每次只取出600+条数据,加上原先的性能优化,每批数据都控制在10秒内处理完毕。
            至此,不再出现日志 Marking the coordinator 2147483646 dead. 数据也不再死循环了。

      问题至此解决,可以安心下班了。(*^_^*)

时间: 2024-07-31 13:01:10

【Kafka踩坑系列之二】无限循环消费数据的相关文章

【Kafka踩坑系列之一】消费者拉不出数据

一.Bug背景 因业务需要,我们部署了两个Kafka集群.Kafka集群A的版本号为:0.11.0.1,Kafka集群B的版本号为0.9.0.1. 因两个Kafka集群的版本号不一致,尝试了多种解决方案,发现总有一个集群出不来数据,无法互相兼容. 二.Kafka的客户端版本号必须与服务端版本号一致 客户端v0.11.0.1的Maven配置 <dependency>     <groupId>org.apache.kafka</groupId>     <artif

人工智能(AI)库TensorFlow 踩坑日记之二

上次 踩坑日志之一 遗留的问题终于解决了,所以作者(也就是我)终于有脸出来写第二篇了. 首先还是贴上 卷积算法的示例代码地址 :https://github.com/tensorflow/models   这个库里面主要是一些常用的模型用tensorflow实现之后的代码.其中我用的是 models/tree/master/tutorials/image/cifar10 这个示例,上一篇也大致讲过了. 关于上次遇到问题是: 虽然训练了很多次,但是每次实际去用时都是相同的结果.这个问题主要原因是

quick cocos2dx 3.3rc1 踩坑日记(二)------ 声音引擎

昨天在群里有人说setSoundsVolume和setMusicVolume设置声音大小无效...我记得cocos2dx是没实现内容的,就贸贸然回答说,是这你的没用,但群里有人说是有用的,我就去虚心请教,大神人很好,回答了我很多问题,告诉我说实现了,而且是针对不同的平台.实现的文件位置,如下图: 我先测试setMusicVolume,在win32下setMusicVolume是无效的,在Android上是可以的. 但在测试setSoundsVolume时,就遇见问题了,当调用audio.play

cocos2dx 3.3rc0 踩坑日记(二)------ HttpClient 的使用

昨天写了Curl的简单使用,遇见了几个坑,今天用HttpClient写样例还是遇见了坑~ 按照网上的资料写总是遇见问题...应该是版本不一样!!!下面我来说说HttpClient的使用方法. 首先要引入头文件和命名空间 #include "network/HttpClient.h" using namespace cocos2d::network; 但是这样还不行,编译的话会报错,无法解析的外部符号....... 出现这个需要添加network库,添加库的方式和以前有点不同.3.x有些

H5踩坑系列(一)

提到移动端适配,首先心里可能会问,我们为什么要做移动端的适配,怎么去做移动端端的适配 我们为什么要进行移动端的适配 首先一个页面在pc上边打开,是正常显示的,但是我们用手机打开的时候,由于手机的屏幕尺寸并不能完整的吧页面全部显示出来,就算是手动进行缩放也会出现比如说滚动条,页面布局错乱等等各种五花八门的问题,对于用户的体验非常的不好 于是乎就有了移动端的适配, 移动端适配的目的是在不同尺寸的设备上,页面达到合理的展示(自适应)或者说是能够保持统一效果. 在我们了解移动端适配之前 我们首先要先了解

ReactNative踩坑系列--构建失败

初始化项目后执行react-native run-ios,构建失败: ** BUILD FAILED ** The following commands produced analyzer issues: Analyze /Users/lcz/workspace/APP/temp/node_modules/react-native/ReactCommon/yoga/yoga/YGNodeList.c Analyze /Users/lcz/workspace/APP/temp/node_modul

新人跳坑系列《二十》文本溢出

分享者:toBeMN 文本溢出是一个非常常见的问题,尤其是在手机端上<ignore_js_op> 这是理想中的状态,但是对于长标题来说,可能就要崩溃了 轻者换行<ignore_js_op> 重者样式崩坏<ignore_js_op> 当一行文字超过了限定范围,就会出现换行等现象,这就叫文本溢出该如何解决文本溢出呢,其实只要短短一块css代码即可<ignore_js_op> 现在界面好看了吧(至少从我的审美出发还可以)<ignore_js_op> 超

JavaScript日常会跳的坑系列(二)

1.Number()将部分非数字类型转换为0 强制转换为数值类型函数: parseFloat.parseInt 优点:对非数值类型统一返回NaN 缺点:会将一部分符合数值类型的字符串也识别为数值 parseFloat("1")//1 parseFloat("1s")//1,不是想要的 parseFloat(null)//NaN parseFloat(undefined)//NaN parseFloat("")//NaN Number 优点:对于字

bootstrap之js插件踩坑系列

<html> <head>     <link rel="stylesheet" href="http://cdn.bootcss.com/bootstrap/3.3.0/css/bootstrap.min.css">     <!-- 可选的Bootstrap主题文件(一般不用引入) -->     <link rel="stylesheet" href="http://cdn.b