kafka深入研究之路(1)-剖析各原理02

kafka深入研究之路(1)-剖析各原理02

接着上一文的内容 继续升入研究

topic如何创建于删除的

topic的创建

具体流程文字为:

1、 controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2、 controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
     2.1、 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
     2.2、 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3、 controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

注意:此部分 和 partition 的leader选举过程很类似 都是需要 zk参与 相关信息都是记录到zk中
controller在这些过程中启到非常重要的作用。

topic的删除

文字过程:

1、 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2、 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。


前面我们讲到的很多的处理故障过程 包括 topic创建删除 partition leader的转换 broker发生故障的过程中如何保证高可用 都涉及到了一个组件 controller,关于kafka中出现的相关概念名词,我会专门的写一个博客,这里先简单的提一下。
Controller:Kafka 集群中的其中一个服务器,用来进行 Leader Election 以及各种 Failover。

大家有没有想过一个问题,就是如果controller出现了故障,怎么办,如何failover的呢?我们往下看。

首先我们最一个实验,我们在zk中找到controller在哪个broker上,并查看controller_epoch的次数

[zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
controller_epoch   controller
[zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
[]
[zk: localhost:2181(CONNECTED) 15] get /kafkagroup/controller
{"version":1,"brokerid":1002,"timestamp":"1566648802297"}
[zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
23

我们可以看到当前的controller在1002上 在此之前发了23次controller的切换

我们手动到 1002节点上杀死kafka进程
[[email protected]$ jps
11665 Jps
10952 Kafka
11068 ZooKeeperMain
10495 QuorumPeerMain
[[email protected]$ kill -9 10952
[[email protected]$ jps
11068 ZooKeeperMain
11678 Jps
10495 QuorumPeerMain

再看zk上的信息,相关信息已经同步到zk中了

[zk: localhost:2181(CONNECTED) 16] get /kafkagroup/controller
{"version":1,"brokerid":1003,"timestamp":"1566665835022"}
[zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
24
[zk: localhost:2181(CONNECTED) 25] ls /kafkagroup/brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 26] ls /kafkagroup/brokers/ids
[1003, 1001]

在后台日志中就会看到很多
[[email protected] logs]$ vim state-change.log
[2019-08-25 01:01:07,886] TRACE [Controller id=1003 epoch=24] Received response {error_code=0} for request UPDATE_METADATA wit
h correlation id 7 sent to broker 10.211.55.13:9092 (id: 1003 rack: null) (state.change.logger)
[[email protected] logs]$ pwd
/data/kafka/kafka-server-logs/logs
state改变的信息

[[email protected] logs]$ tailf controller.log
[2019-08-25 01:05:42,295] TRACE [Controller id=1003] Leader imbalance ratio for broker 1002 is 1.0 (kafka.controller.KafkaCont
roller)
[2019-08-25 01:05:42,295] INFO [Controller id=1003] Starting preferred replica leader election for partitions (kafka.controll
er.KafkaController)

接下来,我们具体的分析一下,他到底内部发生了什么,如何切换的
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:

1、 读取并增加 Controller Epoch。
2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4、 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5、 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6、 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7、 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8、 启动 replicaStateMachine 和 partitionStateMachine。
9、 将 brokerState 状态设置为 RunningAsController。
10、 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11、 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

可以看到,都是在zk上进行交互,controller的从新选举会依次通知 zk中相关的位置 并注册watcher ,在此过程中 就会发送 partition的leader的选举,还会发生partition-rebalanced 删除无用的topic等一系列操作(因为我们这里是直接考虑的最糟糕的情况就是broker宕机了一个,然而宕机的这台上就是controller)



consumer是如何消费消息的

重要概念:每个 Consumer 都划归到一个逻辑 Consumer Group 中,一个 Partition 只能被同一个 Consumer Group 中的一个 Consumer 消费,但可以被不同的 Consumer Group 消费。

若 Topic 的 Partition 数量为 p,Consumer Group 中订阅此 Topic 的 Consumer 数量为 c, 则:
p < c: 会有 c - p 个 consumer闲置,造成浪费
p > c: 一个 consumer 对应多个 partition
p = c: 一个 consumer 对应一个 partition
应该合理分配 Consumer 和 Partition 的数量,避免造成资源倾斜,
本人建议最好 Partiton 数目是 Consumer 数目的整数倍。

在consumer消费的过程中如何把partition分配给consumer?
也可以理解为consumer发生rebalance的过程是如何的?
生产过程中 Broker 要分配 Partition,消费过程这里,也要分配 Partition 给消费者。

类似 Broker 中选了一个 Controller 出来,消费也要从 Broker 中选一个 Coordinator,用于分配 Partition。// Coordinator 和 Controller 都是一个概念,协调者 组织者

当 Partition 或 Consumer 数量发生变化时,比如增加 Consumer,减少 Consumer(主动或被动),增加 Partition,都会进行 consumer的Rebalance。//发生rebalance发生在consumer端

见图:

文字信息为:
1、Consumer 给 Coordinator 发送 JoinGroupRequest 请求。这时其他 Consumer 发 Heartbeat 请求过来时,Coordinator 会告诉他们,要 Rebalance了。其他 Consumer 也发送 JoinGroupRequest 请求。
2、Coordinator 在 Consumer 中选出一个 Leader,其他作为 Follower,通知给各个 Consumer,对于 Leader,还会把 Follower 的 Metadata 带给它。
3、Consumer Leader 根据 Consumer Metadata 重新分配 Partition。
4、Consumer 向 Coordinator 发送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 会包含分配的情况。
5、Coordinator 回包,把分配的情况告诉 Consumer,包括 Leader。

接下来思考一个问题,consumer是如何取消息的 Consumer Fetch Message
Consumer 采用"拉模式"消费消息,这样 Consumer 可以自行决定消费的行为。
Consumer 调用 Poll(duration)从服务器拉取消息。拉取消息的具体行为由下面的配置项决定:

#consumer.properties
#消费者最多 poll 多少个 record
max.poll.records=500
#消费者 poll 时 partition 返回的最大数据量
max.partition.fetch.bytes=1048576
#Consumer 最大 poll 间隔
#超过此值服务器会认为此 consumer failed
#并将此 consumer 踢出对应的 consumer group
max.poll.interval.ms=300000

小结:
1、在 Partition 中,每个消息都有一个 Offset。新消息会被写到 Partition 末尾(最新的一个 Segment 文件末尾), 每个 Partition 上的消息是顺序消费的,不同的 Partition 之间消息的消费顺序是不确定的。
2、若一个 Consumer 消费多个 Partition, 则各个 Partition 之前消费顺序是不确定的,但在每个 Partition 上是顺序消费。
3、若来自不同 Consumer Group 的多个 Consumer 消费同一个 Partition,则各个 Consumer 之间的消费互不影响,每个 Consumer 都会有自己的 Offset。

举个官方小栗子:



Offset 如何保存?
Consumer 消费 Partition 时,需要保存 Offset 记录当前消费位置。

Offset 可以选择自动提交或调用 Consumer 的 commitSync() 或 commitAsync() 手动提交,相关配置为:

#是否自动提交 offset
enable.auto.commit=true
#自动提交间隔。enable.auto.commit=true 时有效
auto.commit.interval.ms=5000
//enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。5 秒

Offset 保存在名叫 __consumeroffsets 的 Topic 中。写消息的 Key 由 GroupId、Topic、Partition 组成,Value 是 Offset。

一般情况下,每个 Key 的 Offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存,第一次就会遍历 Partition 建立缓存,然后查询返回

__consumeroffsets 的 Partition 数量由下面的 Server 配置决定:

offsets.topic.num.partitions=50

默然的consumeroffsets是没有repale副本的 需要我们通过在一开始的参数指定,或者通过后期的增加 consumeroffsets 的副本json的方式动态添加

auto.create.topics.enable=true
default.replication.factor=2
num.partitions=3

Offset 保存在哪个分区上,即 __consumeroffsets 的分区机制,可以表示为
groupId.hashCode() mode groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 是上面配置的分区数。因为一个 Partition 只能被同一个 Consumer Group 的一个 Consumer 消费,因此可以用 GroupId 表示此 Consumer 消费 Offeset 所在分区。



参考链接Wie:
Kafka学习之路 (三)Kafka的高可用 https://www.cnblogs.com/qingyunzong/p/9004703.html
理解Kafka消费者属性的enable.auto.commit https://blog.csdn.net/chaiyu2002/article/details/89472416

原文地址:https://blog.51cto.com/12445535/2432350

时间: 2024-10-11 15:55:18

kafka深入研究之路(1)-剖析各原理02的相关文章

kafka深入研究之路(1)-剖析各原理

kafka深入研究之路(1)-剖析各原理 引言:来到了新公司,需要对kafka组件有很深的研究,本人之前对老版的kafka有过一定的研究,但是谈不上深入,新公司力推kafka,比较kafka作为消息系统在目前的市场上的占有率还是很高的,可以看本人之前kafka的博客中有关kafka的优点和为什么要用kafka.在众多优点中,我本人认为最重要的2个优点如下: 1.削峰数据库的处理能力是有限的,在峰值期,过多的请求落到后台,一旦超过系统的处理能力,可能会使系统挂掉. 如上图所示,系统的处理能力是 2

kafka深入研究之路(2) kafka简介与专业术语解释说明

目录:1.kafka简介 什么是kafka? 设计目标是什么?2.kafka的优缺点3.kafka中专业术语解释说明 官方网站: http://kafka.apache.org/introkafka中文教程 http://orchome.com/kafka/index 1/ kafka 简介Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等

kafka深度研究之路(3)-kafka 与zk 集群启停脚本

编写 kafka集群 启动 和 停止脚本 [[email protected] scripts]$ cat kafka_cluster_stop.sh #!/bin/bash -x #date 20190801 #autnor majihui clush -g all "source /etc/profile ;jps|grep Kafka|awk '{print $1}' | xargs kill -9 " [[email protected] scripts]$ cat kafka

kafka深度研究之路(4)-kafka和zookeeper 配置文件详细说明(来龙去脉)之zk配置

目录1/Zookeeper配置文件详解2/kafka配置文件参数详解3/生产环境 zk 与 kafka 配置文件备注4/kafka命令详解 1/安装完zookeeper 对其配置文件详解 zookeeper-3.4.14.tar.gz在安装zookeeper的时候我们要去修改zookeeper预装是conf目录下面的zoo_sample.cfg这个文件,首先我们要做的事就是重命名这个文件[[email protected] conf]$ cp zoo_sample.cfg zoo.cfg[[em

kafka深度研究之路(4)-kafka和zk 配置文件详细说明(来龙去脉)之kafka配置

2/kafka配置文件参数详解 默认必须配置的参数 默认 kafka server.properties 配置如下: ############################# Server Basics ############################# # 服务器基础知识 # The id of the broker. This must be set to a unique integer for each broker. # 必须为每个代理设置一个唯一的整数 broker.id=

kafka深度研究之路(5)-kafka新版常用命令汇总

小结:1/列出topic的命令为:kafka-topics.sh --zookeeper 10.211.55.11:2181,10.211.55.12:2181,10.211.55.13:2181/kafkagroup --list2/删除topic的命令为:kafka-topics.sh --delete --zookeeper 10.211.55.11:2181,10.211.55.12:2181,10.211.55.13:2181/kafkagroup --topic topic-maji

剖析Vue原理&amp;实现双向绑定MVVM

剖析Vue原理&实现双向绑定MVVM vue.js 双向绑定 javascript 邓木琴居然被盗用了 2016年08月16日发布 推荐 24 推荐 收藏 195 收藏,10.6k 浏览 本文能帮你做什么?1.了解vue的双向数据绑定原理以及核心代码模块2.缓解好奇心的同时了解如何实现双向绑定为了便于说明原理与实现,本文相关代码主要摘自vue源码, 并进行了简化改造,相对较简陋,并未考虑到数组的处理.数据的循环依赖等,也难免存在一些问题,欢迎大家指正.不过这些并不会影响大家的阅读和理解,相信看完

TRUNCATE TABLE恢复系列一:深层剖析内部原理

叮叮铛-今天我们推出Oracle异常恢复的第一个系列:"TRUNCATE TABLE恢复系列",这个系列主要围绕truncate table实现的内部原理和几种恢复方式来展开. 深层剖析内部原理 众所周知,truncate table是一种快速清空表内数据的一种方式,与delete方式不同,truncate只产生非常少的redo和undo,就实现了清空表数据并降低表HWM的功能.我们通过10046和redo dump来分析truncate的整个操作过程,其中10046用于观察trunc

KMP的自我研究之路(一)

经过一天的酝酿思考,我尝试去理解KMP算法的创造过程,最终得出了那么一点皮毛,今天我就来记录一下我的结果吧 首先,介绍KMP算法的详细资料网络上有很多,大家随意google.wiki.百度应该都能找到了. 我这里要讲的不是KMP算法要怎么实现,而是KMP算法的出现过程,这个过程是我自己原创的,不具有任何绝对的说法,重点只是在于帮助自己或者别人理解这个让人难以理解的东西. 下面就是我的思考路径: 首先我们随便找两个字符串,一个称为"主串",另一个称为"匹配串",我们的