rabbitmq学习3:Publish/Subscribe

在前面的Work Queue中的消息是均匀分配消息给消费者;如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅)。让我们开始Publish/Subscribe之旅吧!

Publish/Subscribe的工作示意图如下:

在上图中的X表示Exchange(交换区);Exchange的类型有:direct , topic , headers 和 fanout

Publish/Subscribe的Exchang的类型为fanout;声明Publish/Subscribe的Exchang代码如下:

Java代码  

  1. channel.exchangeDeclare("logs", "fanout");

对于Work Queue中提到的发布消息的代码如下:

Java代码  

  1. channel.basicPublish("", queueName,   null, message.getBytes());

但对于Publish/Subscribe中发布消息中的Queue的使用的是默认的;代码如下:

Java代码  

  1. channel.basicPublish( "logs", "", null, message.getBytes());

Exchange和各Queue之间是如何通信的呢?主要是通过把Exchange和各Queue绑定(binding);示意代码如下:

Java代码  

  1. channel.queueBind(queueName, exchangeName, "");

Publish/Subscribe加入绑定的工作示意图如下:

那我们就开始程序代码吧:P端的代码如下:

Java代码  

  1. package com.abin.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class EmitLog {
  6. private static final String EXCHANGE_NAME = "logs";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明Exchange
  13. for (int i = 0; i <= 2; i++) {
  14. String message = "hello word!" + i;
  15. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  16. System.out.println(" [x] Sent ‘" + message + "‘");
  17. }
  18. channel.close();
  19. connection.close();
  20. }
  21. }

运行结果如下:

Java代码  

  1. [x] Sent ‘hello word!0‘
  2. [x] Sent ‘hello word!1‘
  3. [x] Sent ‘hello word!2‘

C端的代码如下:

Java代码  

  1. package com.abin.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class ReceiveLogsOne {
  7. private static final String EXCHANGE_NAME = "logs";
  8. public static void main(String[] argv) throws Exception {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("localhost");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  14. String queueName = "log-fb1";
  15. channel.queueDeclare(queueName, false, false, false, null);
  16. channel.queueBind(queueName, EXCHANGE_NAME, "");//把Queue、Exchange绑定
  17. QueueingConsumer consumer = new QueueingConsumer(channel);
  18. channel.basicConsume(queueName, true, consumer);
  19. while (true) {
  20. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  21. String message = new String(delivery.getBody());
  22. System.out.println(" [x] Received ‘" + message + "‘");
  23. }
  24. }
  25. }

对于C端的代码我写了二个差不多的程序,只需要修改一下queueName。这样就形成了二个Queue;运行结果相同;

运行结果可能如下:

Java代码  

  1. [x] Received ‘hello word!0‘
  2. [x] Received ‘hello word!1‘
  3. [x] Received ‘hello word!2‘
时间: 2024-08-01 13:37:54

rabbitmq学习3:Publish/Subscribe的相关文章

3.6.4 RabbitMQ教程四 - Publish/Subscribe

Publish/Subscribe发布/订阅 What This Tutorial Focuses On In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we

RabbitMQ学习(三).NET Client之Publish/Subscribe

3 Publish/Subscribe Sending messages to many consumers at once Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Publish/Subscribe (using the .NET Client) 前面的教程我们已经学习了如何创建工作队列,工作队列背后的假设是每一个任务都被准确地递送给一个worker进行处理.这里我们将介绍完全不同的模式,即一个消息可以递送给多

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

RabbitMQ - Publish/Subscribe in Java

这次我们试试publish / subscribe模式, 也就是将一个消息发送给多个consumer. 这里用一个简单的小程序来说明publish / subscribe. 由一个provider提供消息,这个消息会被多个consumer接收. consumer对同一个消息做出不同的反应,比如打印.保存到文件.数据库什么的. 之前的例子可能会给人这种感觉: producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息. 但这并不正确. 在rabbit中,producer从

RabbitMQ(三) -- Publish/Subscribe

RabbitMQ(三) -- Publish/Subscribe `rabbitmq`支持一对多的模式,一般称为发布/订阅.也就是说,生产者产生一条消息后,`rabbitmq`会把该消息分发给所有的消费者. Exchanges 之前的教程中,仅仅使用了基本的消息模型: 生产者产生消息 把消息添加到消息队列 消费者接收消息 而在`rabbitmq完整的消息模型`中,并不是这样的.事实上,生产者并不知道消息是否发送到队列,而是把消息直接发送给`Exchanges`. `Exchanges`的功能理解

Part1.2 、RabbitMQ -- Publish/Subscribe 【发布和订阅】

python 目录 (一).交换 (Exchanges) -- 1.1 武sir 经典 Exchanges 案例展示. (二).临时队列( Temporary queues ) (三).绑定(Bindings) (四).汇总(Putting it all together) python系列之 RabbitMQ -- Publish/Subscribe [发布和订阅] >>前面的部分我们创建了一个工作队列(work queue). 设想是每个任务都能分发到一个 worker[queue],这一

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)

目录(?)[-] Exchanges Temporary queues Bindings绑定 最终版本 上篇文章中,我们把每个Message都是deliver到某个Consumer.在这篇文章中,我们将会将同一个Message deliver到多个Consumer中.这个模式也被成为 "publish / subscribe".    这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer). 我们将构建

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)[转]

上篇文章中,我们把每个Message都是deliver(提供)到某个Consumer.在这篇文章中,我们将会将同一个Message deliver(提供)到多个Consumer中.这个模式也被成为 "publish / subscribe".     这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer). 我们将构建两个Consumer,第一个将log写到物理磁盘上:第二个将log输出的屏幕. 1.