Kafka设计解析(四)- Kafka Consumer设计解析

本文转发自Jason’s Blog原文链接 http://www.jasongj.com/2015/08/09/KafkaColumn4

摘要

   本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景。以及未来版本中对High Level Consumer的重新设计–使用Consumer Coordinator解决Split Brain和Herd等问题。

High Level Consumer

   很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费(广播)。因此,Kafka Hight Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。   

Consumer Group

  High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中(Kafka从0.8.2版本开始同时支持将offset存于Zookeeper中与将offset存于专用的Kafka Topic中)。 这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,而非某个Topic的。每一个High Level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group。
  Zookeeper中Consumer相关节点如下图所示

  
 
 很多传统的Message
Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证Queue的长度比较短,提高效率。而如上文所述,Kafka并不删除
已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer
Group里只会被某一个Consumer消费。与传统Message Queue不同的是,Kafka还允许不同Consumer
Group同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。

  
 
 实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时
使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer在不同的
Consumer Group即可。下图展示了Kafka在LinkedIn的一种简化部署模型。

  
 
 为了更清晰展示Kafka Consumer Group的特性,笔者进行了一项测试。创建一个Topic
(名为topic1),再创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,然后通过
Producer向topic1发送Key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时
group2中的3个Consumer分别收到了Key为1,2,3的消息,如下图所示。

  注:上图中每个黑色区域代表一个Consumer实例,每个实例只创建一个MessageStream。实际上,本实验将Consumer应用程序打成jar包,并在4个不同的命令行终端中传入不同的参数运行。

High Level Consumer Rebalance

  (本节所讲述Rebalance相关内容均基于Kafka High Level Consumer)
 
 Kafka保证同一Consumer
Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个Consumer实例只会消费某一个或多个特定
Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以
Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个Consumer
Group里的Consumer均匀消费数据,优势是每个Consumer不用都跟大量的Broker通信,减少通信开销,同时也降低了分配难度,实现也
更简单。另外,因为同一个Partition里的数据是有序的,这种设计可以保证每个Partition里的数据可以被有序消费。
  如果某
Consumer
Group中Consumer(每个Consumer只创建1个MessageStream)数量少于Partition数量,则至少有一个
Consumer会消费多个Partition的数据,如果Consumer的数量与Partition数量相同,则正好一个Consumer消费一个
Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消
息。
  如下例所示,如果topic1有0,1,2共三个Partition,当group1只有一个Consumer(名为consumer1)时,该 Consumer可消费这3个Partition的所有数据。
  
  
 
 增加一个Consumer(consumer2)后,其中一个Consumer(consumer1)可消费2个Partition的数据
(Partition 0和Partition
1),另外一个Consumer(consumer2)可消费另外一个Partition(Partition 2)的数据。
  
  
  再增加一个Consumer(consumer3)后,每个Consumer可消费一个Partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2。
  
  
  再增加一个Consumer(consumer4)后,其中3个Consumer可分别消费一个Partition的数据,另外一个Consumer(consumer4)不能消费topic1的任何数据。
  
  
  此时关闭consumer1,其余3个Consumer可分别消费一个Partition的数据。
  
  
  接着关闭consumer2,consumer3可消费2个Partition,consumer4可消费1个Partition。
  
  
  再关闭consumer3,仅存的consumer4可同时消费topic1的3个Partition。
  

  Consumer Rebalance的算法如下:

  • 将目标Topic下的所有Partirtion排序,存于PT
  • 对某Consumer Group下所有Consumer排序,存于CG,第i个Consumer记为Ci
  • N=size(PT)/size(CG),向上取整
  • 解除Ci对原来分配的Partition的消费权(i从0开始)
  • 将第i∗N到(i+1)∗N−1个Partition分配给Ci

  
  目前,最新版(0.8.2.1)Kafka的Consumer
Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发
Consumer Group的Rebalance,具体启动流程如下:

  • High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]
  • /consumers/[consumer group]/ids上注册Watch
  • /brokers/ids上注册Watch
  • 如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上也创建Watch
  • 强制自己在其Consumer Group内启动Rebalance流程

  在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发Consumer
