Kafka 核心组件之协调器

1、消费者与消费者组

假设某 topic 有4个分区,消费者组中只有一个消费者,那么这个消费者将消费全部 partition 中的数据。

如果消费者组中有两个消费者,那么每个消费者消费两个 partition。

如果消费者组中有4个消费者,那么每个消费者消费一个partition。

如果消费者组中有5个消费者,那么有一个消费者就是空闲的。

注意:在同一个消费者组中,不要让消费者的数量大于分区的数量

多个消费者组之间不会互相影响。

2、协调器

在 kafka-0.10 版本,Kafka 在服务端引入了组协调器(GroupCoordinator),每个 Kafka Server 启动时都会创建一个 GroupCoordinator 实例,用于管理部分消费者组和该消费者组下的每个消费者的消费偏移量

在客户端引入了消费者协调器(ConsumerCoordinator),实例化一个消费者就会实例化一个 ConsumerCoordinator 对象,ConsumerCoordinator 负责同一个消费者组下各消费者与服务端的 GroupCoordinator 进行通信

2.1 消费者协调器(ConsumerCoordinator)

ConsumerCoordinator 定义的位置:

public class KafkaConsumer<K, V> implements Consumer<K, V> {

    private final ConsumerCoordinator coordinator;

}

ConsumerCoordinator 是 KafkaConsumer 的一个私有的成员变量,因此 ConsumerCoordinator 中存储的信息也只有与之对应的消费者可见,不同消费者之间是看不到彼此的 ConsumerCoordinator 中的信息的

ConsumerCoordinator 的作用:

  • 处理更新消费者缓存的 Metadata 请求
  • 向组协调器发起加入消费者组的请求
  • 对本消费者加入消费者前后的相应处理
  • 请求离开消费者组(例如当消费者取消订阅时)
  • 向组协调器发送提交偏移量的请求
  • 通过一个定时的心跳检测任务来让组协调器感知自己的运行状态
  • Leader消费者的 ConsumerCoordinator 还负责执行分区的分配,一个消费者组中消费者 leader 由组协调器选出,leader 消费者的 ConsumerCoordinator 负责消费者与分区的分配,然后把分配结果发送给组协调器,然后组协调器再把分配结果返回给其他消费者的消费者协调器,这样减轻了服务端的负担

ConsumerCoordinator 实现上述功能的组件是 ConsumerCoordinator 类的私有成员或者是其父类的私有成员:

 1 public final class ConsumerCoordinator extends AbstractCoordinator {
 2     private final List<PartitionAssignor> assignors;
 3     private final OffsetCommitCallback defaultOffsetCommitCallback;
 4     private final SubscriptionState subscriptions;
 5     private final ConsumerInterceptors<?, ?> interceptors;
 6     private boolean isLeader = false;
 7     private MetadataSnapshot metadataSnapshot;
 8     private MetadataSnapshot assignmentSnapshot;
 9
10     省略了部分代码....
11 }
12
13
14 public abstract class AbstractCoordinator implements Closeable {
15     private enum MemberState {
16         UNJOINED,    // the client is not part of a group
17         REBALANCING, // the client has begun rebalancing
18         STABLE,      // the client has joined and is sending heartbeats
19     }
20
21     private final Heartbeat heartbeat;
22     protected final ConsumerNetworkClient client;
23     private HeartbeatThread heartbeatThread = null;
24     private MemberState state = MemberState.UNJOINED;
25     private RequestFuture<ByteBuffer> joinFuture = null;
26
27     省略了部分代码....
28 }

2.2 组协调器(GroupCoordinator)

GroupCoordinator 的作用:

  • 负责对其管理的组员(消费者)提交的相关请求进行处理
  • 与消费者之间建立连接,并从与之连接的消费者之间选出一个 leader
  • 当 leader 分配好消费者与分区的订阅关系后,会把结果发送给组协调器,组协调器再把结果返回给各个消费者
  • 管理与之连接的消费者的消费偏移量的提交,将每个消费者的消费偏移量保存到kafka的内部主题中
  • 通过心跳检测消费者与自己的连接状态
  • 启动组协调器的时候创建一个定时任务,用于清理过期的消费组元数据以及过去的消费偏移量信息

