RabbitMQ 发布/订阅

  我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。

在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。本质上来说,就是发布的日志消息会转发给所有的接收者。

  1、转发器(Exchanges)

  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

  相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

  下面列出一些可用的转发器类型:

  Direct

  Topic

  Headers

  Fanout

  目前我们关注最后一个fanout,声明转发器类型的代码:

1 channel.exchangeDeclare("logs","fanout");

  fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。

  2、匿名转发器(nameless exchange)

  前面说到生产者只能发送消息给转发器(Exchange),我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

1 channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

  第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

  现在我们可以指定消息发送到的转发器:

1 channel.basicPublish( "logs","", null, message.getBytes());

  3、临时队列(Temporary queues)

  前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
  不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
  第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
  第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
  Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。

1 String queueName = channel.queueDeclare().getQueue();

  一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

  4、绑定(Bindings)

   

  我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
  channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

  5、完整的例子

  

  发送端:

 1 public class EmitLog
 2 {
 3     private final static String EXCHANGE_NAME = "ex_log";
 4
 5     public static void main(String[] args) throws IOException
 6     {
 7         // 创建连接和频道
 8         ConnectionFactory factory = new ConnectionFactory();
 9         factory.setHost("localhost");
10         Connection connection = factory.newConnection();
11         Channel channel = connection.createChannel();
12         // 声明转发器和类型
13         channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
14
15         String message = new Date().toLocaleString()+" : log something";
16         // 往转发器上发送消息
17         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
18
19         System.out.println(" [x] Sent ‘" + message + "‘");
20
21         channel.close();
22         connection.close();
23
24     }
25
26 }  

  接收端1:

 1 public class ReceiveLogsToSave
 2 {
 3     private final static String EXCHANGE_NAME = "ex_log";
 4
 5     public static void main(String[] argv) throws java.io.IOException,
 6             java.lang.InterruptedException
 7     {
 8         // 创建连接和频道
 9         ConnectionFactory factory = new ConnectionFactory();
10         factory.setHost("localhost");
11         Connection connection = factory.newConnection();
12         Channel channel = connection.createChannel();
13
14         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
15         // 创建一个非持久的、唯一的且自动删除的队列
16         String queueName = channel.queueDeclare().getQueue();
17         // 为转发器指定队列,设置binding
18         channel.queueBind(queueName, EXCHANGE_NAME, "");
19
20         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
21
22         QueueingConsumer consumer = new QueueingConsumer(channel);
23         // 指定接收者,第二个参数为自动应答,无需手动应答
24         channel.basicConsume(queueName, true, consumer);
25
26         while (true)
27         {
28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
29             String message = new String(delivery.getBody());
30
31             print2File(message);
32         }
33
34     }
35
36     private static void print2File(String msg)
37     {
38         try
39         {
40             String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
41             String logFileName = new SimpleDateFormat("yyyy-MM-dd")
42                     .format(new Date());
43             File file = new File(dir, logFileName+".txt");
44             FileOutputStream fos = new FileOutputStream(file, true);
45             fos.write((msg + "\r\n").getBytes());
46             fos.flush();
47             fos.close();
48         } catch (FileNotFoundException e)
49         {
50             e.printStackTrace();
51         } catch (IOException e)
52         {
53             e.printStackTrace();
54         }
55     }
56 }  

  随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。

 1 public class ReceiveLogsToConsole
 2 {
 3     private final static String EXCHANGE_NAME = "ex_log";
 4
 5     public static void main(String[] argv) throws java.io.IOException,
 6             java.lang.InterruptedException
 7     {
 8         // 创建连接和频道
 9         ConnectionFactory factory = new ConnectionFactory();
10         factory.setHost("localhost");
11         Connection connection = factory.newConnection();
12         Channel channel = connection.createChannel();
13
14         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
15         // 创建一个非持久的、唯一的且自动删除的队列
16         String queueName = channel.queueDeclare().getQueue();
17         // 为转发器指定队列,设置binding
18         channel.queueBind(queueName, EXCHANGE_NAME, "");
19
20         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
21
22         QueueingConsumer consumer = new QueueingConsumer(channel);
23         // 指定接收者,第二个参数为自动应答,无需手动应答
24         channel.basicConsume(queueName, true, consumer);
25
26         while (true)
27         {
28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
29             String message = new String(delivery.getBody());
30             System.out.println(" [x] Received ‘" + message + "‘");
31
32         }
33
34     }
35
36 }  

  随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。

   现在把两个接收端运行,然后运行3次发送端:

  输出结果:

  发送端:

[x] Sent ‘2014-7-10 16:04:54 : log something‘

[x] Sent ‘2014-7-10 16:04:58 : log something‘

[x] Sent ‘2014-7-10 16:05:02 : log something‘

  接收端1:

  接收端2:

[*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘2014-7-10 16:04:54 : log something‘
 [x] Received ‘2014-7-10 16:04:58 : log something‘
 [x] Received ‘2014-7-10 16:05:02 : log something‘

  这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。

  参考文档:http://blog.csdn.net/lmj623565791/article/details/37657225

时间: 2024-09-30 11:26:47

RabbitMQ 发布/订阅的相关文章

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

RabbitMQ 发布订阅持久化

RabbitMQ是一种重要的消息队列中间件,在生产环境中,稳定是第一考虑.RabbitMQ厂家也深知开发者的声音,稳定.可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化.Queue持久化及Message的持久化.以保证RabbitMQ在退出或Crash等异常情况下,消息不会丢失.RabbitMQ提供了简单的参数配置来实现持久化操作. 简单说明一下各种持久化方式:(描述代码采用的是Rabbit.Client  SDK,  C#代码)

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

RabbitMQ - 发布订阅

这次我们试试publish / subscribe模式,也就是将一个消息发送给多个consumer. 这里用一个简单的小程序来说明publish / subscribe.由一个provider提供消息,这个消息会被多个consumer接收.consumer对同一个消息做出不同的反应,比如打印.保存到文件.数据库什么的. 之前的例子可能会给人这种感觉:producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息. 但这并不正确.在rabbit中,producer从来不会直接

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

RabbitMQ官网教程---发布/订阅

(使用python客户端pika 0.9.8) 在前面的教程中我们创建了一个工作队列.假设在一个工作队列后面是每一个被传递给正确的工作者的任务.在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息.这种模式被称为"发布/订阅". 为了阐明这个模式,我们将构建一个简单的日志系统.它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们. 在我们的日志系统中每个运行的接收程序副本将获得这个消息.用这种方式我们将可以运行一个接收器并且直接日志到磁盘:而且同时我

RabbitMQ/JAVA (发布/订阅模式)

发布/订阅模式即生产者将消息发送给多个消费者. 下面介绍几个在发布/订阅模式中的关键概念-- 1. Exchanges (转发器) 可能原来我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式. Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列.实际上,生产者都不知道这个消息是发送给哪个队列的.相反,生产者只能发送消息给转发器. 转发器一方面接收生产者的消息,另一方面向队列推送消息. 转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者