Rebalance。因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer
Group的一致性,当一个Consumer触发了Rebalance时,该Consumer
Group内的其它所有其它Consumer也应该同时触发Rebalance。

  该方式有如下缺陷:

  根据Kafka社区wiki,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(Coordinator)
大体思想是为所有Consumer Group的子集选举出一个Broker作为Coordinator,由它Watch
Zookeeper,从而判断是否有Partition或者Consumer的增减,然后生成Rebalance命令,并检查是否这些Rebalance
在所有相关的Consumer中被执行成功,如果不成功则重试,若成功则认为此次Rebalance成功(这个过程跟Replication
Controller非常类似)。具体方案将在后文中详细阐述。
  

Low Level Consumer

  使用Low Level Consumer (Simple Consumer)的主要原因是,用户希望比Consumer Group更好的控制数据的消费。比如:

  • 同一条消息读多次
  • 只读取某个Topic的部分Partition
  • 管理事务,从而确保每条消息被处理一次,且仅被处理一次

  与Consumer Group相比,Low Level Consumer要求用户做大量的额外工作。

  • 必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息
  • 应用程序需要通过程序获知每个Partition的Leader是谁
  • 必须处理Leader的变化

  使用Low Level Consumer的一般流程如下

  • 查找到一个“活着”的Broker,并且找出每个Partition的Leader
  • 找出每个Partition的Follower
  • 定义好请求,该请求应该能描述应用程序需要哪些数据
  • Fetch数据
  • 识别Leader的变化,并对之作出必要的响应

Consumer重新设计

  根据社区社区wiki,Kafka在0.9.*版本中,重新设计Consumer可能是最重要的Feature之一。本节会根据社区wiki介绍Kafka 0.9.*中对Consumer可能的设计方向及思路。
  

设计方向

简化消费者客户端
 
 部分用户希望开发和使用non-java的客户端。现阶段使用non-java发SimpleConsumer比较方便,但想开发High Level
Consumer并不容易。因为High Level
Consumer需要实现一些复杂但必不可少的失败探测和Rebalance。如果能将消费者客户端更精简,使依赖最小化,将会极大的方便non-
java用户实现自己的Consumer。
  
中心Coordinator
 
 如上文所述,当前版本的High Level Consumer存在Herd Effect和Split
Brain的问题。如果将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,那么这两个问题即可解决。同时还可大大减少
Zookeeper的负载,有利于Kafka Broker的Scale Out。
  
允许手工管理offset
 
 一些系统希望以特定的时间间隔在自定义的数据库中管理Offset。这就要求Consumer能获取到每条消息的metadata,例如
Topic,Partition,Offset,同时还需要在Consumer启动时得到每个Partition的Offset。实现这些,需要提供新的
Consumer
API。同时有个问题不得不考虑,即是否允许Consumer手工管理部分Topic的Offset,而让Kafka自动通过Zookeeper管理其它
Topic的Offset。一个可能的选项是让每个Consumer只能选取1种Offset管理机制,这可极大的简化Consumer
API的设计和实现。
  
Rebalance后触发用户指定的回调
  一
些应用可能会在内存中为每个Partition维护一些状态,Rebalance时,它们可能需要将该状态持久化。因此该需求希望支持用户实现并指定一些
可插拔的并在Rebalance时触发的回调。如果用户使用手动的Offset管理,那该需求可方便得由用户实现,而如果用户希望使用Kafka提供的自
动Offset管理,则需要Kafka提供该回调机制。

非阻塞式Consumer API
  该需求源于那些实现高层流处理操作,如filter by, group by, join等,的系统。现阶段的阻塞式Consumer几乎不可能实现Join操作。

##如何通过中心Coordinator实现Rebalance
  成功Rebalance的结果是,被订阅的所有Topic的每一个
Partition将会被Consumer
Group内的一个(有且仅有一个)Consumer拥有。每一个Broker将被选举为某些Consumer
Group的Coordinator。某个Cosnumer Group的Coordinator负责在该Consumer
Group的成员变化或者所订阅的Topic的Partititon变化时协调Rebalance操作。

Consumer
  1)
