kafka将消费进度重置到最新的位置

/** *重置kafka消费进度 *参数中需要指定kafka集群中一台broker地址,要重置的topic名称,消费组,以及partition个数 */public static void seekLatest(String broker, String topic, String group, int partitionCnt){
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);
    configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    DefaultKafkaConsumerFactory<Long, byte[]> factory = new DefaultKafkaConsumerFactory<>(configProps, new LongDeserializer(), new ByteArrayDeserializer());
    org.apache.kafka.clients.consumer.KafkaConsumer<Long, byte[]> consumer = (org.apache.kafka.clients.consumer.KafkaConsumer<Long, byte[]>) factory.createConsumer();
    consumer.subscribe(Arrays.asList(topic));

    List<TopicPartition> topicPartitionLists = new ArrayList<>();

    for(int i = 0 ; i < partitionCnt; i ++){
        topicPartitionLists.add(new TopicPartition(topic, i));
    }
    consumer.poll(1);
    Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitionLists);
    for(TopicPartition tp : topicPartitionLongMap.keySet()){
        consumer.seek(tp, topicPartitionLongMap.get(tp));
    }

    consumer.close();
}

原文地址:https://www.cnblogs.com/ZhengQiZHou/p/12627921.html

时间: 2024-10-15 23:44:31

kafka将消费进度重置到最新的位置的相关文章

从外部重置一个运行中consumer group的消费进度

对于0.10.1以上版本的kafka, 如何从外部重置一个运行中的consumer group的进度呢?比如有一个控制台,可以主动重置任意消费组的消费进度重置到12小时之前. 需要这么几个步骤: 1. 加入这个group 2. 踢掉所有其它group memeber 3. try assign all TopicPartition to this client 4. commit offsets 5. leave group 其中第二步是为了让自己当上leader,当然有可能不需要踢掉其它所有成

监控Kafka消费进度

使用Kafka作为消息中间件消费数据时,监控Kafka消费的进度很重要.其中,在监控消费进度的过程中,主要关注消费Lag. 常用监控Kafka消费进度的方法有三种,分别是使用Kafka自带的命令行工具.使用Kafka Consumer API和Kafka自带的JMX监控指标,这里介绍前两种方法. 注: 内网IP:10.12.100.126 10.12.100.127 10.12.100.128 外网IP:47.90.133.76 47.90.133.77 47.90.133.78 用户名:ser

kafka 相关命令 偏移重置

kafka官方文档 https://kafka.apache.org/documentation.html#quickstart kafka 安装文档 https://www.jianshu.com/p/c74e0ec577b0 相关命令: kafka 启动:kafka官方文档 https://kafka.apache.org/documentation.html#quickstart kafka 安装文档 https://www.jianshu.com/p/c74e0ec577b0 主题相关命

kafka查看消费数据

一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-consumer-groups.sh 普通版 查看所有组 要想查询消费数据,必须要指定组.那么线上运行的kafka有哪些组呢?使用以下命令: bin/kafka-consumer-groups.sh --bootstrap-server kafka-1.default.svc.cluster.local

Kafka重复消费和丢失数据研究

Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交. 原因1:强行kill线程,导致消费后的数据,offset没有提交. 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费.例如: try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); }

Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装

前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据. 其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据 agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafkaSink.topic = TRAFFIC_LOG agent1.sinks.kafkaSink.brokerList = 10.208.129.3:90

Kafka 温故(五):Kafka的消费编程模型

Kafka的消费模型分为两种: 1.分区消费模型 2.分组消费模型 一.分区消费模型 二.分组消费模型 Producer : package cn.outofmemory.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * Hello wo

kafka多线程消费

建立kafka消费类ConsumerRunnable ,实现Runnable接口: import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; impo

kafka 的offset的重置

最近在spark读取kafka消息时,每次读取都会从kafka最新的offset读取.但是如果数据丢失,如果在使用Kafka来分发消息,在数据处理的过程中可能会出现处理程序出异常或者是其它的错误,会造成数据丢失或不一致.这个时候你也许会想要通过kafka把数据从新处理一遍,或者指定kafka的offset读取.kafka默认会在磁盘上保存到7天的数据,你只需要把kafka的某个topic的consumer的offset设置为某个值或者是最小值,就可以使该consumer从你设置的那个点开始消费.