Kafka消费者之提交消息的偏移量

原文链接:https://cloud.tencent.com/developer/article/1462432

一、概述

在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中。把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交

参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x ,图中也用了 lastConsumedOffset 这个单词来标识它。

不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x ,而是 x+1 ,对应上图中的 position ,它表示下一条需要拉取的消息的位置。

KafkaConsumer 类提供了 partition(TopicPartition) 和 committed(TopicPartition) 两个方法来分别获取上面所说的 postion 和 committed offset 的值。这两个方法的定义如下所示:

  • public long position(TopicPartition partition)
  • public OffsetAndMetadata committed(TopicPartition partition)

可通过 TestOffsetAndPosition.java 来测试consumed offset、committed offset、position之间的关系。该 TestOffsetAndPosition.java 文件的地址为:

https://github.com/841809077/hdpproject/blob/master/src/main/java/com/hdp/project/kafka/consumer/TestOffsetAndPosition.java

二、offset 提交的两种方式

1、自动提交

在 Kafka 中默认的消费位移的提交方式为自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true 。

在默认的配置下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

2、手动提交

Kafka 自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,但并没有为开发者留有余地来处理重复消费消息丢失的问题。自动位移提交无法做到精确的位移管理,所以Kafka还提供了手动位移提交的方式,这样就可以使得开发人员对消费位移的管理控制更加灵活。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false 。示例如下:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交又分为同步提交异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。

2.1、同步提交

消费者可以调用 commitSync() 方法,来实现位移的同步提交。

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交,只要没有发生不可回复的错误,它就会阻塞消费者线程直至位移提交完成。

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的。如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync() 的另一个含参方法,具体定义如下:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)

该方法提供了一个 offsets 参数,用来提交指定分区的位移。

2.2、异步提交

与 commitSync() 方法相反,异步提交的方式在执行的时候消费者线程不会被阻塞,可以在提交消费位移的结果还未返回之前就开始新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync() 方法有三个不同的重载方法:

public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)   

第一个无参方法和第三个方法中的 offsets 都很好理解,对照 commitSync() 方法即可。关键是第二个方法与第三个方法的 callback 参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete() 方法。如下图所示:

发送提交请求后可以继续做其它事情。如果提交失败,错误信息和偏移量会被记录下来。

三、同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在 关闭消费者 或 再均衡(分区的所属权从一个消费者转移到另一个消费者的行为) 前的最后一次提交,就要确保能够提交成功。

因此,在消费者关闭前一般会组合使用 commitAsync() 和 commitSync() 。使用 commitAsync() 方式来做每条消费信息的提交(因为该种方式速度更快),最后再使用 commitSync() 方式来做位移提交最后的保证。

try {
    while (true) {
        // 消费者poll并且执行一些操作
        // ...

        // 异步提交,也可使用有回调函数的异步提交。较同步提交速度更快。
        consumer.commitAsync();
    }
} catch (Exception e) {
    logger.error("Unexpected error" , e);
} finally {
    try {
        // 同步提交,来做位移提交最后的保证。
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

四、总结

本文主要讲解了消费者提交消息位移的两种方式,分为:

  • 自动提交
  • 手动提交

而 手动提交 又分为:

  • 同步提交
  • 异步提交

而在一般情况下,建议使用手动的方式:异步和同步组合提交消息位移。因为异步提交不需要等待提交的反馈结果,即可进行新一次的拉取消息操作,速度较同步提交更快。但在最后一次提交消息位移之前,为了保证位移提交成功,还是需要再做一次同步提交操作。

本文参考《Kafka权威指南》与《深入理解Kafka:核心设计与实践原理》,也推荐大家阅读这两本书。

原文地址:https://www.cnblogs.com/fswhq/p/12175172.html

时间: 2024-07-30 08:41:43

Kafka消费者之提交消息的偏移量的相关文章

Kafka消费者手动提交消息偏移

生产者每次调用poll()方法时,它总是返回由生产者写入Kafka但还没有消费的消息,如果消费者一致处于运行状态,那么分区消息偏移量就没什么用处,但是如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费可能分配到新的分区,而不是之前处理的那个,为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量制定的地方开始工作.消费者会往一个__consumer_offser的主题发送消息,消息里包含每个分区的偏移量. 1.同步提交 import o

kafka 消费者拉取消息

本文只跟踪消费者拉取消息的流程.对于 java 客户端, kafka 的生产者和消费者复用同一个网络 io 类 NetworkClient. 入口在 KafkaConsumer#pollOnce 中,抽出主要步骤: // 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送 fetcher.sendFetches(); // 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll client.poll(pol

[kfaka] Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kaf

转 Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kaf

Apache Kafka:下一代分布式消息系统

[http://www.infoq.com/cn/articles/apache-kafka/]分布式发布-订阅消息系统. Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同:它被设计为一个分布式系统,易于向外扩展:它同时为发布和订阅提供高吞吐量:它支持多订阅者,当失败时能自动平衡消费者:它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kafka的架构

Kafka 系列(四)—— Kafka 消费者详解

一.消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响.Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度.此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段. 需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费

Kafka消费者——API开发

目录 消费者客户端 订阅主题 订阅分区 取消订阅 订阅总结 消息消费 poll ConsumerRecord 位移提交 自动提交 手动提交 控制和关闭消费 指定位移消费 再均衡 消费者拦截器 消费者客户端 消费步骤: 1.配置消费者客户端参数并创建相应的消费者实例. 2.订阅主题. 3.拉取消息并消费 4.提交消费位移 5.关闭消费者实例 Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_C

kafka 消费者offset记录位置和方式

我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同. 首先来说说消费者如果是根据javaapi来消费,也就是[kafka.javaapi.consumer.ConsumerConnector],我们会配置参数[zookeeper.connect]来消费.这种情况下,消费者的offset会更新到zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录下,例如: [z

Apache Kafka —一个不同的消息系统

Apache已经发布了Kafka 0.8,也是自从成为Apache软件基金会的顶级项目后Kafka的 第一个主版本. Apache Kafka是发布—订阅消息传递,实现了分布式提交日志,适用于离线和在线消息消费.它最初由LinkedIn开发的消息系统,用于低延迟的收集和发送大 量的事件和日志数据.最新版本包括群集内复制和多数据目录支持.目前请求处理也是异步的,使用请求处理线程的附属线程池来实现.日志文件可以按年龄进行覆 盖,并且日志级别可通过JMX进行动态设置.性能测试工具已提供,帮助解决存在的