Kafka消费者手动提交消息偏移

生产者每次调用poll()方法时,它总是返回由生产者写入Kafka但还没有消费的消息,如果消费者一致处于运行状态,那么分区消息偏移量就没什么用处,但是如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费可能分配到新的分区,而不是之前处理的那个,为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量制定的地方开始工作。消费者会往一个__consumer_offser的主题发送消息,消息里包含每个分区的偏移量。

1.同步提交

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by zhangpeiran on 2018/10/9.
 */
public class MyConsumer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        //默认值为latest,当消费者读取的分区没有偏移量或偏移量无效时,消费者将从最新的记录开始读
        //当一个消费group第一次订阅主题时,符合这种情况,在Consumer启动之前,Producer生产的数据不会被读取
        //置为earliest,表示从分区起始位置读取消息
        properties.put("auto.offset.reset","earliest");

        //设置手动提交消息偏移
        properties.put("enable.auto.commit","false");

        //一次拉取的最大消息条数
        properties.put("max.poll.records",10);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singletonList("Demo3"));

        int count = 0;
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(10);
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    if(count == 50)
                        consumer.commitSync();
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                System.out.println(count);
            }
        } finally {
            consumer.close();
        }
    }
}

说明:在上述例子中,主题Demo3中已经有100条消息,第一次远行Consumer时,在读取到50条消息时,提交一次偏移量,输出的count值为100;第二次不改变消费group,会从51条开始读取,所以输出的count值为50

2. 异步提交,同步提交时,在broker回应指,会一直阻塞、重试,限制应用的吞吐量,因此可以采用异步提交,异步提交失败时不会重试,因为如果提交失败时因为临时的问题导致的,那么后续的提交总户有成功的。

consumer.commitAsync();

3. 同步、异步组合提交,确保消费者在关闭或者再均衡之前提交成功

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by zhangpeiran on 2018/10/9.
 */
public class MyConsumer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        //默认值为latest,当消费者读取的分区没有偏移量或偏移量无效时,消费者将从最新的记录开始读
        //当一个消费group第一次订阅主题时,符合这种情况,在Consumer启动之前,Producer生产的数据不会被读取
        //置为earliest,表示从分区起始位置读取消息
        properties.put("auto.offset.reset","earliest");

        //设置手动提交消息偏移
        properties.put("enable.auto.commit","false");

        //一次拉取的最大消息条数
        properties.put("max.poll.records",10);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singletonList("Demo3"));

        int count = 0;
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(10);
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    //if(count == 50)
                        //consumer.commitAsync();
                        //consumer.commitSync();
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                consumer.commitAsync();
                //System.out.println(count);
            }
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
            //consumer.close();
        }
    }
}

原文地址:https://www.cnblogs.com/darange/p/9768791.html

时间: 2024-07-30 08:41:42

Kafka消费者手动提交消息偏移的相关文章

Kafka消费者之提交消息的偏移量

原文链接:https://cloud.tencent.com/developer/article/1462432 一.概述 在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中.把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交. 参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x ,图中也用了 lastCo

kafka 消费者拉取消息

本文只跟踪消费者拉取消息的流程.对于 java 客户端, kafka 的生产者和消费者复用同一个网络 io 类 NetworkClient. 入口在 KafkaConsumer#pollOnce 中,抽出主要步骤: // 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送 fetcher.sendFetches(); // 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll client.poll(pol

kafka 0.10.2 消息消费者

package cn.xiaojf.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Kaf

kafka 消费者offset记录位置和方式

我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同. 首先来说说消费者如果是根据javaapi来消费,也就是[kafka.javaapi.consumer.ConsumerConnector],我们会配置参数[zookeeper.connect]来消费.这种情况下,消费者的offset会更新到zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录下,例如: [z

[kfaka] Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kaf

转 Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kaf

Apache Kafka —一个不同的消息系统

Apache已经发布了Kafka 0.8,也是自从成为Apache软件基金会的顶级项目后Kafka的 第一个主版本. Apache Kafka是发布—订阅消息传递,实现了分布式提交日志,适用于离线和在线消息消费.它最初由LinkedIn开发的消息系统,用于低延迟的收集和发送大 量的事件和日志数据.最新版本包括群集内复制和多数据目录支持.目前请求处理也是异步的,使用请求处理线程的附属线程池来实现.日志文件可以按年龄进行覆 盖,并且日志级别可通过JMX进行动态设置.性能测试工具已提供,帮助解决存在的

Apache Kafka:下一代分布式消息系统

[http://www.infoq.com/cn/articles/apache-kafka/]分布式发布-订阅消息系统. Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同:它被设计为一个分布式系统,易于向外扩展:它同时为发布和订阅提供高吞吐量:它支持多订阅者,当失败时能自动平衡消费者:它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kafka的架构

一文精通kafka 消费者的三种语义

本文主要是以kafka 09的client为例子,详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once, at-least-once, 和 exactly-once message ,生产者的使用等. (一) 创建topic bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1 (二)