RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个

问题现象

RocketMQ3.2.2版本,测试时尝试发送消息时自动创建Topic,设置了队列数量为8:

producer.setDefaultTopicQueueNums(8);

同时设置broker服务器的配置文件broker.properties:

defaultTopicQueueNums=16

但实际创建后从控制台及后台打印代码观察到该Topic只创建了4个队列,反复重试确认发送消息时自动创建Topic,最大创建4个队列。

查找原因

服务端与客户端配置对比

  阅读源码,在TopicConfigManager的createTopicInSendMessageMethod方法,有对比TopicConfig对象中的队列数和客户端设定队列数,并选择其中较小者为新建Topic队列数的逻辑:

int queueNums = clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig.getWriteQueueNums() : clientDefaultTopicQueueNums;

定位问题在服务端TopicConfig

打印这两个变量:

客户队列数clientDefaultTopicQueueNums为8,正确;

而defaultTopicConfig.getWriteQueueNums()为4,而非broker.properties中设定的16;

由可以确定是问题出在defaultTopicConfig上。

defaultTopicConfig数据来源

defaultTopicConfig是从ConcurrentHashMap<String, TopicConfig> topicConfigTable中取得,如下:

TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);

而defaultTopic默认值为MixAll.DEFAULT_TOPIC=“TBW102”。

  为了确认topicConfigTable中的为MixAll.DEFAULT_TOPIC的Config对象属性值的真实来源,继续阅读源码,发现borker有两处改写DEFAULT_TOPIC的Config对象的位置:

  一处是TopicConfigManager的构造方法,在borker服务器启动时运行,会读取broker.properties里的配置,此时DEFAULT_TOPIC的Config对象里的DefaultQueueNums为正确的我所配置的16;

  一处是在BrokerController的initialize方法里调用了TopicConfigManager.load方法:

  该load方法继承自ConfigManager类,读取了$ROCKETMQ_HOME\store\config下保存的配置信息,并调用抽象方法decode(),配置信息作为json字符串参数传入到decode();

  TopicConfigManager类的decode实现方法里,读取了$ROCKETMQ_HOME\store\config\topics.json里的配置信息,并覆写到topicConfigTable,而此前生成的topics.json的“TBW102”的配置信息里的writeQueueNums及readQueueNums均为4。

最终结论

  在发送消息自动创建Topic时,对于此前已运行的borker服务器,修改配置文件的defaultTopicQueueNums属性的值不起作用。

  因为发送消息自动创建Topic的实现里,队列数取小对比操作的变量——defaultTopicConfig写在topics.json的配置信息里的writeQueueNums及readQueueNums,读取自Topics.json,所以即使修改配置文件并重启borker服务器后也不会改变。而服务端最终会用topics.json的值覆盖发送消息自动创建Topic时的TopicConfig配置信息。

阿里的解释

队列是资源,所以管控权会放到服务器。

但是每个用户的默认策略又不一样,所以会有一个默认topic作为模板,在未创建默认topic前,系统会自动创建一个。

这个可以占到运维的角度思考,例如你运维了10个集群,为1000个用户服务。有些用户需要动态的创建topic,但是不能给他足够的权限,想创建多少创建多少。

所有会给他一个模板的topic,就是defaultTopic,动态创建topic继承于defaultTopic配置,队列数不能超过defaultTopic。

解决办法

  1. 通过producer.createTopic方法创建;
  2. 通过控制台方式创建;
  3. 修改metaq源码重新编译borker,使用broker的配置信息覆盖defaultTopic的配置信息。
时间: 2024-11-01 00:18:10

RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个的相关文章

kafka通过控制台模拟消息发送和消息接收正常,但是通过javaAPI操作生产者发送消息不成功 消费者接收不到数据解决方案?

