RabbitMQ之六种队列模式

  先学习一下RabbitMQ中的六种队列,只学习前五种,具体的官方文档地址是:http://next.rabbitmq.com/getstarted.html

导入maven依赖:

1 <dependency>
2     <groupId>com.rabbitmq</groupId>
3     <artifactId>amqp-client</artifactId>
4     <version>3.4.1</version>
5 </dependency>

一、简单队列

1、图示

P:消息的生产者

C:消息的消费者

红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

2、获取MQ的连接

 1 public static Connection getConnection() throws Exception {
 2         //定义连接工厂
 3         ConnectionFactory factory = new ConnectionFactory();
 4         //设置服务地址
 5         factory.setHost("localhost");
 6         //端口
 7         factory.setPort(5672);
 8         //设置账号信息,用户名、密码、vhost
 9         factory.setVirtualHost("/taotao");
10         factory.setUsername("taotao");
11         factory.setPassword("taotao");
12         // 通过工程获取连接
13         Connection connection = factory.newConnection();
14         return connection;
15     }

3、生产者发送消息到队列

 1 public class Send {
 2
 3     private final static String QUEUE_NAME = "test_queue";
 4
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         // 从连接中创建通道
 9         Channel channel = connection.createChannel();
10
11         // 声明(创建)队列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13
14         // 消息内容
15         String message = "Hello World!";
16         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
17         System.out.println(" [x] Sent ‘" + message + "‘");
18
19         //关闭通道和连接
20         channel.close();
21         connection.close();
22     }
23 }

4、管理工具中查看消息

点击上面的队列名称,查询具体的队列中的信息:

5、消费者从队列中获取消息

 1 public class Recv {
 2
 3     private final static String QUEUE_NAME = "test_queue";
 4
 5     public static void main(String[] argv) throws Exception {
 6
 7         // 获取到连接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10
11         // 声明队列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13
14         // 定义队列的消费者
15         QueueingConsumer consumer = new QueueingConsumer(channel);
16         // 监听队列
17         channel.basicConsume(QUEUE_NAME, true, consumer);
18
19         // 获取消息
20         while (true) {
21             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
22             String message = new String(delivery.getBody());
23             System.out.println(" [x] Received ‘" + message + "‘");
24         }
25     }
26 }

二、 Work模式

1、图示

一个生产者、2个消费者。

一个消息只能被一个消费者获取。

2、消费者1

 1 public class Recv {
 2
 3     private final static String QUEUE_NAME = "test_queue_work";
 4
 5     public static void main(String[] argv) throws Exception {
 6
 7         // 获取到连接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10
11         // 声明队列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13
14         // 同一时刻服务器只会发一条消息给消费者
15         //channel.basicQos(1);
16
17         // 定义队列的消费者
18         QueueingConsumer consumer = new QueueingConsumer(channel);
19         // 监听队列,手动返回完成
20         channel.basicConsume(QUEUE_NAME, false, consumer);
21
22         // 获取消息
23         while (true) {
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
25             String message = new String(delivery.getBody());
26             System.out.println(" [x] Received ‘" + message + "‘");
27             //休眠
28             Thread.sleep(10);
29             // 返回确认状态
30             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31         }
32     }
33 }

3、消费者2

 1 public class Recv2 {
 2
 3     private final static String QUEUE_NAME = "test_queue_work";
 4
 5     public static void main(String[] argv) throws Exception {
 6
 7         // 获取到连接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10
11         // 声明队列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13
14         // 同一时刻服务器只会发一条消息给消费者
15         //channel.basicQos(1);
16
17         // 定义队列的消费者
18         QueueingConsumer consumer = new QueueingConsumer(channel);
19         // 监听队列,手动返回完成状态
20         channel.basicConsume(QUEUE_NAME, false, consumer);
21
22         // 获取消息
23         while (true) {
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
25             String message = new String(delivery.getBody());
26             System.out.println(" [x] Received ‘" + message + "‘");
27             // 休眠1秒
28             Thread.sleep(1000);
29
30             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31         }
32     }
33 }

4、生产者

 1 public class Send {
 2
 3     private final static String QUEUE_NAME = "test_queue_work";
 4
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9
10         // 声明队列
11         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12
13         //向队列中发送50条消息
14         for (int i = 0; i < 50; i++) {
15             // 消息内容
16             String message = "" + i;
17             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
18             System.out.println(" [x] Sent ‘" + message + "‘");
19
20             Thread.sleep(i * 10);
21         }
22
23         channel.close();
24         connection.close();
25     }
26 }    

5、测试结果

测试结果:

1、  消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。

2、  消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。

其实,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。

6、Work模式的“能者多劳”

测试:

消费者1比消费者2获取的消息更多。

这种是比较符合实际情况的,能者多劳,RabbitMQ客户端同一时刻只会给消费者发送一条消息,消费者拿到消息后,重新向客户端拿消息,做的快的消费者要的消息多,同理,做的慢的消费者要的消息少,就体现出来能者多劳的机制了。

