Kafka实践:到底该不该把不同类型的消息放在同一个主题中?

Kafka 主题最重要的一个功能是可以让消费者指定它们想要消费的消息子集。在极端情况下,将所有数据放在同一个主题中可能不是一个好主意,因为这样消费者就无法选择它们感兴趣的事件——它们需要消费所有的消息。另一种极端情况,拥有数百万个不同的主题也不是一个好主意,因为 Kafka 的每个主题都是有成本的,拥有大量主题会损害性能。

实际上,从性能的角度来看,分区数量才是关键因素。在 Kafka 中,每个主题至少对应一个分区,如果你有 n 个主题,至少会有 n 个分区。不久之前,Jun Rao 写了一篇博文,解释了拥有多个分区的成本(端到端延迟、文件描述符、内存开销、发生故障后的恢复时间)。根据经验,如果你关心延迟,那么每个节点分配几百个分区就可以了。如果每个节点的分区数量超过成千上万个,就会造成较大的延迟。

关于性能的讨论为设计主题结构提供了一些指导:如果你发现自己有数千个主题,那么将一些细粒度、低吞吐量的主题合并到粗粒度主题中可能是个明智之举,这样可以避免分区数量蔓延。

然而,性能并不是我们唯一关心的问题。在我看来,更重要的是主题结构的数据完整性和数据模型。我们将在本文的其余部分讨论这些内容。

主题等于相同类型事件的集合?

人们普遍认为应该将相同类型的事件放在同一主题中,不同的事件类型应该使用不同的主题。这种思路让我们联想到关系型数据库,其中表是相同类型记录的集合,于是我们就有了数据库表和 Kafka 主题之间的类比。

Confluent Avro Schema Registry 进一步强化了这种概念,因为它鼓励你对主题的所有消息使用相同的 Avro 模式(schema)。模式可以在保持兼容性的同时进行演化(例如通过添加可选字段),但所有消息都必须符合某种记录类型。稍后我会再回过头来讨论这个问题。

对于某些类型的流式数据,例如活动事件,要求同一主题中所有消息都符合相同的模式,这是合理的。但是,有些人把 Kafka 当成了数据库来用,例如事件溯源,或者在微服务之间交换数据。对于这种情况,我认为是否将主题定义为具有相同模式的消息集合就不那么重要了。这个时候,更重要的是主题分区中的消息必须是有序的。

想象一下这样的场景:你有一个实体(比如客户),这个实体可能会发生许多不同的事情,比如创建客户、客户更改地址、客户向帐户中添加新的信用卡、客户发起客服请求,客户支付账单、客户关闭帐户。

这些事件之间的顺序很重要。例如,我们希望其他事件必须在创建客户之后才能发生,并且在客户关闭帐户之后不能再发生其他事件。在使用 Kafka 时,你可以将它们全部放在同一个主题分区中来保持它们的顺序。在这个示例中,你可以使用客户 ID 作为分区的键,然后将所有事件放在同一个主题中。它们必须位于同一主题中,因为不同的主题对应不同的分区,而 Kafka 是不保证分区之间的顺序的。

顺序问题

如果你为 customerCreated、customerAddressChanged 和 customerInvoicePaid 事件使用了不同的主题,那么这些主题的消费者可能就看不到这些事件之间的顺序。例如,消费者可能会看到一个不存在的客户做出的地址变更(这个客户尚未创建,因为相应的 customerCreated 事件可能发生了延迟)。

如果消费者暂停一段时间(比如进行维护或部署新版本),那么事件出现乱序的可能性就更高了。在消费者停止期间,事件继续发布,并且这些事件被存储在特定定的主题分区中。当消费者再次启动时,它会消费所有积压在分区中的事件。如果消费者只消费一个分区,那就没问题:积压的事件会按照它们存储的顺序依次被处理。但是,如果消费者同时消费几个主题,就会按任意顺序读取主题中数据。它可以先读取积压在一个主题上的所有数据,然后再读取另一个主题上积压的数据,或者交错地读取多个主题的数据。

因此,如果你将 customerCreated、customerAddressChanged 和 customerInvoicePaid 事件放在三个单独的主题中,那么消费者可能会在看到 customerCreated 事件之前先看到 customerAddressChanged 事件。因此,消费者很可能会看到一个客户的 customerAddressChanged 事件,但这个客户却未被创建。

你可能会想到为每条消息附加时间戳,并用它来对事件进行排序。如果你将事件导入数据仓库,再对事件进行排序,或许是没有问题的。但在流数据中只使用时间戳是不够的:在你收到一个具有特定时间戳的事件时,你不知道是否需要等待具有较早时间戳的事件,或者所有之前的事件是否已经在当前事情之前到达。依靠时钟进行同步通常会导致噩梦,有关时钟问题的更多详细信息,请参阅“Designing Data-Intensive Applications”的第 8 章。

何时拆分主题,何时合并主题?

基于这个背景,我将给出一些经验之谈,帮你确定哪些数据应该放在同一主题中,以及哪些数据应该放在不同的主题中。

