RabbitMQ实例教程:主题交换机

  前面的例子中,尽管我们使用了direct路由代替fanout路由解决了盲目广播的问题,但direct路由也有它的缺陷,他不能基于多个标准做路由转发。

  在上面的日志系统中,如果不仅想基于日志等级做订阅,也想根据日志的发生源做订阅该怎么处理呢?这时候你可能想到了unix系统工具中的syslog服务,它不仅基于日志等级(info/warn/crit...)进行路由转发,也会根据操作(auth/cron/kern...)做路由转发。

  如果是那样的话,日志系统就灵活多了,它不仅能够监听来自‘cron’的关键错误,也能监听来自‘kern‘的所有日志。其实主题交换机(topic exchange)就能解决这种问题。

  主题交换机(Topic exchange)

  主题交换机的路由代码不能是任意写的,必须是小树点分隔开的一组单词列表。这些单词可以随便写,但通常是与连接消息特征有关的单词。有效地路由代码应该是这样的“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由代码可以随便写,但是长度限制在255字节。

  注意,绑定代码也必须在同一个表单中。topic交换机与direct交换机类似-具有特定路由代码的消息会传送给所有匹配绑定代码的队列,但有两个特殊的绑定代码:

  * :它能替代一个单词 

  #:它能替代0或多个单词

  该例子中,我们给所有的动物发送消息,符合由三个单词(第一个单词描述速度;第二个单词描述颜色;第三个单词描述物种)组成的路由代码将会发送消息:“<speed>.<colour>.<species>”。

  我们创建了三个绑定:Q1使用“*.orange.*”绑定,Q2使用“*.*.rabbit”和“lazy.#”绑定。这些绑定的意义如下:

  Q1描述了所有颜色为橙色的动物。

  Q2描述了是兔子的动物和懒惰的动物。

  这样,“quick.orange.rabbit”消息通过路由转发给Q1、Q2两个队列。"lazy.orange.elephant"消息也会转发给Q1、Q2两个队列。“quick.orange.fox”消息只会转发给Q1队列,"lazy.brown.fox"也只会转发给Q2队列。"lazy.pink.rabbit"会转发给Q2队列一次,尽管它匹配两个绑定。"quick.brown.fox"并不匹配任何一个队列就会被废弃。

  如果我们打破规则,每次只发一个或四个单词的话,如“orange”或”quick.orange.male.rabbit“,这些消息不匹配任何绑定,就会被废弃。但如果发送”lazy.orange.male.rabbit“这样的消息的话,由于它匹配最后的绑定仍会被转发到Q2队列中。

  主题交换机是一种非常强大的交换机,当它只绑定”#“时,它会接收所有的消息,与fanout交换机类似。当没有使用”*“和”#“符号时,主题交换机的作用等同与direct交换机。

  源代码

EmitLogTopic.java

package com.favccxx.favrabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "topic");

			String[] routingKeys = { "fast.orange.duck", "slow.orange.fish", "grey.rabbit", "fast.black.rabbit",
					"quick.white.rabbit", "lazy.dog", "lazy.black.pig" };
			String[] messages = { "Hello", "Guys", "Girls", "Babies" };

			for (int i = 0; i < routingKeys.length; i++) {
				for (int j = 0; j < messages.length; j++) {
					channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, messages[j].getBytes("UTF-8"));
					System.out.println(" [x] Sent ‘" + routingKeys[i] + "‘:‘" + messages[j] + "‘");
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

ReceiveLogsTopic.java

package com.favccxx.favrabbit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogsTopic {

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		String queueName = channel.queueDeclare().getQueue();

		String[] bindingKeys = { "*.orange.*", "*.*.rabbit", "lazy.#" };
		for (final String bindingKey : bindingKeys) {
			channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
			Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					String message = new String(body, "UTF-8");
					System.out.println("[" + bindingKey + "] Received message :‘" + message + "‘ from routingKey : " + envelope.getRoutingKey());
				}
			};
			channel.basicConsume(queueName, true, consumer);
		}

	}
}

运行消息发送器,在消息接收平台输出内容如下