7、消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

模式1:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

模式2:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

7.1 手动模式

7.2 自动模式

三、订阅模式

1、图示

解读:

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

注释:Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

2、消息的生产者(看作是后台系统)

向交换机中发行消息

 1 public class Send {
 2
 3     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 4
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9
10         // 声明exchange
11         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
12
13         // 消息内容
14         String message = "商品已经被更新,id=1001";
15         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
16         System.out.println(" 后台系统: ‘" + message + "‘");
17
18         channel.close();
19         connection.close();
20     }
21 }

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

3、消费者1(看作是前台系统)

 1 public class Recv {
 2
 3     private final static String QUEUE_NAME = "test_queue_ps_1";
 4
 5     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 6
 7     public static void main(String[] argv) throws Exception {
 8
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
18
19         // 同一时刻服务器只会发一条消息给消费者
20         channel.basicQos(1);
21
22         // 定义队列的消费者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 监听队列,手动返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26
27         // 获取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" 前台系统: ‘" + message + "‘");
32             Thread.sleep(10);
33
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

4、消费者2(看作是搜索系统)

 1 public class Recv2 {
 2
 3     private final static String QUEUE_NAME = "test_queue_ps_2";
 4
 5     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 6
 7     public static void main(String[] argv) throws Exception {
 8
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
18
19         // 同一时刻服务器只会发一条消息给消费者
20         channel.basicQos(1);
21
22         // 定义队列的消费者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 监听队列,手动返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26
27         // 获取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" 搜索系统: ‘" + message + "‘");
32             Thread.sleep(10);
33
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

5、测试

测试结果:

同一个消息被多个消费者获取。

在管理工具中查看队列和交换机的绑定关系:

6、使用订阅模式能否实现商品数据的同步?

答案:可以的。

后台系统就是消息的生产者。

前台系统和搜索系统是消息的消费者。

后台系统将消息发送到交换机中,前台系统和搜索系统都创建自己的队列,然后将队列绑定到交换机,即可实现。

消息,新增商品、修改商品、删除商品。

前台系统:修改商品、删除商品。

搜索系统:新增商品、修改商品、删除商品。

所以使用订阅模式实现商品数据的同步并不合理。

四、路由模式

1、图示

注释:Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。

2、生产者

 1 public class Send {
 2
 3     private final static String EXCHANGE_NAME = "test_exchange_direct";
 4
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9
10         // 声明exchange
11         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
12
13         // 消息内容
14         String message = "商品删除,id=1002";
15         channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
16         System.out.println(" 后台系统: ‘" + message + "‘");
17
18         channel.close();
19         connection.close();
20     }
21 }

3、消费者1(前台系统)

 1 public class Recv {
 2
 3     private final static String QUEUE_NAME = "test_queue_direct_1";
 4
 5     private final static String EXCHANGE_NAME = "test_exchange_direct";
 6
 7     public static void main(String[] argv) throws Exception {
 8
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
18         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
19
20         // 同一时刻服务器只会发一条消息给消费者
21         channel.basicQos(1);
22
23         // 定义队列的消费者
24         QueueingConsumer consumer = new QueueingConsumer(channel);
25         // 监听队列,手动返回完成
26         channel.basicConsume(QUEUE_NAME, false, consumer);
27
28         // 获取消息
29         while (true) {
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
31             String message = new String(delivery.getBody());
32             System.out.println(" 前台系统: ‘" + message + "‘");
33             Thread.sleep(10);
34
35             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
36         }
37     }
38 }

4、消费者2(搜索系统)

 1 public class Recv2 {
 2
 3     private final static String QUEUE_NAME = "test_queue_direct_2";
 4
 5     private final static String EXCHANGE_NAME = "test_exchange_direct";
 6
 7     public static void main(String[] argv) throws Exception {
 8
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
18         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
19         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
20
21         // 同一时刻服务器只会发一条消息给消费者
22         channel.basicQos(1);
23
24         // 定义队列的消费者
25         QueueingConsumer consumer = new QueueingConsumer(channel);
26         // 监听队列,手动返回完成
27         channel.basicConsume(QUEUE_NAME, false, consumer);
28
29         // 获取消息
30         while (true) {
31             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
32             String message = new String(delivery.getBody());
33             System.out.println(" 搜索系统: ‘" + message + "‘");
34             Thread.sleep(10);
35
36             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
37         }
38     }
39 }

五、通配符模式

1、图示

注释:Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:

2、生产者

 1 public class Send {
 2
 3     private final static String EXCHANGE_NAME = "test_exchange_topic";
 4
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9
10         // 声明exchange
11         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
12
13         // 消息内容
14         String message = "商品删除,id=1003";
15         channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
16         System.out.println(" 后台系统: ‘" + message + "‘");
17
18         channel.close();
19         connection.close();
20     }
21 }

3、消费者1(前台系统)

 1 public class Recv {
 2
 3     private final static String QUEUE_NAME = "test_queue_topic_1";
 4
 5     private final static String EXCHANGE_NAME = "test_exchange_topic";
 6
 7     public static void main(String[] argv) throws Exception {
 8
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
18         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
19
20         // 同一时刻服务器只会发一条消息给消费者
21         channel.basicQos(1);
22
23         // 定义队列的消费者
24         QueueingConsumer consumer = new QueueingConsumer(channel);
25         // 监听队列,手动返回完成
26         channel.basicConsume(QUEUE_NAME, false, consumer);
27
28         // 获取消息
29         while (true) {
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
31             String message = new String(delivery.getBody());
32             System.out.println(" 前台系统: ‘" + message + "‘");
33             Thread.sleep(10);
34
35             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
36         }
37     }
38 }

4、消费者2(搜索系统)

 1 public class Recv2 {
 2
 3     private final static String QUEUE_NAME = "test_queue_topic_2";
 4
 5     private final static String EXCHANGE_NAME = "test_exchange_topic";
 6
 7     public static void main(String[] argv) throws Exception {
 8
 9         // 获取到连接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12
13         // 声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15
16         // 绑定队列到交换机
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
18
19         // 同一时刻服务器只会发一条消息给消费者
20         channel.basicQos(1);
21
22         // 定义队列的消费者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 监听队列,手动返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26
27         // 获取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" 搜索系统: ‘" + message + "‘");
32             Thread.sleep(10);
33
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

原文地址:https://www.cnblogs.com/ssh-html/p/10542841.html

时间: 2024-07-31 18:43:50

RabbitMQ之六种队列模式的相关文章

RabbitMQ六种队列模式-简单队列模式

前言 RabbitMQ六种队列模式-简单队列 [本文]RabbitMQ六种队列模式-工作队列RabbitMQ六种队列模式-发布订阅RabbitMQ六种队列模式-路由模式RabbitMQ六种队列模式-主题模式 在官网的教程中,描述了如上六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列.也称为点对点模式 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者.同样也称为点对点模式 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个

RabbitMQ (消息队列)专题学习07 RPC

(使用Java客户端) 一.概述 在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC. 在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数. 1.1.客户端接口(Client Interface) 为了说明一个RPC

基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景

前言 传统处理超时订单 采取定时任务轮训数据库订单,并且批量处理.其弊端也是显而易见的:对服务器.数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好 当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作 jdk延迟队列 DelayQueue 采取jdk自带的延迟队列能很好的优化传统的处理方案,但是该方案的弊.端也是非常致命的,所有的消息数据都是存于内存之中,一旦

RabbitMq 6种使用模式

RabbitMQ的5种模式与实例 1.1 简单模式Hello World 功能:一个生产者P发送消息到队列Q,一个消费者C接收 生产者实现思路: 创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名.密码.virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接. 消费者实现思路 创建连接工厂Connec

RabbitMQ五种工作模式学习总结

一.简介最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件.本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结.RabbitMQ常用的工作模式有:简单队列模式.工作队列模式.发布订阅模式.路由模式.主题模式.本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html. 二.简单队列模式(Simple Queue) [a]模型图:只包含一个生产者以及一个消费者,生产者Producer将消息

C#调用RabbitMQ实现消息队列

我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的. 所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始. 关于消息队列 其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的. 那么,接收数据的方式是什么呢?自然是端口监听啦. 那消息队列是什么就很好解释了? 它就是端口监听,接到数据后,将数据排列起来. 那这件事,我们不用中间件能做吗? 当然能做啦,写个TCP/

RabbitMQ分布式消息队列服务器(一、Windows下安装和部署)

RabbitMQ消息队列服务器在Windows下的安装和部署-> 一.Erlang语言环境的搭建 RabbitMQ开源消息队列服务是使用Erlang语言开发的,因此我们要使用他就必须先进行Erlang语言环境的搭建,其实是非常简单的. 登录Erlang官网,进入下载页,官网地址->http://www.erlang.org/downloads 然后按照自己的系统环境来选择需要下载的安装文件. 我选择 64-bit下载包,因为我的操作系统是64位的 接下来我们需要对Erlang语言的环境变量的配

RabbitMQ (消息队列)专题学习06 Topic

(使用Java客户端) 一.概述 在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接.并且可以有选择的接收消息. 尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息. 在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源. 这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而

如何基于RabbitMQ实现优先级队列

概述 由于种种原因,RabbitMQ到目前为止,官方还没有实现优先级队列,只实现了Consumer的优先级处理. 但是,迫于种种原因,应用层面上又需要优先级队列,因此需求来了:如何为RabbitMQ加入优先级队列特性. 查询资料后,得知RabbitMQ虽然官方没有支持此特性,但是社区已经有相关优先级队列插件了,并且这个插件被列在RabbitMQ官方网站中了. 地址如下:http://www.rabbitmq.com/community-plugins.html 插件安装 不要立刻下载这个url中