Kafka消费者——重要参数配置

目录

  • bootstrap.servers
  • group.id
  • fetch.min.bytes
  • fetch.max.bytes
  • fetch.max.wait.ms
  • max.partition.fetch.bytes
  • max.poll.records
  • connections.max.idle.ms
  • exclude.internal.topics
  • receive.buffer.bytes
  • send.buffer.bytes
  • request.timeout.ms
  • metadata.max.age.ms
  • reconnect.backoff.ms
  • auto.offset.reset
  • enable.auto.commit
  • auto.commit.interval.ms
  • partition.assignment.strategy
  • interceptor.class

bootstrap.servers

broker集群地址,格式:ip1:port,ip2:port...,不需要设定全部的集群地址,设置两个或者两个以上即可。

group.id

消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。

fetch.min.bytes

该参数用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量,默认值为1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。

fetch.max.bytes

该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是50MB。

如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成无法消费呢?该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes(对应于主题端参数 max.message.bytes)来设置。

fetch.max.wait.ms

这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待500ms。这个参数的设定和 Consumer 与 Kafka 之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为1048576(B),即1MB。这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。

max.poll.records

这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。

connections.max.idle.ms

这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。

exclude.internal.topics

Kafka 中有两个内部的主题: __consumer_offsets 和 __transaction_state。exclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。

receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果 Consumer 与 Kafka 处于不同的机房,则可以适当调大这个参数值。

send.buffer.bytes

这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

request.timeout.ms

这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为30000(ms)。

metadata.max.age.ms

这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入

reconnect.backoff.ms

这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向 broker 发送的所有请求。

auto.offset.reset

参数值为字符串类型,有效值为“earliest”“latest”“none”,配置为其余值会报出异常

enable.auto.commit

boolean 类型,配置是否开启自动提交消费位移的功能,默认开启

auto.commit.interval.ms

当enbale.auto.commit参数设置为 true 时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔

partition.assignment.strategy

消费者的分区分配策略

interceptor.class

用来配置消费者客户端的拦截器

原文地址:https://www.cnblogs.com/luckyhui28/p/12001833.html

时间: 2024-11-07 21:14:32

Kafka消费者——重要参数配置的相关文章

Kafka生产者——重要参数配置

目录 acks max.request.size retries和retry.backoff.ms connections.max.idele.ms linger.ms receive.buffer.bytes send.buffer.bytes request.timeout.ms enable.idempotence bootstrap.servers acks 这个参数用老指定分区中必须由多少个副本收到消息,之后生产者才会认为这条消息写入是成功的.acks参数有三种类型的值(都是字符串类型

Kafka消费者Consumer常用配置

bootstrap.servers:broker服务器集群列表,格式为 host1:port1, host2:port2 key.deserializer:定义序列化的接口 value.deserializer:实现序列化接口的类 group.id:消费者分组Id consumer.timeout.ms:消费者连接超时时间,默认为5000毫秒 zookeeper.connect:Zookeeper服务器地址,格式为 host1:port1, host2:port2 zookeeper.conne

kafka 消费者offset记录位置和方式

我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同. 首先来说说消费者如果是根据javaapi来消费,也就是[kafka.javaapi.consumer.ConsumerConnector],我们会配置参数[zookeeper.connect]来消费.这种情况下,消费者的offset会更新到zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录下,例如: [z

Kafka 系列(四)—— Kafka 消费者详解

一.消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响.Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度.此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段. 需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费

Kafka消费者——API开发

目录 消费者客户端 订阅主题 订阅分区 取消订阅 订阅总结 消息消费 poll ConsumerRecord 位移提交 自动提交 手动提交 控制和关闭消费 指定位移消费 再均衡 消费者拦截器 消费者客户端 消费步骤: 1.配置消费者客户端参数并创建相应的消费者实例. 2.订阅主题. 3.拉取消息并消费 4.提交消费位移 5.关闭消费者实例 Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_C

Kafka消费者——结合spring开发

Kafka消费者端 可靠性保证 作为消费端,消费数据需要考虑的是: 1.不重复消费消息 2.不缺失消费消息 自动提交 offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(true) auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s) 手动提交offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(false) 异步提交也个缺点,那就

Kafka消费者之提交消息的偏移量

原文链接:https://cloud.tencent.com/developer/article/1462432 一.概述 在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中.把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交. 参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x ,图中也用了 lastCo

Spring整合kafka消费者和生产者&redis的步骤

==================================================================================一.整合kafka(生产者)步骤1.导入依赖(pom.xml)2.编写配置文件,修改配置文件的ip和端口号,修改主题(producer.xml)3.如果再ssm项目中可以让spring.xml来加载这个配置文件 <import resource="classpath:XXX.xml" /> 如果是再测试类中如何加

Kafka中Topic级别配置

一.Kafka中topic级别配置 1.Topic级别配置 配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值. 创建topic参数可以设置一个或多个--config "Property(属性)",下面是创建一个topic名称为"my-topic"例子,它设置了2个参数max message size 和 flush rate. (A)创建topic时配置参数 bin/kafka-topics.sh --zookeeper