Kafka消费者组再均衡问题

在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:

第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。

所以对于Rebalance来说,Coordinator起着至关重要的作用,那么怎么查看消费者对应的Coordinator呢,我们知道某个消费者组对应__consumer_offsets中的哪个Partation是通过hash计算出来的:partation=hash("test_group_1")%50=28,表示test_group_1这个消费者组属于28号partation,通过命令:

./kafka-topics.sh --zookeeper 192.168.33.11:2181 --describe --topic __consumer_offsets

可以找到28号Partation所对应的信息:

从而可以知道coordinator对应的broker为1

在Rebalance期间,消费者会出现无法读取消息,造成整个消费者群组一段时间内不可用,假设现在消费者组当中有A,代码逻辑执行10s,如果消费者组在消费的过程中consumer B加入到了该消费者组,并且B的代码逻辑执行20s,那么当A处理完后先进入Rebalance状态等待,只有当B也处理完后,A和B才真正通过Rebalance重新分配,这样显然A在等待的过程中浪费了资源。

消费者A:

 1 """
 2 consumer_rebalance_a.py a消费者
 3 """
 4 import pickle
 5 import uuid
 6 import time
 7 from kafka import KafkaConsumer
 8 from kafka.structs import TopicPartition, OffsetAndMetadata
 9 from kafka import ConsumerRebalanceListener
10
11 consumer = KafkaConsumer(
12     bootstrap_servers=[‘192.168.33.11:9092‘],
13     group_id="test_group_1",
14     client_id="{}".format(str(uuid.uuid4())),
15     enable_auto_commit=False,
16     key_deserializer=lambda k: pickle.loads(k),
17     value_deserializer=lambda v: pickle.loads(v)
18 )
19
20 # 用来记录最新的偏移量信息.
21 consumer_offsets = {}
22
23
24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
25     def on_partitions_revoked(self, revoked):
26         """
27         再均衡开始之前 下一轮poll之前触发
28         :param revoked:
29         :return:
30         """
31         print(‘再均衡开始之前被自动触发.‘)
32         print(revoked, type(revoked))
33         consumer.commit_async(offsets=consumer_offsets)
34
35     def on_partitions_assigned(self, assigned):
36         """
37         再均衡完成之后  即将下一轮poll之前 触发
38         :param assigned:
39         :return:
40         """
41         print(‘在均衡完成之后自动触发.‘)
42         print(assigned, type(assigned))
43
44
45 consumer.subscribe(topics=(‘round_topic‘,), listener=MineConsumerRebalanceListener())
46
47
48 def _on_send_response(*args, **kwargs):
49     """
50     提交偏移量涉及回调函数
51     :param args:
52     :param kwargs:
53     :return:
54     """
55     if isinstance(args[1], Exception):
56         print(‘偏移量提交异常. {}‘.format(args[1]))
57     else:
58         print(‘偏移量提交成功‘)
59
60
61 try:
62     start_time = time.time()
63     while True:
64         # 再均衡其实是在poll之前完成的
65         consumer_records_dict = consumer.poll(timeout_ms=100)
66
67         # 处理逻辑.
68         for k, record_list in consumer_records_dict.items():
69             for record in record_list:
70                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
71                     record.topic, record.partition, record.offset, record.key, record.value)
72                 )
73
74                 consumer_offsets[
75                     TopicPartition(record.topic, record.partition)
76                 ] = OffsetAndMetadata(
77                     record.offset + 1, metadata=‘偏移量.‘
78                 )
79
80         try:
81             consumer.commit_async(callback=_on_send_response)
82             time.sleep(10)
83         except Exception as e:
84             print(‘commit failed‘, str(e))
85
86 except Exception as e:
87     print(str(e))
88 finally:
89     try:
90         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
91         consumer.commit()
92         print("同步补救提交成功")
93     except Exception as e:
94         consumer.close()