GroupCoordinator 依赖的组件及其作用

  • KafkaConfig:实例化 OffsetConfig 和 GroupConfig
  • ZkUtils:分消费者分配组协调器时从Zookeeper获取内部主题的分区元数据信息。
  • GroupMetadataManager:负责管理 GroupMetadata以及消费偏移量的提交,并提供了一系列的组管理的方法供组协调器调用。GroupMetadataManager 不仅把 GroupMetadata 写到kafka内部主题中,而且还在内存中缓存了一份GroupMetadata,其中包括了组员(消费者)的元数据信息,例如消费者的 memberId、leaderId、分区分配关系,状态元数据等。状态元数据可以是以下五种状态:
    • PreparingRebalance:消费组准备进行平衡操作
    • AwaitingSync:等待leader消费者将分区分配关系发送给组协调器
    • Stable:消费者正常运行状态,心跳检测正常
    • Dead:处于该状态的消费组没有任何消费者成员,且元数据信息也已经被删除
    • Empty:处于该状态的消费组没有任何消费者成员,但元数据信息也没有被删除,知道所有消费者对应的消费偏移量元数据信息过期。
  • ReplicaManager:GroupMetadataManager需要把消费组元数据信息以及消费者提交的已消费偏移量信息写入 Kafka 内部主题中,对内部主题的操作与对其他主题的操作一样,先通过 ReplicaManager 将消息写入 leader 副本,ReplicaManager 负责 leader 副本与其他副本的管理。
  • DelayedJoin:延迟操作类,用于监视处理所有消费组成员与组协调器之间的心跳超时
  • GroupConfig:定义了组成员与组协调器之间session超时时间配置

3、消费者协调器和组协调器的交互

3.1 心跳

消费者协调器通过和组协调器发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询获取消息或提交偏移量时发送心跳。

如果消费者停止发送心跳的时间足够长,会话就会过期,组协调器认为它已经死亡,就会触发一次再均衡。

在 0.10 版本里,心跳任务由一个独立的心跳线程来执行,可以在轮询获取消息的空档发送心跳。这样一来,发送心跳的频率(也就是组协调器群检测消费者运行状态的时间)与消息轮询的频率(由处理消息所花费的时间来确定)之间就是相互独立的。在0.10 版本的 Kafka 里,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livelock),比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配置与 session.timeout.ms 是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。

3.2 分区再平衡

发生分区再均衡的3种情况:

  • 一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。
  • 当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。如果一个消费者主动离开消费组,消费者会通知组协调器它将要离开群组,组协调器会立即触发一次再均衡,尽量降低处理停顿。如果一个消费者意外发生崩溃,没有通知组协调器就停止读取消息,组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。
  • 在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡

再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

3.3 leader 消费者分配分区的策略

当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为leader消费者。leader消费者从组协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。

每个消费者的消费者协调器在向组协调器请求加入组时,都会把自己支持的分区分配策略报告给组协调器(轮询或者是按跨度分配或者其他),组协调器选出该消费组下所有消费者都支持的的分区分配策略发送给leader消费者,leader消费者根据这个分区分配策略进行分配。

完毕之后,leader消费者把分配情况列表发送给组协调器,消费者协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有leader消费者知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

3.4 消费者入组过程

  • 消费者创建后,消费者协调器会选择一个负载较小的节点,向该节点发送寻找组协调器的请求
  • KafkaApis 处理请求,调用返回组协调器所在的节点,过程如下:

  • 找到组协调器后,消费者协调器申请加入消费组,发送 JoinGroupRequest请求
  • KafkaApis 调用 handleJoinGroup() 方法处理请求
    • 把消费者注册到消费组中
    • 把消费者的clientId与一个UUID值生成一个memberId分配给消费者
    • 构造器该消费者的MemberMetadata信息
    • 把该消费者的MemberMetadata信息注册到GroupMetadata中
    • 第一个加入组的消费者将成为leader
  • 把处理JoinGroupRequest请求的结果返回给消费者
  • 加入组成功后,进行分区再均衡

原文地址:https://www.cnblogs.com/hyunbar/p/12527014.html

时间: 2024-12-30 00:04:28

Kafka 核心组件之协调器的相关文章

Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

阅读背景:您需要对Zk,Kafka有基础的了解 本章主题:详尽的梳理ZkCoordinator的过程 package com.mixbox.storm.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mixbox.storm.kafka.trident.GlobalPartitionInformation; import java.util.*; import static com.mixbox

CC2530作为协调器或路由器,最大可以带多少子节点

默认值是5.20.6,共3万个节点. MAX_DEPTH决定了网络的最大深度.协调器(Coordinator)位于深度0,MAX_DEPTH参数限制了网络在物理上的长度,这个值默认是5,最大不能超过15.    MAX_CHILDREN决定了一个路由(Router)或者一个协调器节点可以处理的儿子节点的最大个数.    MAX_ROUTER 决定了一个路由(Router)或者一个协调器(Coordinator)节点可以处理的具有路由功能的儿子节点的最大个数.这个参数是 MAX_CHILDREN的

