原文地址:http://www.rabbitmq.com/getstarted.html 翻译得不好,欢迎指出。
一、Hello World
1、基本概念介绍
RabbitMQ是一个消息代理(或者说消息队列),它的主要意图很明显,就是接收和转发消息。你可以把它想象成一个邮局:当你把一封邮件放入邮箱,邮递员会帮你把邮件送到收件人的手上。在这里,RabbitMQ就好比一个邮箱、邮局或者邮递员。
RabbitMQ和邮局的主要区别在于,RabbitMQ不是处理邮件,而是接收、存储和将消息以二进制的方式转发出去。
在这里,我们先说明一些RabbitMQ中涉及到的术语。
- 生产者(Producer)。生产表示只负责发送的意思,一个只负责发送消息的程序称为一个生产者,我们通过一个P来表示一个生产者,如下图:
- 队列(Queue)。队列就好比一个邮箱,它在RabbitMQ的内部。虽然消息在RabbitMQ和程序之间传递,但是它们是存储在队列中的。一个队列没有大小的限制,你想存储多少条消息就存储多少条,它的本质是一个无限大的缓冲区。任何生产者都可以往一个队列里发送消息,同样的,任何消费者也可以从一个队列那接收到消息。我们用下图来表示一个队列,队列上面的文字表示这个队列的名字:
- 消费者(Consumer)。接收和发送的过程很类似,一个消费者程序通常是等待别人发送消息给它。我们通过一个C来表示一个消费者,如下图:
注意一点,消费者、生产者和消息队列可以不用运行在同一台机器上。实际上,在大多数的应用程序中,它们并不是在同一台机器上运行的。
2、”Hello World”
在这一小节中,我们将编写两个Java程序,一个作为生产者,发送一条简单的消息;另一个作为消费者,接收并打印出接收到的消息。在这里,我们先不讨论Java API的具体细节,而是先编写出一个可运行的“Hello World”程序。
在下图中,“P”表示一个生产者,”C”表示一个消费者,中间的矩形表示一个队列。
RabbitMQ会涉及许多协议,其中AMQP协议是一个开放式的、通用的消息协议。RabbitMQ支持很多编程语言,我们这里通过RabbitMQ提供的Java客户端来进行演示。请自行下载、安装RabbitMQ和相关jar包。
(1)发送
在下面的代码中,我们用Send来表示一个发送端,用Recv来表示一个接收端。发送端会先连接RabbitMQ,并发送一个简单的消息后关闭。
对于Send而言,我们需要导入一些相关的类:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
首先,先建立一个类,并为队列起一个名:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws Exception {
...
}
}
然后我们创建一个connection:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
connection是一个抽象的sockect连接,在连接时,我们需要注意一下登录认证信息。在这里我们连接到本地机器,所以填写上“localhost”。如果我们想要连接到其他机器上,只需要填写上其他机器的IP地址即可。
接下来,我们创建一个channel,大多数的API都可以通过这个channel来获取。
如果要发送消息,我们必须先声明一个队列,然后我们才可以把消息发送到这个队列里去:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
声明一个队列后,仅当队列不存在时,队列才会被创建。另外,消息的内容是一个字节数组,所以我们需要先对消息进行编码。最后,我们需要关闭连接。
channel.close();
connection.close();
以上就是Send.java类的所有代码
发送端运行不起来
如果你是第一次使用RabbitMQ,而且没有看到打印的“Sent”信息,你可能会奇怪是哪里出问题了。这里有可能是RabbitMQ在启动时没有足够的磁盘空间(默认情况下最少需要1Gb的空间),所以它会拒绝接收任何消息。通过查看日志文件可以确定是否是由这个原因所造成的。如果有必要,我们也可以通过修改配置文件中的disk_free_limit来降低大小的限制。
(2)接收
上面的代码是发送端的。我们的接收端是从RabbitMQ那获取消息,所以并不是像发送端那样发送一个简单的消息,而是需要一直监听获取消息,并打印输出。
对于接收端而言,它需要导入如下的类:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer是一个实现了Consumer接口的类,我们可以使用它来获取发送过来的消息。
跟Send一样,我们先创建一个connection和channel,并声明一个接收消息的队列。注意一点,这里的队列名要和Send的相匹配。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
在这里我们同样声明了一个队列,因为我们可能在启动发送端之前就先启动了接收端,在我们开始接收消息之前,我们要先确认队列是否存在。
我们告诉RabbitMQ通过这个队列给我们发送消息,因此RabbitMQ会异步的给我们推送消息,我们需要提供一个回调对象用来缓存消息,直到我们准备使用它。这就是DefaultConsumer所做的事情。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是Recv.java类的所有代码
3、代码整合
编译并运行以上程序,你会看到接收端将会收到并打印出来自发送端的“Hello World!”消息。
你可以在类路径下编译这些文件:
$ javac -cp rabbitmq-client.jar Send.java Recv.java
为了运行它们,你需要rabbitma-client.jar和它的依赖文件。在一个终端运行发送者:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
然后运行接收者:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
在windows环境中,我们使用分号代替冒号来分隔classpath上的选项。
Hint
你可以设置一个环境变量到classpath中:
$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar $ java -cp $CP Send在windows上:
> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar > java -cp %CP% Send
4、个人补充
(1)登录认证
在实际使用中,RabbitMQ并不是简单的通过指定一个IP就可以进行连接的,它还需要指定端口号、用户名和密码。就像是这样:
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
当没有明确指明登录认证信息的时候,就会使用默认值来进行登录,以上都是默认的认证信息。
另外,我们也可以通过设置URI来进行连接,URI的格式如下:
amqp://username:password@host:port
factory.setUri("amqp://guest:guest@localhost:5672");
二、工作队列
在第一部分中,我们的程序结构非常简单。在接下来,我们将会创建一个工作队列,向多个消费者分发任务。
1、准备
在之前的程序中,我们发送了一个包含“Hello World!”的消息,现在我们发送一些字符串用来表示复杂的任务。在这里,因为我们没有真正意义上的任务,比如调整图片的大小或者渲染pdf文件,所以我们通过Thread.sleep()函数来模拟程序的处理时间。我们用字符串中的句号来代表它的复杂度,每一个句号表示要花费一秒的工作时间。例如,一个“Hello…”的字符串就表示它需要3秒钟的处理时间。
我们会稍微修改我们之前程序中的Send.java代码,使其允许发送包含任意内容的消息。这个程序将会定时向队列中发送任务,我们把它命名为NewTask.java:
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
一些帮助我们从命令行的参数中获取消息的函数:
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
我们的Recv.java程序也需要作出一些修改:它需要从队列中获取消息,并统计消息中有多少个“.”,然后sleep相应的时间。我们把它取名为Worker.java:
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == ‘.‘) Thread.sleep(1000);
}
}
编译以上程序:
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
2、轮询分发
使用任务队列的优势之一是任务的并行处理。如果现在积压了一大堆任务,我们仅需要添加更多的消费者即可,这是很容易扩展的。
首先,我们尝试同时运行两个消费者实例,他们都会从队列里去获取消息,如下。
你需要打开三个控制台,其中两个用来运行消费者程序,分别称为C1和C2
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
第三个控制台用来运行生产者程序。一旦你开启了消费者程序,就可以启动生产者了:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....
让我们看看消费者得到了什么消息:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘First message.‘
[x] Received ‘Third message...‘
[x] Received ‘Fifth message.....‘
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Second message..‘
[x] Received ‘Fourth message....‘
默认情况下,RabbitMQ会一直把消息发送给下一个消费者,平均情况下,每个消费者获得的消息是一样多的。这种分发方式叫做轮询,你可以尝试运行更多的消费者。
3、消息确认
处理一个任务可能需要花费数秒时间,你可能会好奇如果一个消费者执行了一个长任务,并且在完成处理前就挂了的情况下会发送什么事。就拿我们当前的代码来说,一旦RabbitMQ将消息传递给消费者,消息就会从内存中删除。在这种情况下,如果你强行关闭正在运行的消费者,那么它正在处理的消息就会丢失。那些发送给这个消费者但还没有开始处理的消息也会一并丢失。
但是,我们并不想丢失任何消息。实际上,如果一个消费者挂了,我们更希望将消息传递给其他的消费者消费。
为了保证消息不会丢失,RabbitMQ支持消息确认(acknowledgments)机制。Ack是由消费者发送的,用来告诉RabbitMQ这个消息已经接收到并处理完成,可以从内存中删除它了。
如果一个消费者没有发送ack就挂了,RabbitMQ会认为这个消息没有处理完成并将消息重新入队。如果这时有其他消费者在运行,它将会把这个消息发送给另一个消费者。通过这种方式,即使消费者挂了,也可以确保消息不会丢失。
在这里不存在超时的概念,只有在消费者挂了的情况下,RabbitMQ才会重发消息,否则就会一直等待消息的处理,即使需要花费很长的时间来处理。
消息确认机制默认情况下是开启的。现在我们要关闭它,改为手动提交确认信号。当处理完一个任务后,我们将手动提交。
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
通过这段代码,我们可以保证即使你通过Ctrl+C来关闭一个消费者,也不会丢失任何消息。那些没有发送确认信号的消息将会很快被重发。
忘记发送确认信号
忘记写basicAck是一个很普遍的错误,但是这会产生严重的后果。当你的客户端退出后,所有的消息将会被重新发送,RabbitMQ会越来越占内存,因为它不会删除那些没有发送确认信号的消息。
想要调试这种类型的错误,你可以使用rabbitmqctl打印出messages_unacknowledged属性:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
4、消息持久化
我们已经学习了如何保证在消费者挂了的情况下,消息也不会丢失。但是如果RabbitMQ挂了,我们的消息仍然会丢失。
当RabbitMQ退出或崩溃时,它将会丢失所有队列和消息,除非你让它不要这么做。通过两个方面可以保证消息不会丢失:对消息和队列进行持久化处理。
首先,我们需要保证RabbitMQ不会丢失我们的队列。为此,我们需要将一个队列声明为一个持久化的队列:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
虽然这个代码是正确的,但是它却不会执行,因为我们已经定义了一个非持久化的hello队列。RabbitMQ不允许使用不同的参数去重新定义一个已经存在的队列,如果你强行这样做,它将会返回一个错误。有一个快速的解决方案,就是重新声明一个不同名字的队列,比如task_queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
需要在生产者和消费者的代码中对queuqDeclare进行修改。
至此,我们可以保证即使RabbitMQ重启,task_queue队列也不会丢失。现在,我们需要对消息进行持久化,通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN即可。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
注意:
消息的持久化并不能完全保证消息不会丢失,虽然它会告诉RabbitMQ把消息保存在硬盘上,但是从接收消息到保存消息之间,还是需要一定的时间的。同样,RabbitMQ没有对每个消息做fsync(2)——消息仅仅存在于缓存中,并没有真正的写入硬盘。所以这个持久化并不是健壮的,但是对于简单的工作队列来说已经完全足够了。如果你需要更强大的持久化的话,你可以考虑使用publisher confirms机制。
5、公平分发
你可能会注意到,分发的过程并不是我们所希望的那样。例如在某一情况下有两个消费者,RabbitMQ默认将序号为奇数的消息发送给第一个消费者,序号为偶数的消息发送给另一个消费者,当序号为奇数的消息都是一些很耗时的任务,而序号为偶数的消息都是一些小任务,那么就会造成第一个消费者一直处于忙碌状态,而另一个消费者处理完毕后会处于空等待的状态。RabbitMQ并不会在意这些事情,它只会一直均匀的分发消息。
这种情况的发生是因为RabbitMQ只负责分发消息,它并不关心一个消费者返回了多少个确认信号(即处理了多少条消息),它只是盲目的将第n条消息往第n个消费者发送罢了。
为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。它将会告诉RabbitMQ不要同时给一个消费者超过一个任务,换句话说,就是在一个消费者发送确认信息之前,不要再给它发送新消息了。取而代之的是将消息发送给下一个空闲的消费者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意队列的大小
如果所有的消费者都处于繁忙中,你的队列很快就会被占满。你需要注意这件事,并且添加更多的消费者或者通过其他策略来解决他。
6、代码整合
最终的NewTask.java如下:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
channel.close();
connection.close();
}
//...
}
Worker.java的代码:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received ‘" + message + "‘");
doWork(message);
System.out.println(" [x] Done" );
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
三、发布和订阅
在之前的示例中,我们创建了一个工作队列,在这之前,我们都假设每一个消息都准确的发送到一个消费者那里。在接下来,我们将做一些完全不同的事情——将一个消息发送到多个消费者,这种模式被称为发布和订阅模式。
为了说明这个模式,我们将会创建一个简单的日志系统,它由两部分程序组成,一个是发送日志消息,另一个是接收并打印日志消息。
在我们的日志系统中,每一个运行的接收程序都会获取一个消息的拷贝副本。通过这种方式,我们可以让一个消费者把日志记录到硬盘中,同时可以让另一个消费者把日志输出到屏幕上。
在本质上,发送日志消息相当于广播到所有接收者。
1、交换机
在之前,我们都是直接从一个队列中发送或获取消息。现在是时候介绍RabbitMQ中的full messaging模型了。
让我们快速复习下在先前部分中我们所学的知识:
- 一个发送消息的生产者是一个用户程序。
- 一个存储消息的队列是一个缓冲区。
- 一个接收消息的消费者是一个用户程序。
在RabbitMQ的消息模型中,核心思想是生产者不直接将消息发送给队列。实际上,生产者甚至完全不知道消息会被发送到哪个队列中。
相反,生产者只能将消息发送到交换机上。交换机是一个非常简单的东西,它一边从生产者那里接收消息,一边向队列推送消息。交换机必须确切的知道它想要把消息发送给哪些接收者。例如是否发送到一个特定的队列中?还是发送给很多个队列?或者是把消息丢弃等等。这些东西都通过交换机的类型来规定。
交换机的类型包括:direct, topic, headers和fanout。我们先关注fanout。让我们先创建一个这种类型的交换机,并称呼它为logs:
channel.exchangeDeclare("logs", "fanout");
fanout类型的交换机非常简单,通过它的名字,你可能已经猜出它的用处了。它将会以广播的方式把接收到的消息发送到它所知道的队列中去,这个正是我们所需要的。
交换机列表
通过使用rabbitmqctl命令,我们可以列出服务器中的所有交换机:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.在这个列表里,有一些以amq.*开头的交换机和默认(没有名字)的交换机。这些都是默认创建的,而且你不太可能会使用到它们。
匿名交换机
在之前的部分中,我们对交换机毫无概念,但仍然能将消息发送到队列中,那是因为我们使用了默认的交换机,通过使用空串(”“)来标识它。
回想一下之前是如何发送消息的:
channel.basicPublish("", "hello", null, message.getBytes());
这里的第一个参数就是交换机的名字。空串表示它是默认交换机或者是匿名交换机:如果routingKey存在的话,消息将通过routingKey路由到特定的队列中去。
现在,我们定义自己命名的交换机:
channel.basicPublish( "logs", "", null, message.getBytes());
2、临时队列
你可能会想起我们之前使用的队列是有特定的名称的(hello和task_queue)。对于我们来说,为一个队列命名是非常有必要的,我们需要指定多个消费者到同一个队列获取消息。当你想在多个生产者和消费者之间共用一个队列,那么为队列命名就非常重要了。
但是对于我们目前来说还不需要。我们想要监听所有日志消息,而不是它的一个子集。同样,我们只会对最新的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要做两件事。
第一,无论什么时候连接RabbitMQ,我们都需要一个新的空队列。为了这样做,我们会创建一个随机的名字,或者直接让服务器给我们一个随机的名字。
第二,一旦我们与消费者断开,队列应该被自动删除。
在Java中,当我们使用无参的queueDeclare(),它将会为我们创建一个非持久化的、专用的、自动删除、带随机名字的队列:
String queueName = channel.queueDeclare().getQueue();
在这里,队列名queueName的值是一个随机产生的字符串,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg。
3、绑定
我们已经创建一个fanout类型的交换机和一个队列。现在我们需要告诉交换机发送消息到我们的队列里。在这里,把交换机和队列之间的关系称为绑定。
channel.queueBind(queueName, "logs", "");
从现在开始,logs交换机将会把消息发送到我们的队列中去。
绑定列表
你可以通过使用rabbitmqctl list_bindings来列出所有绑定。
4、代码整合
生产者程序负责发送日志消息,看起来跟以前的代码没有什么区别。最重要的改变是,现在我们把消息发送到我们的logs交换机中,而不是匿名交换机。在发送时我们需要指定一个routingKey,但是在使用fanout类型的交换机时,routingKey的值会被忽略。这是我们的EmitLog.java程序:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "‘");
channel.close();
connection.close();
}
//...
}
正如你所看到的,在建立一个connection之后,我们声明了一个交换机。这个步骤是必须的,因为RabbitMQ禁止把消息发送到一个不存在的交换机。
如果交换机上没有绑定任何队列的话,消息将会被丢弃。但是这个对我们来说是可以接受的,如果没有消费者监听,我们可以安全的丢弃消息。
ReceiveLogs.java的代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
像之前那样编译程序,我已经编译完成了。
$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果你想要把日志保存到文件中,你只需要打开控制台并输入:
$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果你希望在屏幕中看到日志,新建一个终端并运行:
$ java -cp .:rabbitmq-client.jar ReceiveLogs
当然,为了发送日志,输入:
$ java -cp .:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings,你可以验证这代码确实创建和绑定了我们想要的队列。随着两个ReceiveLogs.java程序的运行,你可以看到如下的信息:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
这个结果简单明了:数据从logs交换机发送到服务器安排的两个队列中去,那正是我们所期望的。
四、路由
在上一节,我们建立了一个简单的日志系统,我们可以将日志信息广播到多个接收者那里去。
接下来,我们将要实现只订阅部分消息。例如,我们只将error类型的日志信息保存到硬盘中去,当然依旧在控制台中打印出所有的日志信息。
1、绑定
在之前的例子中,我们已经建立了绑定,你可以回顾一下代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一个交换机跟一个队列之间的绑定,可以理解为:就是这个队列只对这个交换机发送过来的消息感兴趣。
绑定还有另外的一个routingKey参数,为了避免跟basic_publish的参数搞混,我们把它称为binding key。以下是我们通过一个key创建的一个绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
这个binding key的意义取决于交换机的类型,我们之前使用的fanout类型的交换机是会忽略它的值的。
2、direct交换类型
我们之前的日志系统是将所有消息广播到所有消费者那里去。我们想要扩展它,让它能根据消息的严重性来进行过滤。例如,我们只把error类型的错误日志记录到硬盘中去,而不是将硬盘空间浪费在warning或info类型的日志上。
我们使用的fanout类型的交换机没有太多的灵活性,它只能无脑的进行广播。
所以我们将会使用direct类型的交换机来替换它。direct类型交换机的路由规则很简单——它只将消息发送到那些binding key和routing key完全匹配的队列中去。
为了说明清楚,我们看一下下面的结构图:
在这里,我们可以看到direct类型的交换机X绑定了两个队列。第一个队列通过一个orange关键字来绑定,第二个队列绑定了两个关键字,分别为black和green。
通过这样的绑定,当交换器接收到routing key为orange的消息的时候,就会发送到队列Q1中去;当接收到 routing key为black或green的消息的时候,就会发送到队列Q2中去;其他类型的消息则会被丢弃。
3、多重绑定
用一个binding key绑定多个队列是合法的,在上面的例子中,我们还可以使用black将X和Q1绑定起来。在这里,direct类型的交换机就跟fanout类型的交换机一样,会把消息发送到所有的匹配的队列中去。一个routing key为black的消息将会被发送到Q1和Q2中去。
4、发送日志
我们将会在我们的日志系统中使用这种模型,使用direct类型的交换机来替换fanout类型的交换机。我们可以把routing key作为日志消息的严重级别,通过这种方式,接收程序就可以选择对应的级别进行接收。总之,让我们先看一下如何发送日志:
首先,我们需要创建一个交换机:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
在这里为了简化,我们先约定日志的级别分为info、warning和error三种。
5、订阅
接收消息的部分跟之前的程序没有什么不同。唯一的区别就是我们要为队列创建它感兴趣的binding key。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
6、代码整合
EmitLogDirect.java的代码:
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘");
channel.close();
connection.close();
}
//..
}
ReceiveLogsDirect.java的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
像之前那样编译它们。为了方便,当我们运行实例时,我们使用一个$CP的环境变量来表示类的路径。
如果你只想保存warning和error类型的日志到文件里去,你可以打开控制台,并输入:
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果你想要在屏幕中看到所有的日志信息,你可以打开一个新的终端并做如下操作:
$ java -cp $CP ReceiveLogsDirect info warning error
[*] Waiting for logs. To exit press CTRL+C
另外,例如想要发送error类型的日志信息,可以输入:
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
[x] Sent ‘error‘:‘Run. Run. Or it will explode.‘
五、topic类型
在上一部分中,我们改进了我们的日志系统,使用direct类型的交换机代替fanout类型的交换机,得到了一个具有选择性的日志接收系统。
虽然使用了direct类型的交换机改进了我们的系统,但是它依然不完善,它不能根据多个条件进行路由。
在我们的日志系统中,我们可能不仅想要得到各种级别的日志,还想要得到日志的发送源。你可能从syslog unix tool了解过这个概念,它基于severity(info/warn/crit…) 和facility (auth/cron/kern…)来路由日志信息。
那将会给我们带来更多的灵活性,因为我们可能只需要监听那些来自cron的critical日志,而不是所有的kern日志。
为了在我们的日志系统中实现此功能,我们需要使用更加复杂的类型——topic类型的交换机
1、topic类型的交换机
发送到topic类型的交换机的消息不能是随意的routing key,它必须是一个通过点分隔的字符串列表。字符串的内容什么都可以,但是我们一般会给它一个有意义的名字。例如一些合法的routing key就像这样:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。Routing key中你想要写多少个单词都可以,它的长度上线是255个字节。
Binding key也是一样。Topic类型的交换机的逻辑跟direct类型的非常相似,它也是把消息发送到与routing key匹配的队列中去。然而,它的binding key却有两种非常特殊的用法:
- *(星号)能代替任何一个单词。
- #(井号)能代替0个或任意多个单词。
下面这个例子很容易理解:
在这个例子中,我们发送的消息都是用来描述动物的。每个消息的routing key都包含三个单词,第一个单词用来描述动物的速度,第二个用来表示颜色,第三个用来表示物种:”..”。
我们为交换机和队列创建三个绑定关系,队列Q1通过”*.orange.*”来绑定,队列Q2通过 “*.*.rabbit”和”lazy.#”来绑定。
这些绑定关系可以概括为:
- Q1只对橙色的动物感兴趣。
- Q2想要了解兔子和那些行动缓慢的动物。
一个routing key为”quick.orange.rabbit”的消息将会被发送到所有队列。带有”lazy.orange.elephant”的消息也会被发往所有队列中去。而对于”quick.orange.fox”来说,它只会被发送到Q1队列中,”lazy.brown.fox”只会被发送到Q2队列中。而”lazy.pink.rabbit”只会被发送到Q2队列中一次,尽管它匹配了Q2队列的两个routing key。对于”quick.brown.fox”来说,没有routing key和它匹配,所以它会被丢弃。
如果我们不遵守之前的约定,发送一条只带一个单词或四个单词的消息,例如”orange”或”quick.orange.male.rabbit”,会发生什么事呢?好吧,这些消息都会因为没有找到匹配routing key而被丢弃。
另一方面,对于”lazy.orange.male.rabbit”来说,虽然它包含四个单词,但是它却和最后一个routing key(”lazy.#”)匹配,所以它会被发送到Q2队列中去。
Topic类型的交换机
Topic类型的交换机的功能十分强大,可以媲美其他类型的交换机。
当一个队列的binding key为一个“#”,它将接收所有消息,就像fanout类型的交换机一样。
当binding key中没有包含“*”或“#”字符的话,topic类型的交换机就相当于direct类型的交换机。
2、代码整合
我们将在我们的日志系统中使用topic类型的交换机。我们假设日志消息的routing key通过两个单词组成,就像”.”。
代码几乎跟先前的一样。
EmitLogTopic.java的代码如下:
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘");
connection.close();
}
//...
}
ReceiveLogsTopic.java的代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
接收所有日志类型:
$ java -cp $CP ReceiveLogsTopic "#"
从”kern”那接收日志信息:
$ java -cp $CP ReceiveLogsTopic "kern.*"
或者只接收“critical”类型的日志:
$ java -cp $CP ReceiveLogsTopic "*.critical"
你还可以建立多个绑定关系:
$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
或者直接发送routing key为”kern.critical”类型的日志:
$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
<p><b>一些有趣的问题:</b></p>
- "*"会匹配到routing key为空的消息吗?
- “#.*”会匹配到“. .”吗?或者只匹配一个单词吗?
- “a.*.#”和“a.#“有什么不同?