主题模式

  • Topic(主题模式)

    • Topic exchange

      direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但匹配规则有些不同
      routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但是通常它们指定与消息相关的某些功能。一些有效的rounting key 如:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。rounting key中可以包含任意多个单词,最多255个字节。
      binding key可以存在如下两种特殊的字符 即:
      1、*(星号)可以代替一个单词。
      2、#(哈希)可以替代零个或多个单词

      ? 在上面图片中,Routing key 设置为"quick.orange.rabbit"的消息将传递到两个队列。消息"lazy.orange.elephant"也将发送给他们两个。但,"quick.orange.fox"只会进入第一个队列,而"lazy.brown.fox"只会进入第二个队列。"lazy.pink.rabbit"将被传递到第二队只有一次,即使两个绑定匹配。"quick.brown.fox"与任何绑定都不匹配,因此将被丢弃。
      如果我们发送一个或四个单词的消息,例如"orange"或"quick.orange.male.rabbit",这些消息将不匹配任何绑定,并且将会丢失。但"lazy.orange.male.rabbit"即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。

    • 生产者消费者代码:

      ? 生产者

      public class TopicEmitLog {
          private static final String EXCHANGE_NAME = "topic_logs";
      
          public static void main(String[] args) throws Exception {
              //获取连接
              Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      
              Channel channel = connection.createChannel();
      
              //创建队列
              //channel.queueDeclare("direct_loge",true,false,false,null);
              //声明交换机,
              channel.exchangeDeclare(EXCHANGE_NAME, "topic");
      
              String message="hello";
              //发送消息
      
              //发送消息
              channel.basicPublish(EXCHANGE_NAME, "topics.log", null, message.getBytes("utf-8"));
      
              channel.close();
              connection.close();
      
          }
      
      }

      消费者1可以收到消息

      public class TopicRecv {
      
          public static final String QUEUE_NAME = "topic_queues";
      
          public static final String EXCHANGE_NAME = "topic_logs";
      
          public static void main(String[] args) throws Exception{
              //获取连接
              Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      
              //声明通道
              Channel channel = connection.createChannel();
      
              channel.exchangeDeclare(EXCHANGE_NAME, "topic");
              //声明队列队列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              //4.绑定队列到交换器,指定路由key为topics
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topics.#");
      
              //
              //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
      
              DeliverCallback deliverCallback = new DeliverCallback(){
                  @Override
                  public void handle(String consumerTag, Delivery delivery) throws IOException {
                      String message = new String(delivery.getBody(), "UTF-8");
                      System.out.println(" [x] Received '" + message + "'");
                  }
              };
      
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
                  @Override
                  public void handle(String consumerTag) throws IOException {
      
                  }
              });
      
          }
      
      }

      消费者2收不到消息

      public class TopicRecv2 {
      
          public static final String QUEUE_NAME = "topic_queues2";
      
          public static final String EXCHANGE_NAME = "topic_logs";
      
          public static void main(String[] args) throws Exception{
              //获取连接
              Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      
              //声明通道
              Channel channel = connection.createChannel();
      
              channel.exchangeDeclare(EXCHANGE_NAME, "topic");
              //声明队列队列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              //4.绑定队列到交换器,指定路由key为topics
              //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topics.#");
      
              //
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
      
              DeliverCallback deliverCallback = new DeliverCallback(){
                  @Override
                  public void handle(String consumerTag, Delivery delivery) throws IOException {
                      String message = new String(delivery.getBody(), "UTF-8");
                      System.out.println(" [x] Received '" + message + "'");
                  }
              };
      
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
                  @Override
                  public void handle(String consumerTag) throws IOException {
      
                  }
              });
      
          }
      
      }

相关代码链接: https://github.com/albert-liu435/springmq

原文地址:https://www.cnblogs.com/haizhilangzi/p/12301736.html

时间: 2024-10-29 18:21:36

主题模式的相关文章

activeMQ队列模式和主题模式的Java实现

一.队列模式 生产者 import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activ

使用Java编写ActiveMQ的队列模式和主题模式

队列模式的消息演示 本小节简单演示一下如何使用JMS接口规范连接ActiveMQ,首先创建一个Maven工程,在pom.xml文件中,添加activemq的依赖: <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version

ActiveMQ队列、主题模式区别

1.ActiveMQ队列模式如下图,生产者创建消息到消息中间件,再"均分给消费者". 2.ActiveMQ主题模式如下图,生产者创建消息到消息中间件,消费者会接受到订阅的主题中所有的消息.在主题模式下,消费者获取不到订阅之前的中间件中的消息. 原文地址:https://www.cnblogs.com/GrapefruitTea/p/9941169.html

ActiveMQ--模式(队列模式/主题模式)

两种模式:队列模式/主题模式 pom.xml <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> 队列模式,其实就是分食模式. 比如生产方发了 10条消息到 activeMQ 服务器, 而此时有多个 消费方

ActiveMQ之topic主题模式

开发环境我们使用的是ActiveMQ 5.11.1 Release的Windows版,官网最新版是ActiveMQ 5.12.0 Release,大家可以自行下载,下载地址.需要注意的是,开发时候,要将apache-activemq-5.11.1-bin.zip解压缩后里面的activemq-all-5.11.1.jar包加入到classpath下面,这个包包含了所有jms接口api的实现. 搭建开发环境建立项目,我们只需要建立一个java项目就可以了,导入jar包,项目截图: 1.编写生产者

Rabbitmq(6) 主题模式

* 匹配1个 # 匹配所有 发送者: package com.aynu.bootamqp.service; import com.aynu.bootamqp.commons.utils.Amqp; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; pu

夜间模式的实现

前天在做项目的时候, 遇到一个问题(夜间模式的实现),通常我们在设置夜间模式的时候,简单的做法是使用通知的设计模式,改变各个页面的背景色,然后设置一下透明的效果,可是一个真正的项目,并不能马虎,需要页面效果美观精致.本文参考了github上一个老外写的实现方案,方案参考 经过自己的理解整合,制作出了自己的页面模式的实现. Xcode中floder 与 group 的区别 在这里我先要说明一下:在Xcode中蓝色和黄色文件夹的区别,因为本文就是使用到了蓝色的文件夹,通常蓝色文件夹在IOS中被称为f

MQ的几种模式

MQ的几种模式: Producer消息产生者,红色队列,Customer消息消费者,蓝色交换机exchange 简单模式:消息产生者产生消息,消息的消费者进行消费 工作模式:消息消费产生消息,将消息发送到消息队列中,这是竞争,消费者1和消费者2都监听消息队列,当队列中有消息,一起来抢消息.谁抢到谁处理. 消息发布和订阅:消息产生者产生消息,将消息发送到交换机中.多个消息队列绑定到交换机上.交换机将消息发送到多个队列中.消费者1监听自己的队列,如果有消息就进行消费.消费者2监听自己的队列,如果有消

4.4.2.3 模式和密码需求

4.4.2.3 模式和密码需求 所有Oracle身份管理和融合应用程序模式Oracle身份管理数据库的名称/ OID数据库和融合应用程序数据库是固定的,不能修改. 他们将由融合中间件RCU(Oracle身份管理)和Oracle RCU融合应用程序. 注意,融合中间件RCU似乎提供一个选项来选择一个前缀模式,但在这个版本 前缀必须足总. 因此,所需的主题模式,唯一的计划是关于密码的选择. 甲骨文身份管理和融合应用程序配置向导给你的选项为每个模式选择不同的密码,但是你也可以使用相同的密码为所有模式或