首先,需要保持固定顺序的事件必须放在同一主题中(并且需要使用相同的分区键)。如果事件属于同一实体,那么事件的顺序就很重要。因此,我们可以说,同一实体的所有事件都应该保存在同一主题中。
如果你使用事件溯源进行数据建模,事件的排序尤为重要。聚合对象的状态是通过以特定的顺序重放事件日志而得出的。因此,即使可能存在不同的事件类型,聚合所需要的所有事件也必须在同一主题中。
对于不同实体的事件,它们应该保存在相同的主题中还是不同的主题中?我想说,如果一个实体依赖于另一个实体(例如一个地址属于一个客户),或者经常需要同时用到它们,那么它们也应该保存在同一主题中。另一方面,如果它们不相关,并且属于不同的团队,那么最好将它们放在不同的主题中。
另外,这也取决于事件的吞吐量:如果一个实体类型的事件吞吐量比其他实体要高很多,那么最好将它分成几个主题,以免让只想消费低吞吐量实体的消费者不堪重负(参见第 4 点)。不过,可以将多个具有低吞吐量的实体合并起来。
如果一个事件涉及多个实体该怎么办?例如,订单涉及到产品和客户,转账至少涉及到两个账户。
我建议在一开始将这些事件记录为单个原子消息,而不是将其分成几个属于不同主题的消息。在记录事件时,最好可以保持原封不动,即尽可能保持数据的原始形式。你可以随后使用流式处理器来拆分复合事件,但如果过早进行拆分,想要重建原始事件会难得多。如果能够为初始事件分配一个唯一 ID(例如 UUID)就更好了,之后如果你要拆分原始事件,可以带上这个 ID,从而可以追溯到每个事件的起源。
看看消费者需要订阅的主题数量。如果几个消费者都订阅了一组特定的主题,这表明可能需要将这些主题合并在一起。
如果将细粒度的主题合并成粗粒度的主题,一些消费者可能会收到他们不需要的事件,需要将其忽略。这不是什么大问题:消费消息的成本非常低,即使最终忽略了一大半的事件,总的成本可能也不会很大。只有当消费者需要忽略绝大多数消息(例如 99.9%是不需要的)时,我才建议将大容量事件流拆分成小容量事件流。
用作 Kafka Streams 状态存储(KTable)的变更日志主题应该与其他主题分开。在这种情况下,这些主题由 Kafka Streams 流程来管理,所以不应该包含其他类型的事件。
最后,如果基于上述的规则依然无法做出正确的判断,该怎么办?那么就按照类型对事件进行分组,把相同类型的事件放在同一个主题中。不过,我认为这条规则是最不重要的。
模式管理

如果你的数据是普通文本(如 JSON),而且没有使用静态的模式,那么就可以轻松地将不同类型的事件放在同一个主题中。但是,如果你使用了模式编码(如 Avro),那么在单个主题中保存多种类型的事件则需要考虑更多的事情。

如上所述,基于 Avro 的 Kafka Confluent Schema Registry 假设了一个前提,即每个主题都有一个模式(更确切地说,一个模式用于消息的键,一个模式用于消息的值)。你可以注册新版本的模式,注册表会检查模式是否向前和向后兼容。这样设计的一个好处是,你可以让不同的生产者和消费者同时使用不同版本的模式,并且仍然保持彼此的兼容性。

Confluent 的 Avro 序列化器通过 subject 名称在注册表中注册模式。默认情况下,消息键的 subject 为-key,消息值的 subject 为-value。模式注册表会检查在特定 subject 下注册的所有模式的相互兼容性。

最近,我为 Avro 序列化器提供了一个补丁(https://github.com/confluentinc/schema-registry/pull/680 ),让兼容性检查变得更加灵活。这个补丁添加了两个新的配置选项:key.subject.name.strategy(用于定义如何构造消息键的 subject 名称)和 value.subject.name.strategy(用于定义如何构造消息值的 subject 名称)。它们的值可以是如下几个:

io.confluent.kafka.serializers.subject.TopicNameStrategy(默认):消息键的 subject 名称为-key,消息值为-value。这意味着主题中所有消息的模式必须相互兼容。
io.confluent.kafka.serializers.subject.RecordNameStrategy:subject 名称是 Avro 记录类型的完全限定名。因此,模式注册表会检查特定记录类型的兼容性,而不管是哪个主题。这个设置允许同一主题包含不同类型的事件。
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy:subject 名称是-,其中是 Kafka 主题名,是 Avro 记录类型的完全限定名。这个设置允许同一主题包含不同类型的事件,并进一步对当前主题进行兼容性检查。
有了这个新特性,你就可以轻松地将属于特定实体的所有不同类型的事件放在同一个主题中。现在,你可以自由选择主题的粒度,而不仅限于一个类型对应一个主题。

喜欢小编轻轻点关注哦!

原文地址:http://blog.51cto.com/13952975/2299354

时间: 2024-10-30 09:45:19

Kafka实践:到底该不该把不同类型的消息放在同一个主题中?的相关文章

C++ Primer 学习笔记_37_STL实践与分析(11)--set类型

STL实践与分析 --set类型 引: map容器是键-值对的集合,好比人名为键的地址和电话号码.相反的,set容器类型只是单纯的键的集合.当只想知道一个键是否存在时,使用set容器是最合适的. 除了两种例外情况,set容器支持大部分的map操作,包括下面几种: 1)第10.2节列出的所有通用的容器操作. 2)表10.3描述的构造函数. 3)表10.5描述的 insert操作. 4)表10.6描述的 count和 find操作. 5)表10.7描述的 erase操作. 两种例外包括:set不支持

