Kafka: notes on handling a few issues

Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7600561.html

1. ReplicaFetcherThread OOM after broker startup

Version: 0.8.2.2

Symptom (server startup log):

WARN [ReplicaFetcherThread-1-21], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: ReplicaFetcherThread-1-21; ReplicaId: 20; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic_a,28] -> PartitionFetchInfo(13372825,400000000),[topic_b-live,19] -> PartitionFetchInfo(2382784277,400000000),[topic_b-user,2] -> PartitionFetchInfo(2230117428,400000000),[topic_b_like,20] -> PartitionFetchInfo(0,400000000),[topic_b-group,32] -> PartitionFetchInfo(0,400000000),[topic_b_live,1] -> PartitionFetchInfo(1352237,400000000),[topic_b-log,1] -> PartitionFetchInfo(12598395616,400000000),[topic_b_microvideo,18] -> PartitionFetchInfo(0,400000000),[topic_b-group,92] -> PartitionFetchInfo(0,400000000),[bl_generic_event_detail,10] -> PartitionFetchInfo(27302121,400000000),[__consumer_offsets,14] -> PartitionFetchInfo(0,400000000),[__consumer_offsets,18] -> PartitionFetchInfo(0,400000000),[topic_b_group,12] -> PartitionFetchInfo(7323,400000000),[topic_b_feed,19] -> PartitionFetchInfo(232378,400000000),[topic_b_nearby,26] -> PartitionFetchInfo(0,400000000),[topic_b_user,20] -> PartitionFetchInfo(265,400000000),[topic_b_banners,2] -> PartitionFetchInfo(0,400000000). Possible cause: java.lang.OutOfMemoryError: Java heap space (kafka.server.ReplicaFetcherThread)
[2017-08-10 11:09:55,310] WARN [ReplicaFetcherThread-3-23], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 5; ClientId: ReplicaFetcherThread-3-23; ReplicaId: 20; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic_b_microvideo,8] -> PartitionFetchInfo(37116,400000000),[topic_b-common,12] -> PartitionFetchInfo(5378147776,400000000),[topic_b_user,2] -> PartitionFetchInfo(76267,400000000),[topic_b-group,82] -> PartitionFetchInfo(0,400000000),[bl_generic_event_detail,24] -> PartitionFetchInfo(27457189,400000000),[topic_b-group,22] -> PartitionFetchInfo(0,400000000),[topic_b_like,10] -> PartitionFetchInfo(0,400000000),[topic_b-nearby,28] -> PartitionFetchInfo(471861516,400000000),[topic_b_nearby,16] -> PartitionFetchInfo(28,400000000),[topic_b_feed,9] -> PartitionFetchInfo(79605,400000000),[topic_a,10] -> PartitionFetchInfo(13365304,400000000),[topic_b-live,1] -> PartitionFetchInfo(2381944301,400000000),[topic_b_log,13] -> PartitionFetchInfo(366613,400000000). Possible cause: java.nio.channels.ClosedSelectorException (kafka.server.ReplicaFetcherThread)
[2017-08-10 11:09:55,310] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer)
[2017-08-10 11:09:55,671] ERROR OOME with size 491278751 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space

Cause: replica.fetch.max.bytes was set far too large. While the cluster is running normally this causes no problem, but once a broker dies and is restarted it has a large number of partitions to catch up on, and every catching-up partition requests a buffer of replica.fetch.max.bytes bytes, which leads to the OOM.
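As a rough sanity check against the first log line above (the second field of every PartitionFetchInfo is the requested per-partition fetch size, which here mirrors replica.fetch.max.bytes):

replica.fetch.max.bytes           = 400000000 bytes ≈ 381 MB per partition
partitions in that single request = 17
worst case for one fetch response ≈ 17 × 381 MB ≈ 6.3 GB

The final "ERROR OOME with size 491278751" line is one such response: BoundedByteBufferReceive tries to allocate the whole ~470 MB response as a single contiguous buffer, and the heap can no longer satisfy it.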

Solution: lower replica.fetch.max.bytes.
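A minimal server.properties sketch of the change; the 10 MB figure below is only an illustrative assumption, not a recommendation — the safe ceiling is roughly the heap size divided by the number of partitions a restarted broker may need to catch up on at once:

# server.properties
# Before: ~381 MB per lagging partition, the value visible in the fetch requests above
#replica.fetch.max.bytes=400000000
# After: small enough that (lagging partitions) x (fetch size) fits comfortably in the heap
replica.fetch.max.bytes=10485760

Note that replica.fetch.max.bytes must still be at least as large as the largest message the cluster accepts (message.max.bytes); otherwise replication of any partition containing such a message will stall.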

2. Partition leaders never auto-rebalance after a restart

Version: 0.8.2.2

Symptoms: 1. After a broker crashed and was restarted, it came up normally and caught up on data normally, but once all of its replicas had rejoined the ISR the controller never re-elected them as leaders. All server-side logs looked normal, and the controller logged nothing.

2. A jstack of the controller's threads showed delete-topics-thread permanently blocked, and a Controller-xx-to-broker-yy-send-thread blocked at deleteTopicStopReplicaCallback:

"Controller-29-to-broker-26-send-thread" prio=10 tid=0x00007f6520041000 nid=0x4d9d waiting on condition [0x00007f649e90e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000005d3831d38> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:871)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at kafka.utils.Utils$.inLock(Utils.scala:533)
at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371)
at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338)
at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:229)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:229)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:160)
- locked <0x000000065d279300> (a java.lang.Object)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"delete-topics-thread-29" prio=10 tid=0x00007f652006c000 nid=0x4da3 waiting on condition [0x00007f649dbdd000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000065d2800e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:282)
at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
- locked <0x000000065d279858> (a java.lang.Object)
at kafka.controller.KafkaController.sendRequest(KafkaController.scala:668)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$9.apply(ControllerChannelManager.scala:312)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$9.apply(ControllerChannelManager.scala:309)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:309)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:302)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:302)
at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:115)
at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:337)
at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:327)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:327)
at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:360)
at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:306)
at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:305)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:305)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Cause: a bug in this Kafka version; see https://issues.apache.org/jira/browse/KAFKA-2046 and https://issues.apache.org/jira/browse/KAFKA-2122. The controller's internal event handling shares a queue whose capacity is tied to controller.message.queue.size; if that parameter is configured too small, the controller's internal threads block one another. That is exactly the pattern in the stacks above: the send thread waits for the controller lock inside deleteTopicStopReplicaCallback, while the delete-topics thread holds that lock and is parked on put() into the full queue.
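The shape of this deadlock can be reproduced with a few lines of plain Java — a sketch only, with ControllerDeadlockSketch and all names invented here; the real code paths are the ones in the stack traces above:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class ControllerDeadlockSketch {
    // Stands in for the controller-wide lock (kafka.utils.Utils.inLock in the traces above).
    static final ReentrantLock controllerLock = new ReentrantLock();
    // Stands in for the per-broker request queue; the tiny capacity plays the
    // role of a too-small controller.message.queue.size.
    static final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);

    public static void main(String[] args) throws InterruptedException {
        // "Controller-x-to-broker-y-send-thread": drains the queue, and the
        // callback it runs needs the controller lock.
        Thread sendThread = new Thread(() -> {
            try {
                while (true) {
                    Runnable callback = queue.take();
                    controllerLock.lock();   // parks here: the delete thread holds the lock
                    try {
                        callback.run();
                    } finally {
                        controllerLock.unlock();
                    }
                }
            } catch (InterruptedException ignored) {
            }
        });
        sendThread.start();

        // "delete-topics-thread": holds the controller lock while enqueueing
        // one stop-replica request per replica.
        controllerLock.lock();
        try {
            for (int i = 0; i < 3; i++) {
                // Once the queue is full and the consumer is stuck on the lock,
                // this put() parks forever -> the deadlock seen in the jstack.
                queue.put(() -> System.out.println("deleteTopicStopReplicaCallback"));
            }
        } finally {
            controllerLock.unlock();  // never reached
        }
    }
}

Run it and jstack the process: one thread parks in ArrayBlockingQueue.put, the other parks acquiring the ReentrantLock — the same two states as delete-topics-thread-29 and Controller-29-to-broker-26-send-thread above.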

Solution: increase controller.message.queue.size, or remove the setting entirely (the default is Int.MaxValue), then rolling-restart every broker whose controller is in this bad state until the controller role lands on a broker with a sane configuration.
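A sketch of both steps, assuming the small value had been set explicitly in server.properties (the value 10 below is only an example of a "too small" setting, and zk_host is a placeholder). The /controller znode records the active controller's broker id, so it tells you where the controller currently sits during the rolling restart:

# server.properties -- remove the explicit cap (the default, Int.MaxValue, is effectively unbounded)
#controller.message.queue.size=10

# check which broker currently holds the controller role
bin/zookeeper-shell.sh zk_host:2181 get /controller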
