Kafka单线程Consumer及参数详解

请使用0.9以后的版本:

示例代码

 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);

2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);

3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")

使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重写这个接口来实现 分区变更时的逻辑。如果设置了enable.auto.commit = true 就不用理会这个逻辑。

4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);

5、处理消息(打印了offset key value 这里写处理逻辑)。

6、关闭KafkaConsumer(可以传一个timeout值 等待秒数 默认是30)。

参数详解

bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非自己配置了ip)

deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。

默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定义:定义serializer格式 创建自定义deserializer类实现Deserializer 接口 重写逻辑

?

除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer

还有session.timeout.ms "coordinator检测失败的时间"

是检测consumer挂掉的时间 为了可以及时的rebalance 默认是10秒 可以设置更小的值避免消息延迟。

max.poll.interval.ms "consumer处理逻辑最大时间"

处理逻辑比较复杂的时候 可以设置这个值 避免造成不必要的 rebalance ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。

auto.offset.reset "无位移或者位移越界时kafka的应对策略"

所以如果启动了一个group从头消费 成功提交位移后 重启后还是接着消费 这个参数无效

所以3个值的解释是:

earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费

latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

(注意kafka-0.10.1.X版本之前:?auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、

我们这是说的是新版本:kafka-0.10.1.X版本之后:?auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))

enable.auto.commit 是否自动提交位移

true 自动提交 false需要用户手动提交 有只处理一次需要的 最近设置为false自己控制。

fetch.max.bytes consumer单次获取最大字节数

max.poll.records 单次poll返回的最大消息数

默认500条 如果消费很轻量 可以适当提高这个值 增加消费速度。

hearbeat.interval.ms consumer其他组员感知rabalance的时间

该值必须小于 session.timeout.ms 如果检测到 consumer挂掉 也就根本无法感知rabalance了

connections.max.idle.ms 定期关闭连接的时间

默认是9分钟 可以设置为-1 永不关闭

更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算

原文地址:https://www.cnblogs.com/tree1123/p/11362252.html

时间: 2024-08-29 17:58:00

Kafka单线程Consumer及参数详解的相关文章

JVM 参数详解

在一些规模稍大的应用中,Java虚拟机(JVM)的内存设置尤为重要,想在项目中取得好的效率,GC(垃圾回收)的设置是第一步. PermGen space:全称是Permanent Generation space.就是说是永久保存的区域,用于存放Class和Meta信息,Class在被Load的时候被放入该区域Heap space:存放Instance.GC(Garbage Collection)应该不会对PermGen space进行清理,所以如果你的APP会LOAD很多CLASS的话,就很可

jquery ajax 方法及各参数详解

jquery ajax 方法及各参数详解 1.$.ajax() 只有一个参数:参数 key/value 对象,包含各配置及回调函数信息. 参数列表: 参数名 类型 描述 url String (默认: 当前页地址) 发送请求的地址. type String (默认: "GET") 请求方式 ("POST" 或 "GET"), 默认为 "GET".注意:其它 HTTP 请求方法,如 PUT 和 DELETE 也可以使用,但仅部分

Nginx内置变量以及日志格式变量参数详解

Nginx内置变量以及日志格式变量参数详解 $args #请求中的参数值 $query_string #同 $args $arg_NAME #GET请求中NAME的值 $is_args #如果请求中有参数,值为"?",否则为空字符串 $uri #请求中的当前URI(不带请求参数,参数位于$args),可以不同于浏览器传递的$request_uri的值,它可以通过内部重定向,或者使用index指令进行修改,$uri不包含主机名,如"/foo/bar.html". $d

MySQL配置文件mysql.ini参数详解、MySQL性能优化

MySQL配置文件mysql.ini参数详解.MySQL性能优化 my.ini(Linux系统下是my.cnf),当mysql服务器启动时它会读取这个文件,设置相关的运行环境参数. my.ini分为两块:Client Section和Server Section.   Client Section用来配置MySQL客户端参数.   要查看配置参数可以用下面的命令: show variables like '%innodb%'; # 查看innodb相关配置参数 show status like

JQuery中$.ajax()方法参数详解

url: 要求为String类型的参数,(默认为当前页地址)发送请求的地址. type: 要求为String类型的参数,请求方式(post或get)默认为get.注意其他http请求方法,例如put和 delete也可以使用,但仅部分浏览器支持. timeout: 要求为Number类型的参数,设置请求超时时间(毫秒).此设置将覆盖$.ajaxSetup()方法的全局设 置. async:要求为Boolean类型的参数,默认设置为true,所有请求均为异步请求. 如果需要发送同步请求,请将此选项

day01_linux中与Oracle有关的内核参数详解

linux中与Oracle有关的内核参数详解 在安装Oracle的时候需要调整linux的内核参数,但是各参数代表什么含义呢,下面做详细解析. Linux安装文档中给出的最小值: fs.aio-max-nr = 1048576 fs.file-max = 6815744 kernel.shmall = 2097152 kernel.shmmax = 4294967295 kernel.shmmni = 4096 kernel.sem = 250 32000 100 128 net.ipv4.ip

SWFTOOLS PDF2SWF 参数详解

转载至:http://blog.csdn.net/iamduoluo/article/details/6820329 SWFTools提供了一系列将各种文件转成swf的工具: font2swf.exe gif2swf.exe jpeg2swf.exe pdf2swf.exe png2swf.exe wav2swf.exe 具体的功能就顾名思义了. 其中把pdf转成swf的工具就是pdf2swf了.在命令行中运行pdf2swf src.pdf des.swf一般能满足需求.而命令行参数可以通过pd

httpUrlConnection的参数详解

post方式的的请求过程: // 设置是否向httpUrlConnection输出,因为这个是post请求,参数要放在 // http正文内,因此需要设为true, 默认情况下是false; httpUrlConnection.setDoOutput(true); // 设置是否从httpUrlConnection读入,默认情况下是true; httpUrlConnection.setDoInput(true); // Post 请求不能使用缓存 httpUrlConnection.setUse

PHP CURL参数详解

PHP CURL参数详解 curl用法:cookie及post一.cookie用法 <?php $cookie_jar = tempnam('./tmp','cookie'); // login $c=curl_init('http://login_url?username=... curl_setopt($c, CURLOPT_RETURNTRANSFER, 1); curl_setopt($c, CURLOPT_COOKIEJAR, $cookie_jar); curl_exec($c);