RabbitMQ (消息队列)专题学习06 Topic

(使用Java客户端)

一、概述

在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接、并且可以有选择的接收消息。

尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息。

在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源。

这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而不是来自"kern"消息源发送的所有消息。

为了按照上述的要求改进我们的日志系统,所以我们需要学习一种更为复杂的exchange.

二、实现步骤

2.1、Topic exchange

消息被发送到一个topic exchange不能使用一个随意的routingKey,而它必须是一个由点号(.)隔开的单词列表,这些单词可以使任何字符,但是它们通常有一些特性用来指定分发的消息,一些有效的routingKey的例子,比如"stock.mus.nyse","nyse.vmw","quick.orange.rabbit",可以用自己喜欢的许多单词来作为routingKey,但是最多不能超过255个字符。

绑定Key必须是相同的形式,topic exchange的逻辑绑定类似与一个direct exchange的丁丁,一个被发送的消息带着一个特定的routingKey被传递到所有与之匹配的routingKey的queues中,但是topic exchange有两个特殊的情况。

1、*(星号)可以代替一个确切的词

2、#(井号)可以替换零个或者多个词

以下是一个最容易的例子的说明:

图-1

说明:

在这个实例中,要发送描述动物的所有消息,该消息将routingKey包含三个单词,两个点号,在routingKey的第一部分描述速度、第二部描述颜色、第三部分描述种类。格式为:"<speed>.<colour>.<species>"。

创建了三个绑定,Q1绑定key为“*.orange.*”,Q2队列的绑定key为"*.*.rabbit"和"lazy.#".

这些绑定可以概括为:

>Q1是一对所有orange的舞动感兴趣的队列。

>Q2项坚挺关于兔子和懒惰的动物一切消息的队列。

若一个消息绑定的key设置为"quick.orange.rabbit"将被发送到所有队列,因为它匹配所有消息队列的绑定关系,消息绑定key为"lazy.orange.elephant"也将被发送到所有的消息队列中,另外一方面,若某条消息绑定key为"quick.orange.fox"仅仅将被发送到Q1中,"lazy.pink.rabbit"这样的绑定key仅仅只有一次被传递到Q2中,即使它符合匹配两个绑定,像"quick.brown.fox"不匹配任何绑定的队列,这些消息将会被丢弃。

如果我们打破上述的这些规则发送包含一个单词或者四个单词的消息,比如说“orange”或者"quick.orange.male.rabbit"这样的key,这些消息将全部都被丢失,因为没有与之匹配的queue.

另外一方面像"lazy.orange.male.rabbit"这样key,即使它有四个单词,将匹配最后的一部分,满足Q2的规则,所以这些消息将被传递到Q2中。

注意:

1、Topic exchange是强大的,当一个queue绑定一个带有#的key时,无论是什么routingKey,它将接收所有的消息,跟fanout exchange是一样的效果。

2、当被绑定的key中没有使用#和*符号时,topic exchange就像一个direct exchange一样,能准确的将消息路由到匹配的queue中。

2.2、代码清单:

使用一个topic exchange在日志系统中,将开始一个带有两个单词的routingKey,格式为"<facility>.<severity>",代码几乎和之前的一样。

消息发送者:

package com.xuz.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {
	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("127.0.0.1");
			connection = factory.newConnection();
			channel = connection.createChannel();
			//指定exchange的类型为topic
			channel.exchangeDeclare(EXCHANGE_NAME, "topic");
			String[] msg = new String[]{"xuz RabbitMQ Test!","This is a Topic Exchange Model!"};
			//获取发送消息的routing key
			String[] routType = new String[]{"anonymous.info","anonymous.warning","anonymous.error","*.info","anonymous.#"};
			String routingKey = getRouting(routType);
			//获取发送消息
			String message = getMessage(msg);
			channel.basicPublish(EXCHANGE_NAME, routingKey, null, message
					.getBytes());
			System.out.println("EmitLogTopic---->Sent ‘" + routingKey + "‘:‘" + message
					+ "‘");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}

	private static String getRouting(String[] strings) {
		if (strings.length < 1)
			return "anonymous.info";
		//选择key
		return strings[0];
	}

	private static String getMessage(String[] strings) {
		if (strings.length < 2)
			return "Hello World!";
		return joinStrings(strings, " ", 1);
	}

	private static String joinStrings(String[] strings, String delimiter,
			int startIndex) {
		int length = strings.length;
		if (length == 0)
			return "";
		if (length < startIndex)
			return "";
		StringBuilder words = new StringBuilder(strings[startIndex]);
		for (int i = startIndex + 1; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}
}