消费者B:

  1 """
  2 consumer b.py 消费者B
  3 """
  4
  5 import pickle
  6 import uuid
  7 import time
  8 from kafka import KafkaConsumer
  9 from kafka.structs import TopicPartition, OffsetAndMetadata
 10 from kafka import ConsumerRebalanceListener
 11
 12 consumer = KafkaConsumer(
 13     bootstrap_servers=[‘192.168.33.11:9092‘],
 14     group_id="test_group_1",
 15     client_id="{}".format(str(uuid.uuid4())),
 16     enable_auto_commit=False,  # 设置为手动提交偏移量.
 17     key_deserializer=lambda k: pickle.loads(k),
 18     value_deserializer=lambda v: pickle.loads(v)
 19 )
 20
 21 consumer_offsets = {}  # 用来记录最新的偏移量信息.
 22
 23
 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
 25     def on_partitions_revoked(self, revoked):
 26         """
 27         再均衡开始之前 下一轮poll之前触发
 28         :param revoked:
 29         :return:
 30         """
 31         print(‘再均衡开始之前被自动触发.‘)
 32         print(revoked, type(revoked))
 33         consumer.commit_async(offsets=consumer_offsets)
 34
 35     def on_partitions_assigned(self, assigned):
 36         """
 37         再均衡完成之后  即将下一轮poll之前 触发
 38         :param assigned:
 39         :return:
 40         """
 41
 42         print(‘在均衡完成之后自动触发.‘)
 43         print(assigned, type(assigned))
 44
 45
 46 consumer.subscribe(topics=(‘round_topic‘,), listener=MineConsumerRebalanceListener())
 47
 48
 49 def _on_send_response(*args, **kwargs):
 50     """
 51     提交偏移量涉及回调函数
 52     :param args:
 53     :param kwargs:
 54     :return:
 55     """
 56
 57     if isinstance(args[1], Exception):
 58         print(‘偏移量提交异常. {}‘.format(args[1]))
 59     else:
 60         print(‘偏移量提交成功‘)
 61
 62
 63 try:
 64     start_time = time.time()
 65     while True:
 66         # 再均衡其实是在poll之前完成的
 67         consumer_records_dict = consumer.poll(timeout_ms=100)
 68
 69         record_num = 0
 70         for key, record_list in consumer_records_dict.items():
 71             for record in record_list:
 72                 record_num += 1
 73         print("---->当前批次获取到的消息个数是:{}".format(record_num))
 74
 75         # 处理逻辑.
 76         for k, record_list in consumer_records_dict.items():
 77             for record in record_list:
 78                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
 79                     record.topic, record.partition, record.offset, record.key, record.value)
 80                 )
 81
 82                 consumer_offsets[
 83                     TopicPartition(record.topic, record.partition)
 84                 ] = OffsetAndMetadata(record.offset + 1, metadata=‘偏移量.‘)
 85
 86         try:
 87             # 轮询一个batch 手动提交一次
 88             consumer.commit_async(callback=_on_send_response)
 89             time.sleep(20)
 90         except Exception as e:
 91             print(‘commit failed‘, str(e))
 92
 93 except Exception as e:
 94     print(str(e))
 95 finally:
 96     try:
 97         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
 98         consumer.commit()
 99         print("同步补救提交成功")
100     except Exception as e:
101         consumer.close()

消费者A和消费者B是同一个消费者组(test_group_1)的两个消费者,用time.sleep的方式模拟执行时间,A:10s,B:20s;首先A开始消费,当B新加入消费者组的时候会触发Rebalance,可以通过实现再均衡监听器(RebalanceListener)中的on_partitions_revoked和on_partitions_assigned方法来查看再均衡触发前后的partition变化情况,依次启动消费者A和B之后:

消费者A:
再均衡开始之前被自动触发.
{TopicPartition(topic=‘round_topic‘, partition=0), TopicPartition(topic=‘round_topic‘, partition=1), TopicPartition(topic=‘round_topic‘, partition=2)} <class ‘set‘>
<----------------------------------------
---------------------------------------->
在均衡完成之后自动触发.
{TopicPartition(topic=‘round_topic‘, partition=0), TopicPartition(topic=‘round_topic‘, partition=1)} <class ‘set‘>
<----------------------------------------

消费者B:
再均衡开始之前被自动触发.
set() <class ‘set‘>
<----------------------------------------
---------------------------------------->
在均衡完成之后自动触发.
{TopicPartition(topic=‘round_topic‘, partition=2)} <class ‘set‘>
<----------------------------------------

