RabbitMQ学习 (五):主题交换机

尽管直连交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。

为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— 主题交换机。

发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

* (星号) 用来表示一个单词.

# (井号) 用来表示任意数量(零个或多个)单词。

生产者代码:


public class Productor {

public static void main(String[]
args) throws
IOException,
TimeoutException {

//配置rabbitmq服务器地址

ConnectionFactory
factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(5672);

factory.setUsername("starktan");

factory.setPassword("starktan");

factory.setVirtualHost("/");

//建立连接和通道

Connection
connection = factory.newConnection();

Channel channel =
connection.createChannel();

//声明一个主题交换机

channel.exchangeDeclare("topic",
BuiltinExchangeType.TOPIC);

System.out.println("发送信息!");

String message = "1.2.3";

//发送routkey“1.2.3”

channel.basicPublish("topic", message, true, null, message.getBytes());

channel.close();

connection.close();

}

}

消费者代码

package com.stark.example5;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //创建连接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            final int cur = i;
            service.submit(new Runnable() {
                Channel channel = connection.createChannel();
                String queryname = channel.queueDeclare().getQueue();

                public void run() {
                    //创建队列消费者
                    QueueingConsumer consumer = new QueueingConsumer(channel);
                    try {
                        switch (cur) {
                            case 0: //获取0开头的主题消息
                                channel.queueBind(queryname, "topic", "1.#");
                            case 1: //获取3结尾的主题消息
                                channel.queueBind(queryname, "topic", "#.3");
                            case 2: //获取2中间的主题消息
                                channel.queueBind(queryname, "topic", "*.2.*");
                            default://获取4中间的主题消息
                                channel.queueBind(queryname, "topic", "*.4.*");
                        }
                        channel.basicConsume(queryname, consumer);
                        while (true) {
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String message = new String(delivery.getBody());
                            System.out.println("线程 " + cur + " 获取到消息 " + message);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        service.shutdown();
    }
}
时间: 2024-08-07 04:10:22

RabbitMQ学习 (五):主题交换机的相关文章

RabbitMQ(五) ——主题

RabbitMQ(五) --主题 (转载请附上本文链接--linhxx) 一.概述 话题模式(topic)可以让队列绑定某一类型的消息,而不仅仅是direct模式下的具体的消息.即,其允许绑定的信息采用通配符.可以保证多重条件下,仍具备灵活性.但是,当routing key没有匹配时,仍然会丢弃消息. 话题模式如下图所示: 二.话题模式的交换机(topic exchange) 该模式下,routing key更加灵活,支持通配符.但是,并没有正则表达式那么强大的匹配,其主要支持两个通配符.匹配是

RabbitMQ实例教程:主题交换机

前面的例子中,尽管我们使用了direct路由代替fanout路由解决了盲目广播的问题,但direct路由也有它的缺陷,他不能基于多个标准做路由转发. 在上面的日志系统中,如果不仅想基于日志等级做订阅,也想根据日志的发生源做订阅该怎么处理呢?这时候你可能想到了unix系统工具中的syslog服务,它不仅基于日志等级(info/warn/crit...)进行路由转发,也会根据操作(auth/cron/kern...)做路由转发. 如果是那样的话,日志系统就灵活多了,它不仅能够监听来自'cron'的关

rabbitmq系列四 之主题交换机

1.主题 在前面的例子中,我们对日志系统进行了改进.使用了direct交换机代替了fanout交换机,从只能盲目的广播消息改进为有可能选择性的接收日志. 尽管直接交换机能够改善我们的日志系统,但是它也有它的限制--没办法基于多个标准执行路由操作. 在我们的日志系统中,我们不只希望订阅基于日志级别,同时还希望订阅基于日志来源.其中unix工具syslog是同时基于日志的级别(info/warn/error)和设备-facility (auth/cron/kern...)来路由日志的. 如果这样的话

RabbitMQ入门教程(七):主题交换机Topics

原文:RabbitMQ入门教程(七):主题交换机Topics 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78631035 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的另一种类型:主题类型topic,直连接类型direct必须是生产者发布消息指定的routingKey和消费者

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

RabbitMQ学习(三)订阅/发布

RabbitMQ学习(三)订阅/发布 1.RabbitMQ模型 前面所学都只用到了生产者.队列.消费者.如上图所示,其实生产者并不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机. 有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout) 这里我们使用扇形交换机做一个简单的广播模型:一个生产者和多个消费者接受相同消息

rabbitmq学习之路(三)

今天继续学习rabbitmq 了解一下AMQP的一些基本概念 交换机: Direct exchange(直连交换机) Fanout exchange(扇型交换机) Topic exchange(主题交换机) Headers exchange(头交换机) 交换机有两个状态 持久和暂存,区别就是持久话的交换机在消息代理也就是broker重启后依旧存在 队列: 队列需要被声明之后才能使用,如果声明时,该队列不存在,就会新建,如果已经存在,且属性无变化,则没有关系,不影响,若属性有变化,则报错 队列和交

RabbitMQ学习及实践2---介绍及简单Java实现

一,基本概念 MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息.MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品. RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消息队列协

RabbitMQ学习系列(四): 几种Exchange 模式

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ.今天说些理论的东西,Exchange 的几种模式. AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相