Zigbee协议栈是怎样分别执行协调器、路由器、终端的任务的?

这个问题虽然简单,却困扰了差不多一个星期,归根结底还是自己对协议栈的不熟悉. 废话不多说! 假如组网后,协调器需要执行的任务是:组播,终端要执行的任务是:单播.那协议栈是怎么做到协调器只执行组播的任务,而不知执行单播的任务,终端又是怎样只执行单播的任务而不执行组播的任务呢? 下面是官方给出的代码 1 // Received whenever the device changes state in the network 2 case ZDO_STATE_CHANGE://网络状态发生改变 3 S

无线传感网CC2530终应端、路由以及协调器系统用

要做到目视千里,耳听八方是人类长久的梦想,现代卫星技术的出现虽然使人们离这目标又进了一步,但卫星高高在上,洞察全局在行,明察细微就不管用了.这个时候,本文的主角—无线传感器网络就排上用场了.将大量的传感器节点遍撒指定区域,数据通过无线电波传回监控中心,监控区域内的所有信息就会尽收观察者的眼中了. 闲话不说,直接进入正题.想让传感数据回来,总得有一套可以“采集传感器数据,打包发送数据给上层”的系统,这里就程序简单说明一下该系统的实现. 主开发程序在/ZStack-CC2530-2.3.0-1.4.

ZigBee中协调器如何向子节点发消息?

注意:以下所有内容均以TI公司的CC2530和Z-Stack为硬软件平台为实验背景讲述. 在一般的ZigBee教程中,子节点如何向协调器发送消息已经被描述的非常清楚了:即子节点直接使用API向地址为0x0000的协调器发送消息即可.用到的函数如下: afStatus_t AF_DataRequest( afAddrType_t *dstAddr, endPointDesc_t *srcEP,uint16 cID, uint16 len, uint8 *buf, uint8 *transID,ui

读取cc2530节点的设备类型、协调器、路由器、终端。

建立网络.加入网络流程分析 协调器节点:在1-10  实验8 网络通信实验2 组播通信中 while(MSGpkt) { switch(MSGpkt->hdr.event) { case ZDO_STATE_CHANGE:  //建立网络后,设置事件 GenericApp_NwkState=(devStates_t)(MSGpkt->hdr.status);//??????? if(GenericApp_NwkState==DEV_ZB_COORD)//把该节点已初始化为协调器,则执行下面的

做一个终端发送和协调器接收实验

在终端的应用层任务挂上一个11号端点,接收(协调器)模块的应用层任务挂上7号端点和8号端点,其中7号端点上有两个簇0x0001和0x0002,8号端点上只有一个簇0x0001: 发送终端上有key3和key4,key5;接收器模块有led1,led2和led3,以及一个数码管. 按键3按下使接收模块的7号端点下面的簇0x0001接收消息,使LED1取反且让数码管显示3:按键4按下使led2取反,且使数码管显示4,同理,对于按键5对应8号端点的簇0x0001使led3取反,且数码管显示5. 在前面

运行中修改协调器PAN ID和Channel,协调器广播至所有的设备重启加入新建后的网络

从飞比论坛那边看到一个问题,记录下: 一个ZigBee网络运行中,协调器通过串口与上位机连接,上位机设置协调器的PAN ID和Channel,协调器收到配置信息后, 首先将PAN ID和Channel广播发送至网络中的所有节点,然后协调器自身重启,加入新网络.部分代码如下: //PAN ID uint16 panId = BUILD_UINT16(msg[LOCDONGLE_CONFIG_PANID_LO_IDX], msg[LOCDONGLE_CONFIG_PANID_HI_IDX]); //

TI Zigbee中协调器和其他设备固定PANID通信,设置PANID

最近一直在研究zigbee,因为有通过固定PANID通信的需求,因此做了大量的实验,上了很多论坛,我想我必须总结一下,以免大家再走一遍我这样的路. 非常感谢很多写下技术文档和在TI论坛上积极回复的工程师. 一. 通过配置文件设置PANID 最简单的方式是修改 f8wConfig.cfg  中的-DZDAPP_CONFIG_PAN_ID 另外预定义NV_RESTORE和NV_INIT. 但有两个问题: 1.当协调器断电之后,重新上电时,周围已经有另一协调器也是这个PANID,则设备无法与原协调器连