[*.orange.*] Received message :‘Hello‘ from routingKey : fast.orange.duck
[*.*.rabbit] Received message :‘Guys‘ from routingKey : fast.orange.duck
[lazy.#] Received message :‘Girls‘ from routingKey : fast.orange.duck
[*.orange.*] Received message :‘Babies‘ from routingKey : fast.orange.duck
[*.*.rabbit] Received message :‘Hello‘ from routingKey : slow.orange.fish
[lazy.#] Received message :‘Guys‘ from routingKey : slow.orange.fish
[*.orange.*] Received message :‘Girls‘ from routingKey : slow.orange.fish
[*.*.rabbit] Received message :‘Babies‘ from routingKey : slow.orange.fish
[lazy.#] Received message :‘Hello‘ from routingKey : fast.black.rabbit
[*.orange.*] Received message :‘Guys‘ from routingKey : fast.black.rabbit
[*.*.rabbit] Received message :‘Girls‘ from routingKey : fast.black.rabbit
[lazy.#] Received message :‘Babies‘ from routingKey : fast.black.rabbit
[*.orange.*] Received message :‘Hello‘ from routingKey : quick.white.rabbit
[*.*.rabbit] Received message :‘Guys‘ from routingKey : quick.white.rabbit
[lazy.#] Received message :‘Girls‘ from routingKey : quick.white.rabbit
[*.orange.*] Received message :‘Babies‘ from routingKey : quick.white.rabbit
[*.*.rabbit] Received message :‘Hello‘ from routingKey : lazy.dog
[lazy.#] Received message :‘Guys‘ from routingKey : lazy.dog
[*.orange.*] Received message :‘Girls‘ from routingKey : lazy.dog
[*.*.rabbit] Received message :‘Babies‘ from routingKey : lazy.dog
[lazy.#] Received message :‘Hello‘ from routingKey : lazy.black.pig
[*.orange.*] Received message :‘Guys‘ from routingKey : lazy.black.pig
[*.*.rabbit] Received message :‘Girls‘ from routingKey : lazy.black.pig
[lazy.#] Received message :‘Babies‘ from routingKey : lazy.black.pig
时间: 2024-08-11 05:33:37

RabbitMQ实例教程:主题交换机的相关文章

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

RabbitMQ实例教程:用Java搞定工作队列

在上一节中,我们学会了使用编程的方式发送和接收一个命名好的队列.本节中我们将会使用工作队列在多个工作者之间分发任务. 工作队列的核心思想是避免立即处理高密集度必须等待完成的任务.它采用了安排任务的方式,将一个任务封装成一个消息把它放进队列.在后台运行的工作进程到时候会将它弹出并执行,这样任务队列中的任务就会被工作进程共享执行. 工作队列适用于Web应用中在一个短的HTTP请求中处理复杂任务的场景. 在上节中,我们发送了一个"Hello World!"字符串消息.现在发送多个字符串消息表

RabbitMQ实例教程:路由选择

在前面的例子中,我们构建了一个简单的日志系统来日志消息通过广播传送到多个接受者.本文将介绍如何订阅消息的子集.比如,我们能够将关键的错误信息写到日志文件中,同时也能够在控制台打印所有的日志消息. 消息绑定(Bindings) 在前面的例子中,我们使用下面的代码方式再次绑定. channel.queueBind(queueName, EXCHANGE_NAME, ""); 绑定表述的是交换机和队列之间的关系,可以理解为队列对交换机中的消息感兴趣. 绑定的参数是 routingKey,为避

RabbitMQ实例教程:Hello RabbitMQ World之Java实现

RabbitMQ要实现Hello World,其实也很简单.只需一个服务器来发送消息,另外有个客户端接收消息即可. 整体的设计流程如下: 消息生产者发送Hello到消息队列,消息消费者从队列中接收消息. 下载依赖Jar包 RabbitMQ要用Java实现发送消息,就必须使用Java客户端库.目前RabbizMQ的Java客户端库最新版为为 3.5.5 .可以从Maven仓库下载,也可以直接去官网下载. <dependency>    <groupId>com.rabbitmq<

RabbitMQ实例教程:Windows下安装RabbitMQ

(1)下载RabbitMQ服务器 从RabbitMQ官网下载最新的稳定版.目前最新版本为3.5.1. (2)移除RabbitMQ老版本. 如果之前安装了老版本的话,或者想要将Erlang VM从32位升级到64位,需要手动卸载RabbitMQ服务器.因为安装过程中并不会停止或移除旧的服务. (3)安装RabbitMQ服务器 从Erlang官网下载Windows安装文件,并安装.RabbitMQ需要这个东西. 运行rabbitmq-server-3.5.1.exe,安装RabbitMQ并使用默认配

RabbitMQ入门教程(七):主题交换机Topics

原文:RabbitMQ入门教程(七):主题交换机Topics 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78631035 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的另一种类型:主题类型topic,直连接类型direct必须是生产者发布消息指定的routingKey和消费者

RabbitMQ学习 (五):主题交换机

尽管直连交换机能够改善我们的系统,但是它也有它的限制 -- 没办法基于多个标准执行路由操作. 为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 -- 主题交换机. 发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表.这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇.以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw&quo

rabbitmq系列四 之主题交换机

1.主题 在前面的例子中,我们对日志系统进行了改进.使用了direct交换机代替了fanout交换机,从只能盲目的广播消息改进为有可能选择性的接收日志. 尽管直接交换机能够改善我们的日志系统,但是它也有它的限制--没办法基于多个标准执行路由操作. 在我们的日志系统中,我们不只希望订阅基于日志级别,同时还希望订阅基于日志来源.其中unix工具syslog是同时基于日志的级别(info/warn/error)和设备-facility (auth/cron/kern...)来路由日志的. 如果这样的话

RabbitMQ入门教程(九):首部交换机Headers

原文:RabbitMQ入门教程(九):首部交换机Headers 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78638988 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 首部交换机和扇形交换机都不需要路由键routingKey,交换机时通过Headers头部来将消息映射到队列的,有点像HTTP的