Kafka-消费者-偏移量的提交方式

Kafka-消费者-偏移量的提交方式

每次调用poll()方法,它总是返回由生产者写入Kafka但还没有被消费者读取过的记录,可以追踪到哪些记录是被群组里的哪个消费者读取的。

更新分区当前位置的操作叫做提交。

消费者往一个叫做 _consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一个提交的偏移量,然后从偏移量指定的地方继续处理。

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

自动提交偏移量

最简单的方式是让消费者自动提交偏移量。如果enable.auto.commit被设为true,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms控制,默认值是5s。自动提交也是在轮询里进行的,消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用close()方法之前也会进行自动提交)。

手动提交当前偏移量

开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。

把enable.auto.commit设为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量最简单最可靠。这个API会提交由poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。在处理完所有记录后要确保调用了commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。

异步提交当前偏移量

手动提交由一个不足之处,在broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。此时可以使用异步提交API,无需等待broker的相应

Consumer.commitAsync();

异步的缺点:提交失败之后不会进行重试,因为在收到服务器相应的时候,可能有一个更大的偏移量已经提交成功。可以使用异步提交的回调记录下错误信息和偏移量。

同步和异步组合提交

如果一切正常,使用commitAsync()方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。

如果直接关闭消费者,就没有所谓的下一次提交了。在异常处理后使用commitSync()提交。

提交特定的偏移量

上述几种方式的提交偏移量的频率与处理消息批次的频率是一样的,如果poll()方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,可以在调用commitAsync()和commitSync()方法时传进去希望提交的分区和偏移量的map。

代码如下

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @Author FengZhen
 * @Date 2020-04-06 11:07
 * @Description kafka消费者
 */
public class KafkaConsumerTest {
    private static Properties kafkaProps = new Properties();
    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("group.id", "test");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
       commitAuto();
    }

    /**
     * 自动提交偏移量
     */
    public static void commitAuto(){
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
        //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
        //如:test.*,订阅test相关的所有主题
        consumer.subscribe(Collections.singleton("test_partition"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //退出应用前使用close方法关闭消费者。
            //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
            consumer.close();
        }
    }

    /**
     * 同步提交偏移量
     */
    public static void commitSelfSync(){
        //关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
        kafkaProps.put("enable.auto.commit", false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
        //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
        //如:test.*,订阅test相关的所有主题
        consumer.subscribe(Collections.singleton("test_partition"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
                //同步手动提交偏移量,只要没有发生不可恢复的错误,commitSync方法会一直尝试直至提交成功。
                consumer.commitSync();
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //退出应用前使用close方法关闭消费者。
            //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
            consumer.close();
        }
    }

    /**
     * 异步提交偏移量
     */
    public static void commitSelfAsync(){
        //关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
        kafkaProps.put("enable.auto.commit", false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
        //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
        //如:test.*,订阅test相关的所有主题
        consumer.subscribe(Collections.singleton("test_partition"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
                //异步提交
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (null != exception){
                            System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception)));
                        }
                    }
                });
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //退出应用前使用close方法关闭消费者。
            //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
            consumer.close();
        }
    }

    /**
     * 同步异步结合使用提交偏移量
     */
    public static void commitSelfSyncAndAsync(){
        //关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
        kafkaProps.put("enable.auto.commit", false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
        //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
        //如:test.*,订阅test相关的所有主题
        consumer.subscribe(Collections.singleton("test_partition"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
                //异步提交(结合下方同步提交)
                consumer.commitAsync();
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //同步提交
            consumer.commitSync();
            //退出应用前使用close方法关闭消费者。
            //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
            consumer.close();
        }
    }

    /**
     * 指定偏移量提交
     */
    public static void commitSelfAppoint(){

        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        int count = 0;

        //关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
        kafkaProps.put("enable.auto.commit", false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProps);
        //订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,会立即触发一次再均衡,消费者就可以读取新添加的主题。
        //如:test.*,订阅test相关的所有主题
        consumer.subscribe(Collections.singleton("test_partition"));
        System.out.println("==== subscribe success ====");
        try {
            while (true){
                //消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
                //传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
                //如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
                //poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("==== data get ====");
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                    if (count % 2 == 0){
                        //每2次提交一次,还可以根据时间间隔来提交
                        consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                            @Override
                            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                                if (null != exception){
                                    System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, ExceptionUtil.getStackTrace(exception)));
                                }
                            }
                        });
                    }
                    count++;
                }
                //异步提交(结合下方同步提交)
                consumer.commitAsync();
            }
        } catch(Exception e){
            e.printStackTrace();
        } finally {
            //退出应用前使用close方法关闭消费者。
            //网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
            consumer.close();
        }
    }

}

原文地址:https://www.cnblogs.com/EnzoDin/p/12642057.html

时间: 2024-11-12 22:36:41

Kafka-消费者-偏移量的提交方式的相关文章

Kafka消费者之提交消息的偏移量

原文链接:https://cloud.tencent.com/developer/article/1462432 一.概述 在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中.把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交. 参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x ,图中也用了 lastCo

Kafka(七)消费者偏移量

在Kafka0.9版本之前消费者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID.新版消费者不在保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中"__consumer_offsets",该主题默认有50个分区,每个分区3个副本,分区数量有参数offset.topic.num.partition设置.通过消费者组ID的哈希值和该参数取模的方式来确定某个消费者组已消费的偏移量保存

kafka 消费者offset记录位置和方式

我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同. 首先来说说消费者如果是根据javaapi来消费,也就是[kafka.javaapi.consumer.ConsumerConnector],我们会配置参数[zookeeper.connect]来消费.这种情况下,消费者的offset会更新到zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录下,例如: [z

Kafka消费者组再均衡问题

在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费.Rebalance的过程如下: 第一步:所有成员都向coordinator发送请求,请求入组.一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader. 第二步:leader开始分配消费方

Kafka 系列(四)—— Kafka 消费者详解

一.消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响.Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度.此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段. 需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费

Kafka消费者——API开发

目录 消费者客户端 订阅主题 订阅分区 取消订阅 订阅总结 消息消费 poll ConsumerRecord 位移提交 自动提交 手动提交 控制和关闭消费 指定位移消费 再均衡 消费者拦截器 消费者客户端 消费步骤: 1.配置消费者客户端参数并创建相应的消费者实例. 2.订阅主题. 3.拉取消息并消费 4.提交消费位移 5.关闭消费者实例 Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_C

一文精通kafka 消费者的三种语义

本文主要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等. (一) 创建topic bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1 (二)

Kafka消费者——结合spring开发

Kafka消费者端 可靠性保证 作为消费端,消费数据需要考虑的是: 1.不重复消费消息 2.不缺失消费消息 自动提交 offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(true) auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s) 手动提交offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(false) 异步提交也个缺点,那就

Spring整合kafka消费者和生产者&amp;redis的步骤

==================================================================================一.整合kafka(生产者)步骤1.导入依赖(pom.xml)2.编写配置文件,修改配置文件的ip和端口号,修改主题(producer.xml)3.如果再ssm项目中可以让spring.xml来加载这个配置文件 <import resource="classpath:XXX.xml" /> 如果是再测试类中如何加