Consumer启动时,先向Broker列表中的任意一个Broker发送ConsumerMetadataRequest,并通过
ConsumerMetadataResponse获取它所在Group的Coordinator信息。ConsumerMetadataRequest
和ConsumerMetadataResponse的结构如下

12345678910
ConsumerMetadataRequest{  GroupId                => String}

ConsumerMetadataResponse{  ErrorCode              => int16  Coordinator            => Broker}

  2)Consumer连接到Coordinator并发送HeartbeatRequest,如果返回的HeartbeatResponse没有任何错误码,Consumer继续fetch数据。若其中包含IllegalGeneration错误码,即说明Coordinator已经发起了Rebalance操作,此时Consumer停止fetch数据,commit offset,并发送JoinGroupRequest给它的Coordinator,并在JoinGroupResponse中获得它应该拥有的所有Partition列表和它所属的Group的新的Generation ID。此时Rebalance完成,Consumer开始fetch数据。相应Request和Response结构如下

123456789101112131415161718192021222324252627282930
HeartbeatRequest{  GroupId                => String  GroupGenerationId      => int32  ConsumerId             => String}

HeartbeatResponse{  ErrorCode              => int16}

JoinGroupRequest{  GroupId                     => String  SessionTimeout              => int32  Topics                      => [String]  ConsumerId                  => String  PartitionAssignmentStrategy => String}

JoinGroupResponse{  ErrorCode              => int16  GroupGenerationId      => int32  ConsumerId             => String  PartitionsToOwn        => [TopicName [Partition]]}TopicName => StringPartition => int32

Consumer状态机
  
  Down:Consumer停止工作
  Start up & discover coordinator:Consumer检测其所在Group的Coordinator。一旦它检测到Coordinator,即向其发送JoinGroupRequest。
  Part of a group:该状态下,Consumer已经是该Group的成员,并周期性发送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration错误码,则转换到Stopped Consumption状态。若连接丢失,HeartbeatResponse包含NotCoordinatorForGroup错误码,则转换到Rediscover coordinator状态。
  Rediscover coordinator:该状态下,Consumer不停止消费而是尝试通过发送ConsumerMetadataRequest来探测新的Coordinator,并且等待直到获得无错误码的响应。
  Stopped consumption:该状态下,Consumer停止消费并提交offset,直到它再次加入Group。
  
  
  
故障检测机制
  Consumer成功加入Group后,Consumer和相应的Coordinator同时开始故障探测程序。Consumer向Coordinator发起周期性的Heartbeat(HeartbeatRequest)并等待响应,该周期为 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms内未收到HeartbeatResponse,或者发现相应的Socket channel断开,它即认为Coordinator已宕机并启动Coordinator探测程序。若Coordinator在session.timeout.ms内没有收到一次HeartbeatRequest,则它将该Consumer标记为宕机状态并为其所在Group触发一次Rebalance操作。
  Coordinator Failover过程中,Consumer可能会在新的Coordinator完成Failover过程之前或之后发现新的Coordinator并向其发送HeatbeatRequest。对于后者,新的Cooodinator可能拒绝该请求,致使该Consumer重新探测Coordinator并发起新的连接请求。如果该Consumer向新的Coordinator发送连接请求太晚,新的Coordinator可能已经在此之前将其标记为宕机状态而将之视为新加入的Consumer并触发一次Rebalance操作。

Coordinator
  1)稳定状态下,Coordinator通过上述故障探测机制跟踪其所管理的每个Group下的每个Consumer的健康状态。
  2)刚启动时或选举完成后,Coordinator从Zookeeper读取它所管理的Group列表及这些Group的成员列表。如果没有获取到Group成员信息,它不会做任何事情直到某个Group中有成员注册进来。
  3)在Coordinator完成加载其管理的Group列表及其相应的成员信息之前,它将为HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete错误码。此时,Consumer会重新发送请求。
  4)Coordinator会跟踪被其所管理的任何Consumer Group注册的Topic的Partition的变化,并为该变化触发Rebalance操作。创建新的Topic也可能触发Rebalance,因为Consumer可以在Topic被创建之前就已经订阅它了。
  Coordinator发起Rebalance操作流程如下所示。

