RocketMQ消费批拉超过32不生效

由于一些原因,我需要RocketMQ消费的时候,一批拉400条,一批处理400条。设置如下:

为了简单验证是否正确,消费如下:

直接通过打印msgs.size()观察情况即可。

现象

实验的topic里面的消息数量很多很多,但是启动消费端,消费端的日志如下:

奇怪啦,明明已经进行了修改

为什么还是每次只能消费32条呢?

调试RocketMQ源码

通过跟踪consumer代码:

这里的确已经设置为400了,那么我们需要跟踪到broker服务端进行查看了。

broker接受到的也是400,我们只有继续跟踪

因为返回为true导致下面的if判断成立,for循环就break了,所有只取了32条数据返回到consumer了:

如果需要让可以一次拉取400条,需要修改broker这里的数据配置才可(其实关于一条消息大小也是类似的,虽然说最大设置16M,但是也是和这里同理需要设置broker相关的数据配置才可生效)。

修改broker的配置文件,添加:maxTransferCountOnMessageInMemory=400 重启broker即可,效果如下:

效果正常,其实如果消息过大,还需要考虑下面几个参数:

备注:也可以不用重启broker,通过命令实时修改:

sh mqadmin updateBrokerConfig -c 集群名称 -n namesrv  -k maxTransferCountOnMessageInMemory -v 400

总结

需要RocketMQ消费的时候,一批拉400条,一批处理400条。设置如下:

还是达不到效果的,还需要设置broker相关的参数,本次里面设置broker的maxTransferCountOnMessageInMemory=400即可,其他情况需要考虑如下等参数。

本文转载自http://www.jiangxinlingdu.com/rocketmq/2018/12/05/pullbatchsize.html

原文地址:https://www.cnblogs.com/king1302217/p/10739348.html

时间: 2024-10-07 17:13:45

RocketMQ消费批拉超过32不生效的相关文章

shell脚本汇总 2 删除创建时间超过32天的文件

shell脚本汇总 2 删除创建时间超过32天的文件 #!/bin/bash #date:2017-04-11 #version:clear_ballcloud_back_v1.0 #contents: removing the  /www/web/ballcloud_bakck/* , if the file is created above 32 day # cd /www/web #sum the size of already delete directory dirsize=`du -

RocketMQ消费端默认注入rocketmqClient原因

spring 依赖注入关系问题

rocketmq消费队列代码

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constant.operationLogGroup); try { consumer.setNamesrvAddr(Constant.rocketQueneAddr); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(Messag

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

RocketMQ 自己的整理和理解

不能有中文路径! 不能有中文路径! 不能有中文路径! 关系 两个接口 interface MQProducer  //生产者接口 { 实现该接口的只有一个 默认的 DefaultMQProducer DefaultMQProducer 实现 MQProducer 接口的时候 还继承了 ClientConfig类 (客户端配置类) 可以配置如  sendMsgTimeout 超时时间  producerGroup  消息最大多少 超过多少压缩等等 关键方法 : send(Message) 发送一个

类比 RocketMq 和 淘宝消息服务:

rocketMq建立监听: 一个groupId下通常会挂载多个consumer实例. 集群订阅方式 (默认):一个监听到之后,另一个consumer实例就不会再监听到(不管在不在一个服务器上). 由于默认集群订阅方式,只能有一个监听到,所以,本地测试和服务器上topic不能一致,否则会影响服务器上监听不到消息.而topic 的不一致导致本地测试和服务端测试,groupID也不一致,但多台服务器上需要多个groupId, 淘宝消息服务: 通过SDK接受消息,Java接口使用说明: public i

SpringBoot如何优雅的使用RocketMQ

MQ,是一种跨进程的通信机制,用于上下游传递消息.在传统的互联网架构中通常使用MQ来对上下游来做解耦合. 举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理. 什么是RocketMQ? 官方说明: 随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈.我们尽力通过节流,断路器或降级来解决此问题,但效果不佳.因此,我们那时开始关注流行的消息传递解决方案Kafka.

kafka一直rebalance故障,重复消费

今天我司线上kafka消息代理出现错误日志,异常rebalance,而且平均间隔2到3分钟就会rebalance一次,分析日志发现比较严重.错误日志如下 08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already r

RocketMQ之Consumer

一.Consumer 介绍 1.1 核心参数 * consumerGroup:消费者组名 * MessageModel:消息模型,定义了消息传递到消费者的方式,默认是 MessageModel.CLUSTERING * MessageModel.BROADCASTING:广播 * MessageModel.CLUSTERING:集群 * consumeFromWhere: 消费者开始消费的位置,默认值是 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET * Co