在等待B的逻辑执行完后,A和B进入再均衡状态;再均衡前A处于partition 0、1、 2三个分区,B不占有任何partition;当再均衡结束后,A占有partition 0、1,B占有partition 2;然后A和B分别开始消费对应的partition。

在上述消费者A和B的代码中重写了RebalanceListener,主要是为了在发生再均衡之前提交最后一个已经处理记录的偏移量,因为再均衡时消费者将失去对一个分区的所有权,如果消费者已经消费了当前partition还没提交offset,这时候发生再均衡会使得消费者重新分配partition,可能使得同一个消息先后被两个消费者消费的情况,实现MineConsumerRebalanceListener再均衡前提交一次offset,确保每一个消费者在触发再均衡前提交最后一次offset:

 1 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
 2     def on_partitions_revoked(self, revoked):
 3         """
 4         再均衡开始之前 下一轮poll之前触发
 5         :param revoked:
 6         :return:
 7         """
 8         print(‘再均衡开始之前被自动触发.‘)
 9         print(revoked, type(revoked))
10         consumer.commit_async(offsets=consumer_offsets)
11
12     def on_partitions_assigned(self, assigned):
13         """
14         再均衡完成之后  即将下一轮poll之前 触发
15         :param assigned:
16         :return:
17         """
18
19         print(‘在均衡完成之后自动触发.‘)
20         print(assigned, type(assigned))

再均衡发生的场景有以下几种:

1. 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)2. 订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance3. 订阅主题的分区数发生变更鉴于触发再均衡后会造成资源浪费的问题,所以我们尽量不要触发再均衡

原文地址:https://www.cnblogs.com/FG123/p/10095125.html

时间: 2024-10-16 14:31:55

Kafka消费者组再均衡问题的相关文章

Kafka消费者——API开发

目录 消费者客户端 订阅主题 订阅分区 取消订阅 订阅总结 消息消费 poll ConsumerRecord 位移提交 自动提交 手动提交 控制和关闭消费 指定位移消费 再均衡 消费者拦截器 消费者客户端 消费步骤: 1.配置消费者客户端参数并创建相应的消费者实例. 2.订阅主题. 3.拉取消息并消费 4.提交消费位移 5.关闭消费者实例 Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_C

Kafka消费者之提交消息的偏移量

原文链接:https://cloud.tencent.com/developer/article/1462432 一.概述 在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中.把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交. 参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x ,图中也用了 lastCo

Kafka消费者手动提交消息偏移

生产者每次调用poll()方法时,它总是返回由生产者写入Kafka但还没有消费的消息,如果消费者一致处于运行状态,那么分区消息偏移量就没什么用处,但是如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费可能分配到新的分区,而不是之前处理的那个,为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量制定的地方开始工作.消费者会往一个__consumer_offser的主题发送消息,消息里包含每个分区的偏移量. 1.同步提交 import o

Kafka 系列(四)—— Kafka 消费者详解

一.消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响.Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度.此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段. 需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费

Kafka消费组(consumer group)

一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka社区邮件组已经在讨论是否应该正式使用新版本consumer替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本consumer的些许设计理念,希望能把consumer这点事说清楚,从而对广大使用者有所帮助. 在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东

一文精通kafka 消费者的三种语义

本文主要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等. (一) 创建topic bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1 (二)

kafka 消费者offset记录位置和方式

我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同. 首先来说说消费者如果是根据javaapi来消费,也就是[kafka.javaapi.consumer.ConsumerConnector],我们会配置参数[zookeeper.connect]来消费.这种情况下,消费者的offset会更新到zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录下,例如: [z

Kafka消费者——重要参数配置

目录 bootstrap.servers group.id fetch.min.bytes fetch.max.bytes fetch.max.wait.ms max.partition.fetch.bytes max.poll.records connections.max.idle.ms exclude.internal.topics receive.buffer.bytes send.buffer.bytes request.timeout.ms metadata.max.age.ms r

Kafka消费者——结合spring开发

Kafka消费者端 可靠性保证 作为消费端,消费数据需要考虑的是: 1.不重复消费消息 2.不缺失消费消息 自动提交 offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(true) auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s) 手动提交offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(false) 异步提交也个缺点,那就