经常有人问的一个问题就是:Kafka broker到底是不是无状态的?网上有这样的说法:
正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的。。。。。。
我猜想作者的意思应该是说:broker不保存消费者的状态。如果从这个角度来说,broker无状态的说法倒也没有什么问题。不过实际上,broker是有状态的服务:每台broker在内存中都维护了集群上所有节点和topic分区的状态信息——Kafka称这部分状态信息为元数据缓存(metadata cache)。本文就将讨论一下这个metadata cache的设计与实现。
1. cache里面存了什么?
首先,我们来看下cache里面都存了什么,我们以Kafka 1.0.0版本作为分析对象。Metadata cache中保存的信息十分丰富,几乎囊括了Kafka集群的各个方面,它包含了:
- controller所在的broker ID,即保存了当前集群中controller是哪台broker
- 集群中所有broker的信息:比如每台broker的ID、机架信息以及配置的若干组连接信息(比如配置了PLAINTEXT和SASL监听器就有两套连接信息,分别使用不同的安全协议和端口,甚至主机名都可能不同)
- 集群中所有节点的信息:严格来说,它和上一个有些重复,不过此项是按照broker ID和监听器类型进行分组的。对于超大集群来说,使用这一项缓存可以快速地定位和查找给定节点信息,而无需遍历上一项中的内容,算是一个优化吧
- 集群中所有分区的信息:所谓分区信息指的是分区的leader、ISR和AR信息以及当前处于offline状态的副本集合。这部分数据按照topic和分区ID进行分组,可以快速地查找到每个分区的当前状态。(注:AR表示assigned replicas,即创建topic时为该分区分配的副本集合)
2. 每台broker都保存相同的cache吗?
是的,至少Kafka在设计时的确是这样的愿景:每台Kafka broker都要维护相同的缓存,这样客户端程序(clients)随意地给任何一个broker发送请求都能够获取相同的数据,这也是为什么任何一个broker都能处理clients发来的Metadata请求的原因:因为每个broker上都有这些数据!要知道目前Kafka共有38种请求类型,能做到这一点的可谓少之又少。每个broker都能处理的能力可以缩短请求被处理的延时从而提高整体clients端的吞吐,因此用空间去换一些时间的做法是值得的。
3. cache是怎么更新的?
如前所述,用空间去换时间,好处是降低了延时,提升了吞吐,但劣势就在于你需要处理cache的更新并且维护一致性。目前Kafka是怎么更新cache的?简单来说,就是通过发送异步更新请求(UpdateMetadata request)来维护一致性的。既然是异步的,那么在某一个时间点集群上所有broker的cache信息就未必是严格相同的。只不过在实际使用场景中,这种弱一致性似乎并没有太大的问题。原因如下:1. clients并不是时刻都需要去请求元数据的,且会缓存到本地;2. 即使获取的元数据无效或者过期了,clients通常都有重试机制,可以去其他broker上再次获取元数据; 3. cache更新是很轻量级的,仅仅是更新一些内存中的数据结构,不会有太大的成本。因此我们还是可以安全地认为每台broker上都有相同的cache信息。
具体的更新操作实际上是由controller来完成的。controller会在一定场景下向特定broker发送UpdateMetadata请求令这些broker去更新它们各自的cache,这些broker一旦接收到请求便开始全量更新——即清空当前所有cache信息,使用UpdateMetadata请求中的数据来重新填充cache。
4. cache什么时候更新?
实际上这个问题等同于:controller何时向特定broker发送UpdateMetadata请求? 如果从源码开始分析,那么涉及到的场景太多了,比如controller启动时、新broker启动时、更新broker时、副本重分配时等等。我们只需要记住:只要集群中有broker或分区数据发生了变更就需要更新这些cache。
举个经常有人问的例子:集群中新增加的broker是如何获取这些cache,并且其他broker是如何知晓它的?当有新broker启动时,它会在Zookeeper中进行注册,此时监听Zookeeper的controller就会立即感知这台新broker的加入,此时controller会更新它自己的缓存(注意:这是controller自己的缓存,不是本文讨论的metadata cache)把这台broker加入到当前broker列表中,之后它会发送UpdateMetadata请求给集群中所有的broker(也包括那台新加入的broker)让它们去更新metadata cache。一旦这些broker更新cache完成,它们就知道了这台新broker的存在,同时由于新broker也更新了cache,故现在它也有了集群所有的状态信息。
5. 目前的问题?
前面说过了,现在更新cache完全由controller来驱动,故controller所在broker的负载会极大地影响这部分操作(实际上,它会影响所有的controller操作)。根据目前的设计,controller所在broker依然作为一个普通broker执行其他的clients请求处理逻辑,所以如果controller broker一旦忙于各种clients请求(比如生产消息或消费消息),那么这种更新操作的请求就会积压起来(backlog),造成了更新操作的延缓甚至是被取消。究其根本原因在于当前controller对待数据类请求和控制类请求并无任何优先级化处理——controller一视同仁地对待这些请求,而实际上我们更希望controller能否赋予控制类请求更高的优先级。社区目前已经开始着手改造当前的设计,相信在未来的版本中此问题可以得到解决。
本文探讨了一些关于metadata cache方面的内容,因为时间有限,并没有涵盖方方面面。不过对于我们了解cache的工作原理应该可以还是有帮助的~~
原文地址:https://www.cnblogs.com/huxi2b/p/8440429.html