在上一节我们创建了一个工作队列,并且假设工作队列把每一个任务都准确地分发给一个worker。在本章我们将创建一个更加复杂的例子–我们将把一个消息分发到多个消费者。这个模式就是发布/订阅。
为了说明这个模式,我们将构建一个简单的日志系统。它包含两个程序–第一个用来发出日志消息,第二个将接收并打印它们。
在我们的日志系统中,每个运行的receiver都将获得消息。这样我们通过一个receiver接收消息并直接记录到磁盘中;同时我们可以运行另一个receiver在屏幕上打印日志。
从本质上讲,日志消息是通过广播分发给所有的receivers.
Exchanges
在前面的章节中,我们使用了一个队列来发送和接收消息。而现在我们将完整地介绍RabbitMQ的消息模型。
首先我们先回顾一下前面的例子:
- 生产者:用来发送消息的应用程序
- 队列:存储消息的缓存区
- 消费者:用来消费消息的应用程序
RabbitMQ消息模型的核心思想就是生产者不会直接向队列发送消息。实际上,生产者根本不知道消息是否发送到了任何队列。
相反,生产者只能发送消息给交换器exchange(后面统称交换器).交换器原理非常简单,一方面它从生产者接收消息而另一方面把消息推送给消息队列。 交换器必须知道如何处理接收到的每一个消息。是直接添加到特定的队列呢?还是添加到多个队列?再或者是直接丢弃。消息的处理方式取决于交换器的类型。
交换器的类型有:direct, topic, headers 和 fanout。本章我们主要关注第四个类型:fanout.下面我们创建一个该类型的交换器,并命名为logs:
channel.exchangeDeclare("logs", "fanout");
fanout交换器非常简单。根据名字你可能就猜到,它是把所有接收到消息都广播给其知道的队列。这也正是我们的日志记录器需要的。
交换器列表
我们可以执行命令rabbitmqctl来列出服务器的交换器列表:
$ sudo rabbitmqctl list_exchangesListing exchanges ... directamq.direct directamq.fanout fanoutamq.headers headersamq.match headersamq.rabbitmq.log topicamq.rabbitmq.trace topicamq.topic topiclogs fanout...done
列表中有一些以amq.*开头的交换器以及未命名(默认)的交换器,这些交换器都是系统默认创建的,但是你不太可能用到它们。
无名交换器
在前面的例子中我们没有使用任何交换器,但是我们依然可以将消息发送到队列中。这是由于我们通过设置了名字为””字串,进而使用了默认的交换器。
回顾一下我们之前发送消息的方式:
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是交换器的名称。空字符串表示使用默认的或者无名的交换器:消息被路由到routingKey指定的队列。
通过下面的代码我们就可以向我们的命名交换器发送消息:
channel.basicPublish( "logs", "", null, message.getBytes());
临时队列
你是否还记得前面的例子中我们使用的指定命名的队列么?如hello和task_queue。当你想在生产者与消费者之间的共享队列,那么给这个队列命名是非常重要的。
但是我们这个例子的日志记录器并不需要这样。我们希望收集到所有的日志信息而不是其中的一个子集。我们只对当前的消息流感兴趣而不会关注旧消息。为了实现这个例子,我们需要做两件事。
首先,任何时候连接到RabbitMQ,我们都需要一个新的、空的队列。如何做?我们可以创建一个具有随机命名的队列或者也可以让服务器来选择一个随机命名的队列,这样更好。
第二件事就是,一旦我们消费者的连接,队列应该能自动被删除。
在java客户端,我们可以无参调用queueDeclare()方法来创建一个非持久的,专属的,能被自动删除的队列;该方法返回队列的名称。
String queueName = channel.queueDeclare().getQueue();
变量queueName的内容就是队列的随机名称,类似字串’amq.gen-JzTY20BRgKO-HjmUJj0wLg’.
Bindings
前面我们已经创建了一个fanout类型的交换器和一个队列。现在我们就可以告诉交换器发送消息给队列了。交换器与队列之间的关系称作binding。
channel.queueBind(queueName, "logs", "");
现在logs交换器将向我们创建的队列追加消息。
bindings列表
我们可以使用命令rabbitmqctl list_bindings来列出所有的bindings.
执行代码
用来发送日志消息的生产者代码看起来与之前的没什么区别。比较重要的改变就是我们把消息发送到了名为logs的交换器而不是系统默认的无名交换器。在发送消息时我们需要提供一个routingKey,但fanout类型的交换器会忽略该值的意义。下面是完整代码:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "info: Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
如你所见,在建立连接后,我们就创建一个fanout的交换器,这一步是必须。禁止向一个不存在的交换器发送消息。
假如没有队列绑定到这个交换器,那么发送的消息就会丢失,但这在我们这个例子中影响不大。如果没有消费者在监听消息队列,我们可以安全地丢弃消息。
完整的ReceiveLogs.java代码如下:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received ‘" + message + "‘"); } } }