kafka之consumer参数auto.offset.reset 0.10+

https://blog.csdn.net/dingding_ting/article/details/84862776

https://blog.csdn.net/xianpanjia4616/article/details/84347087

kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中)

kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面)

auto.offset.reset: 可理解为kafka consumer读取数据的策略,本地用的kafka版本为0.10,因此该参数可填earliest|latest|none。

earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

测试前提: 我们主要做的是Flink的Streaming sql,在创建kafka source的时候封装了该参数,查看是否消费数据,我们借助了jmeter和Flink web ui上的metrics等工具。

测试过程:
earliest模式: kafka source的名称为a1
1.在a1中,topic为test1,groupId为0001,0001从未被消费过,数据(24条)提前发送,再启动sql1(select * from a1 ),会从头开始消费,显示24条数据

2.停掉1中所提到的sql1,发送不同的6条数据到kafka中,不更换a1的groupId,再启动sql1(select * from a1 ),会接着上次消费的位置开始往后消费,显示6条数据

latest模式:kafka source的名称为a2
1.在a2中,topic为b,groupId为0002,0002未被消费,数据提前发送,再启动sql2(select * from a2),在jmeter上未看到结果,在flink中查看相关metrics,无数据读入;在不杀掉sql2的前提下,发送一批(8条)数据,只消费后发送的8条数据。

2.停掉1中的sql2,不更换a2中的groupId,发送7条数据到b中,启动sql2,只显示后发送的7条数据

none模式: kafka source的名称为a3
1.在a3中,topic为c,设置groupId为0001(未被消费过),数据提前发送,再启动sql3(select * from a3),sql执行失败,在日志中报错:

2.在a3中,topic为c,设置groupId为0002(被消费过),启动sql3(select * from a3),发送8条数据到c中,jmeter中显示8条数据

原文地址:https://www.cnblogs.com/peterkang202/p/10443991.html

时间: 2024-10-10 01:34:25

kafka之consumer参数auto.offset.reset 0.10+的相关文章

Kafka单线程Consumer及参数详解

请使用0.9以后的版本: 示例代码 Properties props = new Properties(); props.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.p

kafka brokers配置参数详解

基本配置如下: -broker.id-log.dirs-zookeeper.connect Topic-level配置以及其默认值将在下面讨论. Property Default Description broker.id   每个broker都可以用一个唯一的非负整数id进行标识:这个id可以作为broker的"名字",并且它的存在使得broker无须混淆consumers就可以迁移到不同的host/port上.你可以选择任意你喜欢的数字作为id,只要id是唯一的即可. log.di

kafka producer consumer

package demo; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class producer { private final Producer<String, String> producer; public final static

Kafka消费者——重要参数配置

目录 bootstrap.servers group.id fetch.min.bytes fetch.max.bytes fetch.max.wait.ms max.partition.fetch.bytes max.poll.records connections.max.idle.ms exclude.internal.topics receive.buffer.bytes send.buffer.bytes request.timeout.ms metadata.max.age.ms r

Kafka Java consumer动态修改topic订阅

前段时间在Kafka QQ群中有人问及此事--关于Java consumer如何动态修改topic订阅的问题.仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然后调用subscribe进行修改,consumer端必然会抛出异常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access 和KafkaProducer不同的是,KafkaConsumer不

kafka0.9.0及0.10.0配置属性

问题导读 1.borker包含哪些属性?2.Producer包含哪些属性?3.Consumer如何配置? borker(0.9.0及0.10.0)配置Kafka日志本身是由多个日志段组成(log segment).一个日志是一个FileMessageSet,它包含了日志数据以及OffsetIndex对象,该对象使用位移来读取日志数据 * borker配置就是指配置server.properties文件 * 最小配置 通常情况下需要在减压缩kafka后,修改config/server.proper

kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)

使用0.9开始增加的KafkaProducer和KafkaConsumer. Pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.o

重置kafka topic consumer的offset

如果你在使用Kafka来分发消息,在数据处理的过程中可能会出现处理程序出异常或者是其它的错误,会造成数据丢失或不一致.这个时候你也许会想要通过kafka把数据从新处理一遍,我们知道kafka默认会在磁盘上保存到7天的数据,你只需要把kafka的某个topic的consumer的offset设置为某个值或者是最小值,就可以使该consumer从你设置的那个点开始消费. 查询topic的offset的范围 用下面命令可以查询到topic:DynamicRange broker:SparkMaster

Kafka0.7运行时报错 kafka/javaapi/consumer/ConsumerConnector : Unsupported major.minor version 51.0 解决

目前中央库中 org.apache.kafka 是用jdk1.7编译的, 故跑在1.6的jvm中会报错 解决方案: 1. 下载kafka源码, 本地sbt进行install, 编译前 java -version确认classpath中的jdk版本是1.6 2. 编译打包成功后, 进入当前kafka目录的 core/target/scala_2.8.0/, 找到kafka-0.7.2, 将此jar包通过maven install -file 或者 maven deploy -file 添加到本地或