Coordinator状态机
  
  Down:Coordinator不再担任之前负责的Consumer Group的Coordinator
  Catch up:该状态下,Coordinator竞选成功,但还未能做好服务相应请求的准备。
  Ready:该状态下,新竞选出来的Coordinator已经完成从Zookeeper中加载它所负责管理的所有Group的metadata,并可开始接收相应的请求。
  Prepare for rebalance:该状态下,Coordinator在所有HeartbeatResponse中返回IllegalGeneration错误码,并等待所有Consumer向其发送JoinGroupRequest后转到Rebalancing状态。
  Rebalancing:该状态下,Coordinator已经收到了JoinGroupRequest请求,并增加其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功后,它会等待接收包含新的Consumer Generation ID的HeartbeatRequest,并转至Ready状态。

Coordinator Failover
  如前文所述,Rebalance操作需要经历如下几个阶段
  1)Topic/Partition的改变或者新Consumer的加入或者已有Consumer停止,触发Coordinator注册在Zookeeper上的watch,Coordinator收到通知准备发起Rebalance操作。
  2)Coordinator通过在HeartbeatResponse中返回IllegalGeneration错误码发起Rebalance操作。
  3)Consumer发送JoinGroupRequest
  4)Coordinator在Zookeeper中增加Group的Generation ID并将新的Partition分配情况写入Zookeeper
  5)Coordinator发送JoinGroupResponse
  
  在这个过程中的每个阶段,Coordinator都可能出现故障。下面给出Rebalance不同阶段中Coordinator的Failover处理方式。
  1)如果Coordinator的故障发生在第一阶段,即它收到Notification并未来得及作出响应,则新的Coordinator将从Zookeeper读取Group的metadata,包含这些Group订阅的Topic列表和之前的Partition分配。如果某个Group所订阅的Topic数或者某个Topic的Partition数与之前的Partition分配不一致,亦或者某个Group连接到新的Coordinator的Consumer数与之前Partition分配中的不一致,新的Coordinator会发起Rebalance操作。
  2)如果失败发生在阶段2,它可能对部分而非全部Consumer发出带错误码的HeartbeatResponse。与第上面第一种情况一样,新的Coordinator会检测到Rebalance的必要性并发起一次Rebalance操作。如果Rebalance是由Consumer的失败所触发并且Cosnumer在Coordinator的Failover完成前恢复,新的Coordinator不会为此发起新的Rebalance操作。
  3)如果Failure发生在阶段3,新的Coordinator可能只收到部分而非全部Consumer的JoinGroupRequest。Failover完成后,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。与第1种情况类似,它将发起新一轮的Rebalance操作。
  4)如果Failure发生在阶段4,即它将新的Group Generation ID和Group成员信息写入Zookeeper后。新的Generation ID和Group成员信息以一个原子操作一次性写入Zookeeper。Failover完成后,Consumer会发送HeartbeatRequest给新的Coordinator,并包含旧的Generation ID。此时新的Coordinator通过在HeartbeatResponse中返回IllegalGeneration错误码发起新的一轮Rebalance。这也解释了为什么每次HeartbeatRequest中都需要包含Generation ID和Consumer ID。
  5)如果Failure发生在阶段5,旧的Coordinator可能只向Group中的部分Consumer发送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已经失效的Coordinator发送HeartbeatRequest或者提交Offset时会检测到它已经失败。此时,它将检测新的Coordinator并向其发送带有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer将检测新的Coordinator并向其发送JoinGroupRequest,这将促使新的Coordinator发起新一轮的Rebalance。

时间: 2024-10-14 08:30:06

Kafka设计解析(四)- Kafka Consumer设计解析的相关文章

kafka入门二:Kafka的设计思想、理念

本节主要从整体角度介绍Kafka的设计思想,其中的每个理念都可以深入研究,以后我可能会发专题文章做深入介绍,在这里只做较概括的描述以便大家更好的理解Kafka的独特之处.本节主要涉及到如下主要内容: Kafka设计基本思想 Kafka中的数据压缩 Kafka消息转运过程中的可靠性 Kafka集群镜像复制 Kafka 备份机制 一.kafka由来 由于对JMS日常管理的过度开支和传统JMS可扩展性方面的局限,LinkedIn(www.linkedin.com)开发了Kafka以满足他们对实时数据流