接收者:

package com.xuz.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopic {
	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("127.0.0.1");
			connection = factory.newConnection();
			channel = connection.createChannel();
			//指定exchange类型为topic
			channel.exchangeDeclare(EXCHANGE_NAME, "topic");
			String queueName = channel.queueDeclare().getQueue();
			//指定队列绑定key
			String[] msgType = new String[]{"anonymous.info","anonymous.warning","anonymous.error","*.info","anonymous.#"};
			if (msgType.length < 1) {
				System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
				System.exit(1);
			}
			//test1 :接收所有类型的消息
			for (String bindingKey : msgType) {
				channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
			}
			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true, consumer);
			while (true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String message = new String(delivery.getBody());
				String routingKey = delivery.getEnvelope().getRoutingKey();
				System.out.println(" [x] Received ‘" + routingKey + "‘:‘"
						+ message + "‘");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

2.3、测试topic exchange

测试方案如下图所示,有一个发送消息的EmitLogTopic,有五个接收消息的类,其中ReceiveLogsTopic03指定的key为:"*.info",意味它只接收绑定key消息中有info这个词的消息源的消息,ReceiveLogsTopic04指定的key为:"anonymous.#",意味着它接收绑定key消息中带有anonymous这个词的一个或者多个消息源的消息。其他的按照类的命名接收绑定指定key的消息源的消息,key类型为:

"anonymous.info","anonymous.warning","anonymous.error","*.info","anonymous.#"

操作步骤:(在此只挑选几个类测试)

1、运行ReceiveLogsTopic03、ReceiveLogsTopic04、ReceiveLogsTopicError、ReceiveLogsTopicInfo、ReceiveLogsTopicWarning,分别如下图所示:

2、运行EmitLogTopic,此时指定发送消息的绑定的key为:anonymous.warning,那么此时ReceiveLogsTopic04、ReceiveLogsTopicWarning两个应该接收到消息,其他的接收不到消息。

3、修改EmitLogTopic的key的类型为:*.info,此时只有ReceiveLogsTopic03才会接收到消息,其他的接收不到消息。

4、修改EmitLogTopic的key为:anonymous.info ,此时只有ReceiveLogsTopic03、ReceiveLogsTopicWarning、ReceiveLogsTopic04能接收到消息。其他的接收不到消息。

Topic exchange是非常强大的,它弥补了fanout exchange(广播)和direct exchange(不能多个queues)各自的不足,它比前两者具有更强的灵活性。通配符*和#的使用,使得topic exchange灵活性大大增强,消费者不光能从匹配绑定key的queue中取出相关消息,还能做到从指定发送消息的消息源的所有信息。

源码下载:

topic 消息交换

RabbitMQ (消息队列)专题学习06 Topic,布布扣,bubuko.com

时间: 2024-10-30 16:20:27

RabbitMQ (消息队列)专题学习06 Topic的相关文章

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基

RabbitMQ 消息队列 应用

安装参考    详细介绍   学习参考 RabbitMQ 消息队列 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概

RabbitMQ消息队列1: Detailed Introduction 详细介绍

1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco .Redhat.iMatix 等联合制定了 AMQP 的公开标

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)

目录(?)[-] Exchanges Temporary queues Bindings绑定 最终版本 上篇文章中,我们把每个Message都是deliver到某个Consumer.在这篇文章中,我们将会将同一个Message deliver到多个Consumer中.这个模式也被成为 "publish / subscribe".    这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer). 我们将构建

Nginx通过LUA脚本访问RabbitMQ消息队列

发现了一个Nginx的LUA脚本:lua-resty-rabbitmqstomp,可以让Nginx通过LUA脚本访问RabbitMQ消息队列,这个脚本是通过stomp协议连接RabbitMQ的stomp适配器,来pub/sub消息的 关于RabbitMQ-STOMP安装使用相关内容可以参见:RabbitMQ STOMP Adapter 关于Nginx-LUA模块安装使用参见:LAMP架构演进到LAMPGC,再演进到LNMLGC 关于STOMP协议相关资料参见这里: STOMP官方英文协议1.1版

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)[转]

上篇文章中,我们把每个Message都是deliver(提供)到某个Consumer.在这篇文章中,我们将会将同一个Message deliver(提供)到多个Consumer中.这个模式也被成为 "publish / subscribe".     这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer). 我们将构建两个Consumer,第一个将log写到物理磁盘上:第二个将log输出的屏幕. 1.