kafka 分区和副本以及kafaka 执行流程,以及消息的高可用

1.Kafka概览 Apache下的项目Kafka(卡夫卡)是一个分布式流处理平台,它的流行是因为卡夫卡系统的设计和操作简单,能充分利用磁盘的顺序读写特性.kafka每秒钟能有百万条消息的吞吐量,因此很适合实时的数据流处理.例如kafka在线日志收集系统可作为flume的实时消息sink端,再通过kafka的消费者将消息实时写入hbase数据库中. 卡夫卡以topic分类对记录进行存储,每个记录包含key-value和timestamp. 1.1卡夫卡系统的组件.角色 broker: 每个正在运

Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?

1.kafka在高并发的情况下,如何避免消息丢失和消息重复? 消息丢失解决方案: 首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功 消息重复解决方案: 消息可以使用唯一id标识 生产者(ack=all 代表至少成功发送一次) 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 落表(主键或者唯一索引的方式,避免重复数据) 业务逻辑处理(选择唯一主键存储到R

C++ Primer 学习笔记_36_STL实践与分析(10)--map类型(下

STL实践与分析 --map类型(下) 六.查找并读取map中的元素 map容器提供了两个操作:count和find,用于检查某个键是否存在而不会插入该键: 不修改map对象的查询 m.count(k) 返回m中k的出现次数 m.find(k) 如果m容器中存在k索引的元素,则返回指向该元素的迭代器.如果不存在,则返回超出末端的迭代器 1.使用count检查map对象中某键是否存在 因为map容器只允许一个键对应一个实例,所以,对于map对象,count成员的返回值只能是1或0. 如果返回值非0

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。

需求: 目前kafka的topic上有一批数据,这些数据被分配到9个不同的partition中(就是发布时key:{m1,m2,m3,m4...m9},value:{records items}),mx(m1,m2...m9)这些数据的唯一键值:int_id+start_time,其中int_id和start_time是topic record中的记录.这9组数据按照唯一键值可以拼接(m1.primarykey1,m2.primarykey1,m3.primarykey1.....m9.prim

同一个磁盘中从根分区缩小空间到扩展SWAP分区空间实践步骤

操作背景: 公司中有一部分主机在开始设计的时候,并未考虑到SWAP的大小,只是分配了物理内存的2倍-4G,导致后期应用部署完成后对系统的SWAP分区要求达不到标准.因此经过部分和领导讨论后,决定在同一块磁盘上来减少根分区的大小,并使用缩减出来的大小去增加swap分区大小的操作,这中间要保障数据的完整和业务应用不丢失.这样做既可以保障数据盘不受影响,也保证应用系统在完成后可以正常使用,也减少了以后的一些风险(比如要使用额外的磁盘来增加swap空间大小需要考虑额外磁盘的稳定等). 操作分析: 由于根

【Netty4 简单项目实践】六、断掉未鉴权的TCP长连接--ChannelHandelContext中的定时器用法

在TCP长连接模式下,我们需要及时释放那些未授权的TCP链接,让系统运行得更稳健一些. 首先是connect上来的TCP报文需要设置一个存活期,通过在pipleline上设置超时处理器ReadTimeoutHandler ch.pipeline().addLast(new ReadTimeoutHandler(120)); 使得一个TCP在120秒内没有收到数据就断掉. 这样做的目的是让连接者必须发TCP报文才能维持连接. 下一步在业务层对ChannelHandlerContext进行鉴权.与H

kafka专题-1

大数据技术之Kafka一 Kafka概述 1.1 Kafka是什么 在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算. 1)Apache Kafka是一个开源消息系统,由Scala写成.是由Apache软件基金会开发的一个开源消息系统项目. 2)Kafka最初是由LinkedIn公司开发,并于2011年初开源.2012年10月从Apache Incubator毕业.该项目的目标是为处理实时数据提供一个统一.高通量.低等待的平台. 3)Kafka是一个分布式消息

Kafka 消息队列系列之分布式消息队列Kafka

介绍 ApacheKafka®是一个分布式流媒体平台.这到底是什么意思呢?我们认为流媒体平台具有三个关键功能:它可以让你发布和订阅记录流.在这方面,它类似于消??息队列或企业消息传递系统.它允许您以容错方式存储记录流.它可以让您在发生记录时处理记录流.什么是卡夫卡好?它被用于两大类的应用程序:构建可在系统或应用程序之间可靠获取数据的实时流数据管道构建实时流应用程序,可以转换或响应数据流要了解卡夫卡如何做这些事情,让我们深入探索卡夫卡的能力.首先几个概念:Kafka作为一个或多个服务器上的集群运行