【RocketMQ】同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?

一、问题答案

  是不可以的

而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息

/**
 * @date 2019/05/28
 */
@Component
@Slf4j
public class MqConsumer implements MessageConsumer {

    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
    public void onMessage(String msg) {
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
    }

    @Override
    public String getTopic() {
        return "topic-1";
    }

    @Override
    public String getTag() {
        return "tag-1";
    }
}
@Component
@Slf4j
public class MqConsumer2 implements MessageConsumer {

    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
    public void onMessage(String msg) {
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
        log.info("接收到的库存MQ消息:{}", msg);
    }

    @Override
    public String getTopic() {
        return "topic-1";
    }

    @Override
    public String getTag() {
        return "tag-2";
    }
}

二、为什么呢?

我们从源码的角度来分析下

1.订阅消息的方法 public void subscribe(String topic, String subExpression, MessageListener listener) ,其中subExpression即为tag

package com.aliyun.openservices.ons.api.impl.rocketmq;
....
@Generated("ons-client")
public class ConsumerImpl extends ONSConsumerAbstract implements Consumer {
    private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<String, MessageListener>();

    public ConsumerImpl(final Properties properties) {
        super(properties);
        boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false"));
        this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull);

        String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
    }

    @Override
    public void start() {
        this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl());
        super.start();
    }

    @Override
    public void subscribe(String topic, String subExpression, MessageListener listener) {
        if (null == topic) {
            throw new ONSClientException("topic is null");
        }

        if (null == listener) {
            throw new ONSClientException("listener is null");
        }
        this.subscribeTable.put(topic, listener);
        super.subscribe(topic, subExpression);
    }

.....
}

从上面的类中我们可以从this.subscribeTable.put(topic, listener);看到subscribeTable这样的一个Map,该Map与tag无关

2.我们再看super.subscribe(topic, subExpression)方法,属于ONSConsumerAbstract类中

protected void subscribe(String topic, String subExpression) {
        try {
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
        } catch (MQClientException e) {
            throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
        }
    }

DefaultMQPushConsumer中:

@Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }

DefaultMQPushConsumerImpl中:

public void subscribe(String topic, String subExpression) throws MQClientException {
        try {      //此处用来构建订阅数据,并且指定了tag
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);      //此处将topic和该topic的订阅数据存放到subscriptionInner这个Map中   // protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

三、总结

从上面简单的源码可以看到,有用到两个Map,

subscribeTable 和 subscriptionInner ,并且Map的key都为topic。所以我们可以笃定,RocketMQ在同一个项目中,只支持注册一个topic消费者,那么也就只能指定一个tag

原文地址:https://www.cnblogs.com/756623607-zhang/p/10989591.html

时间: 2024-08-28 16:39:38

【RocketMQ】同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?的相关文章

Swift &amp; Objc 在同一个项目中的使用

在WWDC大会中发布了Swift让人眼前一亮.终于加了很多的现代编程语言该有的东西.很早年以前玩C#3.0+的时候这些差不多类似的 已经用的烂熟的东西终于一点一点的在看Swift Programming Language的时候再唤醒. Swift较之于OC(Objective-C)在愈发上几乎是一门新的语言了,幸运的是Swift和OC都是出自苹果大家庭,还能在一张桌子上吃饭.也就是说在升级项目 的时候你可以考虑使用Swift开发新的功能,调用已有的部分或者被已有的部分调用.或者在Swift开发的

同一个项目中存在完全相同的包名和类名如何解决调用问题

项目中遇到有一个类,在两个jar包中都存在,而且类所在的包名和类名完全一致,解决办法有两种: 1.常用办法 清除项目中过时的那个jar包,推荐方式. 2.如果两个都不能清除,则在使用过程中动态指定加载的jar包即可.以rt.jar中javax.xml.ws.Service为例,代码如下 File file = new File("f:\\rt.jar"); URL url = file.toURI().toURL(); ClassLoader classLoader = new URL

在同一个项目中灵活运用application/json 和application/x-www-form-urlencoded 两种传输格式(配合axios,同时配置loading)

'use strict' import axios from 'axios' // import qs from 'qs' import { Notification} from 'element-ui' //使用elementui的提示显示 import { Loading } from 'element-ui' import router from "../router"; let loading function startLoading() { loading = Loadin

项目中同一个dll的x86和x64同时引用

<ItemGroup Condition=" '$(Platform)' == 'x86' "> <Reference Include="System.Data.SQLite, Version=1.0.84.0, Culture=neutral, PublicKeyToken=db937bc2d44ff139, processorArchitecture=x86"> <SpecificVersion>False</Speci

IntelliJ IDEA导入多个eclipse项目到同一个workspace下

IntelliJ IDEA 与eclipse在新建项目上工作区的叫法略有不同,区别见下图. 我们在eclipse都是在新建的workspace目录下新建我们的项目,但是在IDEA中没有workspace这个概念,IDEA中的项目就相当于eclipe中的workspace,那么问题来了,我一般在eclipse下的一个工作空间中建立多个相关的小项目(并非一个大项目的多个模块),比如说我写了好多接口项目,他们不属于同一个项目但是他们都是接口类的项目,所以我就将他们放到workspace_webserv

在vue项目中封装echarts的正确姿势

为什么需要封装echarts 每个开发者在制作图表时都需要从头到尾书写一遍完整的option配置,十分冗余 在同一个项目中,各类图表设计十分相似,甚至是相同,没必要一直做重复工作 可能有一些开发者忘记考虑echarts更新数据的特性,以及窗口缩放时的适应问题.这样导致数据更新了echarts视图却没有更新,窗口缩放引起echarts图形变形问题 我希望这个echarts组件能设计成什么样 业务数据和样式配置数据分离,我只需要传入业务数据就行了 它的大小要完全由使用者决定 不会因为缩放出现变形问题

ThinkPHP中:多个项目共享同一个session问题

使用ThinkPHP3.1.3版本的session时,多个项目同时调试会使得一维数组式的session不够用,导致在A项目登录后台后,在B项目就不用登录后台就可以进入后台操作了. 问题在于他们都调用同一个session文件sess_d55a5f55db022fb119fc38193c4cecac.我不知道怎么去定义这个文件的命名方式.但是,我找到了解决多个项目共享同一个session但又不相互冲突的方法.这个方法的原理就是,将session变为二维数组.具体操作是在配置文件中加上session前

同一个项目,项目名称不一致,这两个项目同时在Eclipse中出现

在Eclispse中,实际同一个项目,项目名称不一致,这两个项目同时在Eclipse中出现. ①打开项目文件夹,找到“.cproject”文件 ② 在<name>节点重命名 ③ 导入Eclipse

ideal中如何添加几个不同的项目在同一个idea的显示页面

今天,我遇到了一个问题,就是同事给了我一些项目,我下载了之后,项目有点多,然后想把这些项目都放到一个里面,所以我就采取了添加module的方式进行添加,首先先看一下我们的四个项目, 我们就想实现在一个idea里面进行添加这四个module 1.首先我们要新建一个项目,手动的在那个项目中新建一个文件夹 然后我们可以把要导入的module,可以手动的粘贴到这个myFirstTest的下面,因为我们如果 要import module的话,这个import的这个引用的导入,而不是物理空间的导入,所以我们