通过命令行工具(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到. 但是java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息.仔细检查了下代码中IP.端口都没有写错. 解决办法将kafka/config/server.properties文件中advertise

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产

微信企业号开发—发送消息

开始回调模式后我们就要实现聊天功能了.平时使用微信聊天可以发送文本消息.语音.图片.视频等,这里只实现了其中的一些功能和大家分享. 一.与微信企业号建立连接 1.企业应用调用企业号提供的接口,管理或查询企业号后台所管理的资源.或给成员发送消息等,以下称主动调用模式. 2.企业号把用户发送的消息或用户触发的事件推送给企业应用,由企业应用处理,以下称回调模式. 3.用户在微信中阅读企业应用下发的H5页面,该页面可以调用微信提供的原生接口,使用微信开放的终端能力,以下称JSAPI模式. 这是微信企业号

Disruptor多个消费者不重复处理生产者发送过来的消息

1.定义事件事件(Event)就是通过 Disruptor 进行交换的数据类型. package com.ljq.disruptor; import java.io.Serializable; /** * 定义事件数据,本质是个普通JavaBean * * @author jqlin */ @SuppressWarnings("serial") public class LongEvent implements Serializable { private long value; pu

如何利用工具自动通过百度网盘好友请求并发送消息或文件

在百度网盘营销过程中,特别是资源类相关的网盘帐号,有时需要在自动通过好友请求并发送一些文字消息如(资源介绍)以及公众号二维码图片之类的需求,如果手工操作则非常痛苦,下面介绍如何利用工具来达到自动通过好友请求并发送文字消息或文件. 一,首先下载工具“https://pan.baidu.com/s/13yPBCs9Et_tQ9HVVCu5b2A”: 二,在帐号管理中登录并获取COOKIE,保存后,右键帐号登录. ? 第三步:切换到自动通过好友请求选项卡 ? ①设置每次间隔运行时间,如果要发送消息则勾

Golang使用amqp发送消息

1.为什么使用信道(channel)而不使用TCP连接发送AMQP命令? 对操作系统来说频繁的建立和销毁TCP连接开销非常昂贵,而操作系统每秒建立的连接是有上限的,性能瓶颈不可避免,而只建立一条TCP连接无疑是一个很好的方案,在这条连接当中建立多条信道与RabbitMQ进行私密通信,相当于光纤电缆一样,一条电缆有多条光束,信道是没有限制的 2.队列 1)AMQP的命令basic.consume与basic.get 如果需要消息一到达队列就自动接收的话,应该使用basic.consume basi

Oozie 生成JMS消息并向 JMS Provider发送消息过程分析

一,涉及到的工程 从官网下载源码,mvn 编译成 Eclipse工程文件: 对于JMS消息这一块,主要涉及到两个工程: oozie-core工程有问题的原因是还需要一些其他的依赖工程未导入: 二,Oozie 生成 JMS消息 主要涉及到的一些类 oozie-core 工程中的: oozie-client工程中的: 三,相关代码: 对于Oozie Server而言,它是消息的生产者.在oozie-default.xml/oozie-site.xml里面配置好连接参数,消息服务器....Oozie就

ActiveMQ实例2--Spring JMS发送消息

参考文章:http://my.oschina.net/xiaoxishan/blog/381209#OSC_h3_7 一,步骤参照参考文献 二.新建的项目 三.补充 web.xml 1 <?xml version="1.0" encoding="UTF-8"?> 2 <web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" 3 xmlns

窗口发送消息参数详解

//    窗口.发送消息    函数功能: 将指定的消息发送到一个窗口,同win32 api 里面的SendMessage等同的效果 中文函数原型: 发送消息(hwnd,msg,wparam,iparam)      英文函数原型: sendmessage(hwnd,msg,wparam,iparam) 参数: hwnd: 窗口句柄 值,可以通过,找到窗口.顶层窗口句柄,等获取句柄的函数得到msg:指定被发送的消息wparam:指定附加的消息特定信息. iparam:指定附加的消息特定信息.举