一、概述
RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。
RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。
在专题学习一中我们已经简单提到了一些概念,在此我们更为深入的学习下RabbitMQ相关的专有名词。
1、生产(Producing):意思就是发送,发送消息的程序就是一个生产者(Producer),我们一般使用P来标示,如下图-1所示:
图-1
2、队列(queue)就是邮箱的名称,消息通过你的应用程序和RabbitMQ服务器进行传递,它们能够存储在队列(queue)中,队列(queue)没有任何限制,你要存储多少消息都可以---queue基本上是一个无限的缓冲区,而且多个生产者(Producers)能够把消息发送给同一个队列,同样多个消费者(consumer)也能够从同一个队列(queue)中获取数据,如下图-2所示表示的就是一个队列:
图-2
3、消费(consuming),它和获取消息是一个意思,一个消费者(sonsumer)就是一个等待获取消息的程序,我们通常用C来表示,如下图-3所示:
图-3
二、实现Hello World
由于RabbitMQ支持多种语言,诸如Java、Python、Ruby、PHP、C#等等,实现了对每种语言的Client接口,RabbitMQ自己充当服务器,所以在专题一的架构中可以看出,不管发送消息的生产者和接收消息的消费者使用何种语言,都充当客户端的角色,故在此我们使用Java语言的客户端(消息发送者和消息接受者)。
在这部分学习中我们将实现两个Java程序:一个是发送单个消息的生产者,一个是接收消息并在控制台打印消息的消费者,我们将忽略一些JavaAPI的细节,把所有的精力都放在将要开始实现生产者和消费者上---它就是一个关于Hello World的消息队列的实现。
在下图-4中,P是消息的生产者,C是消息的消费者(接收者),中间部分就是队列(queue)---rabbitMQ用来保存消息的缓冲区域。
图-4
大致过程:生产者(Producer)把消息发送到一个名为”hello“的队列中,消费者(Consumer)从这个队列中获取消息。
注意:在编程的时候要让客户端和服务器的AMQP的版本一致,不一致会报错,如果不一致主要去找对应版本的客户端即可,这样就不用再费劲去重装服务器端了。
2.1、发送消息
图-5
下面实现调用消息发送者发送消息和消息接收者接收消息,发送者首先将建立与RabbitMQ的连接,发送一条消息,然后退出。
Send.java中,我们首先需要导入如下类文件:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
发送消息的过程如下:
1、创建连接(Connection)
2、创建通道(Channel),声明队列(当队列存在时就获取队列,不存在时就创建队列)。
3、发送消息
完成的发送者代码清单如下:
package com.xuz.send; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Sender01 { public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); //RabbitMQ-Server安装在本机,所以直接用127.0.0.1 factory.setHost("127.0.0.1"); //创建一个连接 Connection conn = factory.newConnection(); //创建一个通信通道 Channel channel = conn.createChannel(); //定义Queue名称 String queueName = "hello"; //为Channel定义queue的属性,queueName为queue名称 channel.queueDeclare(queueName, false, false,false,null); String msg = "Hello World!xuzheng test!"; //发送消息 channel.basicPublish("", queueName, null, msg.getBytes()); System.out.println("send message["+msg+"] to "+queueName+" success!"); //关闭通道 channel.close(); //关闭连接 conn.close(); } }
2.2、接收消息
图-6
接收消息的过程如下:
1、创建连接
2、创建通道、声明队列(还得在声明一次,原因在后面解释)
3、创建QueueConsumer用于缓存消息
4、循环接收消息(无消息阻塞,有消息处理)。
注意:声明队列的机制是:当队列存在时就获取队列,不存在时就创建队列,接收者可能比发送者更早创建,这时它需要保证存在一个队列,它能够从中取出消息。
完整接收者的代码如下:
package com.xuz.recv; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Recv01 { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); String queueName = "hello"; channel.queueDeclare(queueName, false, false, false, null); //以上部分和sender一样 //配置好获取消息得方式 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true,consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("received message["+msg+"] from "+queueName); } } }
说明:在上述代码中我们还是要连接RabbitMQ服务器,连接代码和发送端代码是一样的,创建通道也是一样的,我们需要确认队列是否存在,使用queue_declare创建一个队列,我们可以运行这个命令很多次,但是只有一个队列会被创建。
channel.queueDeclare(queueName, false, false, false, null);
你也许要问为什么重复声明了队列---我们已经在前面的代码中声明了它,如果我们确定了队列是否已经存在的,那么我们可以不这么做,比如先运行Sender01.java程序,可是我们并不确定哪个程序先运行,这种情况的话在程序中重复声明是好的做法。
factory.setHost("127.0.0.1");
表示我们的发送端和接收端都在安装RabbitMQ服务器的本机运行。
运行结果如下:
发送端:
接收端:
至此RabbitMQ入门程序HelloWorld消息队列实现完毕!
源码下载:
RabbitMQ (消息队列)专题学习02 Hello World