Kafka Rebalance机制分析

什么是 Rebalance

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。

例如:某 Group 下有 20 个 consumer 实例,它订阅了一个具有 100 个 partition 的 Topic 。正常情况下,kafka 会为每个 Consumer 平均的分配 5 个分区。这个分配的过程就是 Rebalance。

触发 Rebalance 的时机

Rebalance 的触发条件有3个。

  • 组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
  • 订阅的 Topic 个数发生变化。
  • 订阅 Topic 的分区数发生变化。

Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。

Rebalance 过程分析

Rebalance 过程分为两步:Join 和 Sync。

  1. Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

  1. Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

Rebalance 场景分析

新成员加入组

组成员“崩溃”

组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期(心跳周期)才能检测到这种崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。

组成员主动离开组

提交位移

如何避免不必要的rebalance

要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 发生的时机有三个:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后两个我们大可以人为的避免,发生rebalance最常见的原因是消费组成员的变化。

消费者成员正常的添加和停掉导致rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group。从而导致rebalance。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经 “死” 了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。这个时间可以通过Consumer 端的参数 session.timeout.ms进行配置。默认值是 10 秒。

除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。

通过上面的分析,我们可以看一下那些rebalance是可以避免的:

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 。

相关概念

coordinator

Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。Kafka在0.9之前是基于Zookeeper来存储Partition的Offset信息(consumers/{group}/offsets/{topic}/{partition}),因为ZK并不适用于频繁的写操作,所以在0.9之后通过内置Topic的方式来记录对应Partition的Offset。

每个Group都会选择一个Coordinator来完成自己组内各Partition的Offset信息,选择的规则如下:

  • 1,计算Group对应在__consumer_offsets上的Partition
  • 2,根据对应的Partition寻找该Partition的leader所对应的Broker,该Broker上的Group Coordinator即就是该Group的Coordinator

Partition计算规则:

partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

其中groupMetadataTopicPartitionCount对应offsets.topic.num.partitions参数值,默认值是50个分区

一次Rebalance所耗时间

测试环境

1个Topic,10个partition,3个consumer

在本地环境进行测试

测试结果

经过几轮测试发现每次rebalance所消耗的时间大概在 80ms~100ms平均耗时在87ms左右。

原文地址:https://www.cnblogs.com/yoke/p/11405397.html

时间: 2024-08-30 10:46:06

Kafka Rebalance机制分析的相关文章

apache kafka源码分析走读-Producer分析

apache kafka中国社区QQ群:162272557 producer的发送方式剖析 Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式. sync架构图 async架构图 调用流程如下: 代码流程如下: Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer.DefaultEventHandler.在创建的同时,会默认new一个Prod

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

QT开发(六十三)——QT事件机制分析

QT开发(六十三)--QT事件机制分析 一.事件机制 事件是由系统或者QT平台本身在不同的时刻发出的.当用户按下鼠标.敲下键盘,或者是窗口需要重新绘制的时候,都会发出一个相应的事件.一些事件在对用户操作做出响应时发出,如键盘事件等:另一些事件则是由系统自动发出,如计时器事件. 事件的出现,使得程序代码不会按照原始的线性顺序执行.线性顺序的程序设计风格不适合处理复杂的用户交互,如用户交互过程中,用户点击"打开文件"将开始执行打开文件的操作,用户点击"保存文件"将开始执

Linux通信之poll机制分析

poll机制分析 韦东山 2009.12.10 所有的系统调用,基于都可以在它的名字前加上"sys_"前缀,这就是它在内核中对应的函数.比如系统调用open.read.write.poll,与之对应的内核函数为:sys_open.sys_read.sys_write.sys_poll. 一.内核框架: 对于系统调用poll或select,它们对应的内核函数都是sys_poll.分析sys_poll,即可理解poll机制. sys_poll函数位于fs/select.c文件中,代码如下:

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

Nginx处理stale事件机制分析

Nginx为提高效率采用描述符缓冲池(连接池)来处理tcp连接,一个连接对应一个读事件和一个写事件,nginx在启动的时候会创建好所用连接和事件,当事件来的时候不用再创建,然而连接池的使用却存在stale事件的问题,以下将详细分析Nginx是如何处理stale事件的,该问题涉及到epoll.Nginx连接与事件的相关知识. 1      Epoll的实现原理 epoll相关的系统调用有:epoll_create, epoll_ctl和epoll_wait.Linux-2.6.19又引入了可以屏蔽

Linux x86_64 APIC中断路由机制分析

不同CPU体系间的中断控制器工作原理有较大差异,本文是<Linux mips64r2 PCI中断路由机制分析>的姊妹篇,主要分析Broadwell-DE X86_64 APIC中断路由原理.中断配置和处理过程,并尝试回答如下问题: 为什么x86中断路由使用IO-APIC/LAPIC框架,其有什么价值? pin/irq/vector的区别.作用,取值范围和分配机制? x86_64 APIC关键概念 Pin 此处的pin特指APIC的中断输入引脚,与内外部设备的中断输入信号相连.从上图中可以看出,

[转]易语言消息机制分析(消息拦截原理)

标 题: [原创]易语言消息机制分析(消息拦截原理)作 者: 红绡枫叶时 间: 2014-12-17,12:41:44链 接: http://bbs.pediy.com/showthread.php?t=195626 我自己做了个易语言的sig签名,方便分析的时候用.易语言例子是静态编译的.版本 5.11易语言其实是基于mfc的,它依然需要mfc的消息派发机制,只不过,自己当了系统与用户间的代理人.所有的消息都要经它转发而已.我在MFC的消息派发函数_AfxDispatchCmdMsg下断点,总

UVM基础之---------uvm report 机制分析

uvm 中的信息报告机制相对来说比较简单,功能上来说主要分为两部分: 第一通过ID对component的信息报告冗余级别进行控制,针对每个冗余级别进行不同的行为控制.这部分工作主要由uvm_report_hander来实现: 主要涉及到的方法有get_report_verbosity_level(severity, id)/get_report_action(severity,id) == uvm_action'(UVM_NO_ACTION) 第二是对message进行格式化的输出,这部分工作主