中间件 | kafka简介、使用场景、设计原理、主要配置及集群搭建

开源Java学习 公众号 一.入门 1.简介 Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker.无论是kafka集群,还是producer和c

Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 本文的图片是通过PPT截图出的,读者如果修改意见请联系我 一.Consumer为何需要实现多线程 假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息.该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用AP

得用户者得天下  解析明基的保时捷设计水准

谈到保时捷,相信很多人都非常了解,世界名车啊,只有高富帅才玩儿得起.不过,如果由保时捷的设计师来设计一款显示器,水准一流.质地厚道,且价格亲民,你怎么看? 如最近京东上热销的明基GW2760护眼显示器正是这样一款产品,它出自德国保时捷汽车车体设计师的王千睿大师之手,近十年来由他执掌的明基集团数字时尚设计团队累计摘获375项世界级设计大奖,包括德国iF.reddot.美国IDEA及日本G-mark等,获奖记录雄踞全球华人企业之冠. 在这种情况下,GW2760自然备受期待.正所谓"隔行如隔山&quo

四种生成和解析XML文档的方法详解(介绍+优缺点比较+示例)

四种生成和解析XML文档的方法详解(介绍+优缺点比较+示例) 众所周知,现在解析XML的方法越来越多,但主流的方法也就四种,即:DOM.SAX.JDOM和DOM4J 下面首先给出这四种方法的jar包下载地址 DOM:在现在的Java JDK里都自带了,在xml-apis.jar包里 SAX:http://sourceforge.net/projects/sax/ JDOM:http://jdom.org/downloads/index.html DOM4J:http://sourceforge.

c# 轻量级 ORM 框架 之 Model解析 (四)

关于orm框架设计,还有必要说的或许就是Model解析了,也是重要的一个环节,在实现上还是相对比较简单的. Model解析,主要用到的技术是反射了,即:把类的属性与表的字段做映射. 把自己的设计及实现思路写出来也希望能有人给很好的优化建议,同时也给新手一点启发吧. 首先先给Model属性定义特性,先普及一下"特性"的概念和为什么用特性(Attribute). 简单来说,特性是给一个类,或方法,或属性 打上一个标记(或者叫附加信息),具体理解还是看例子比较好吧, 在做类与表之间映射时,我

QT开发(四十四)——流方法解析XML

QT开发(四十四)--流方法解析XML 一.流方法解析XML简介 QT 4.3开始,QT引入了两个新的类来读取和写入XML文档:QXmlStreamReader和QXmlStreamWriter. QXmlStreamReader类提供了一个快速的解析器通过一个简单的流API来读取良构的XML文档,是作为QT的SAX解析器的替代者出现的,比SAX解析器更快更方便.    QXmlStreamReader可以从QIODevice或QByteArray中读取数据.QXmlStreamReader以一

Spring Security 解析(四) ——短信登录开发

Spring Security 解析(四) -- 短信登录开发 ??在学习Spring Cloud 时,遇到了授权服务oauth 相关内容时,总是一知半解,因此决定先把Spring Security .Spring Security Oauth2 等权限.认证相关的内容.原理及设计学习并整理一遍.本系列文章就是在学习的过程中加强印象和理解所撰写的,如有侵权请告知. 项目环境: JDK1.8 Spring boot 2.x Spring Security 5.x 一.如何在Security的基础上

SLAM+语音机器人DIY系列:(四)差分底盘设计——2.stm32主控软件设计

摘要 运动底盘是移动机器人的重要组成部分,不像激光雷达.IMU.麦克风.音响.摄像头这些通用部件可以直接买到,很难买到通用的底盘.一方面是因为底盘的尺寸结构和参数是要与具体机器人匹配的:另一方面是因为底盘包含软硬件整套解决方案,是很多机器人公司的核心技术,一般不会随便公开.出于强烈的求知欲与学习热情,我想自己DIY一整套两轮差分底盘,并且将完整的设计过程公开出去供大家学习.说干就干,本章节主要内容: 1.stm32主控硬件设计 2.stm32主控软件设计 3.底盘通信协议 4.底盘ROS驱动开发