MQ的订阅模式

一:介绍

1.模式

  

2.使用场景

  一个生产者,多个消费者

  每一个消费者都有自己的队列

  生产者没有直接把消息发送给队列,而是发送到了交换机

  每一个队列都要绑定到交换机

  可以实现一个消息被多个消费者消费。

二:程序

1.生产者

 1 package com.mq.PubSubFanout;
 2
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6
 7 public class FanoutSend {
 8     private static final String EXCHANGE_NAME="text_exchange_fanout";
 9     public static void main(String[] args) throws Exception {
10         //获取一个连接
11         Connection connection= ConnectionUtil.getConnection();
12         //从连接中获取一个通道
13         Channel channel=connection.createChannel();
14         //创建交换机
15         channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
16
17         //消息
18         String msg="hello pubsub";
19
20         //发送
21         channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
22
23         System.out.println("send msg:"+msg);
24         //关闭连接
25         channel.close();
26         connection.close();
27     }
28 }

2.消费者一

 1 package com.mq.PubSubFanout;
 2
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5
 6 import java.io.IOException;
 7
 8 public class FanoutReceive1 {
 9     private static final String EXCHANGE_NAME="text_exchange_fanout";
10     private static final String QUENE_NAME="test_fanout_queue_email";
11     public static void main(String[] args)throws Exception{
12         //获取一个连接
13         Connection connection = ConnectionUtil.getConnection();
14         //创建通道
15         final Channel channel = connection.createChannel();
16         //创建队列声明
17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
18
19         //绑定交换机
20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
21
22         //一次只能发送一个消息
23         channel.basicQos(1);
24
25         //创建消费者
26         DefaultConsumer consumer=new DefaultConsumer(channel){
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 String msg=new String(body,"utf-8");
30                 System.out.println("[1]receive msg:"+msg);
31                 try {
32                     Thread.sleep(200);
33                 } catch (InterruptedException e) {
34                     e.printStackTrace();
35                 }finally {
36                     System.out.println("done");
37                     //手动应答
38                     channel.basicAck(envelope.getDeliveryTag(),false);
39                 }
40             }
41         };
42         //监听队列,不是自动应答
43         boolean autoAck=false;
44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
45     }
46 }

3.消费者二

 1 package com.mq.PubSubFanout;
 2
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5
 6 import java.io.IOException;
 7
 8 public class FanoutReceive2 {
 9     private static final String EXCHANGE_NAME="text_exchange_fanout";
10     private static final String QUENE_NAME="test_fanout_queue_ems";
11     public static void main(String[] args)throws Exception{
12         //获取一个连接
13         Connection connection = ConnectionUtil.getConnection();
14         //创建通道
15         final Channel channel = connection.createChannel();
16         //创建队列声明
17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
18
19         //绑定交换机
20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
21
22         //一次只能发送一个消息
23         channel.basicQos(1);
24
25         //创建消费者
26         DefaultConsumer consumer=new DefaultConsumer(channel){
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 String msg=new String(body,"utf-8");
30                 System.out.println("[2]receive msg:"+msg);
31                 try {
32                     Thread.sleep(200);
33                 } catch (InterruptedException e) {
34                     e.printStackTrace();
35                 }finally {
36                     System.out.println("done");
37                     //手动应答
38                     channel.basicAck(envelope.getDeliveryTag(),false);
39                 }
40             }
41         };
42         //监听队列,不是自动应答
43         boolean autoAck=false;
44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
45     }
46 }

4.效果

  send:

  

  receive1:

  

  receive2:

  

5.运行注意点

  如果之间运行receive类,会发现报错,因为没有交换机。

  所以,可以先运行send类,虽然交换机不能存储发送的消息,但是可以创建交换机。

  然后,就可以按照原来的方式。

  先启动两个消费者进行监听,然后启动生产者。

  现象:就是消费者都获取到了生产者生产的消息。

原文地址:https://www.cnblogs.com/juncaoit/p/8605874.html

时间: 2024-11-13 08:56:46

MQ的订阅模式的相关文章

RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用

之前讲的消费者互相可以把队列中的消息全部读取,但是不是读完整的所有信息 那么采用订阅模式就行,这就是微信公众号的模式, 比如10个人订阅了我的公众号"BeJavaGod",当我发送一条消息的时候, 那么这10个人都能收到我的消息并且查看,比如本条消息,对吧? 生产者制造消息发送给交换机X,而不是发送给队列,队列和交换机绑定,消费者从各自的队列中获得消息 这样则实现一个生产者发送的所有消息都能被所有的消费者同时接收到 需要注意的地方是,在生产者创建消息发送到交换机时,此时没有队列,那么消

ActiveMQ订阅模式持久化实现

实现步骤:1.配置发送xml,applicationContext-send.xml [html] view plain copy <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-ins

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)

ActiveMQ简单简绍 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBMWEBSPHERE MQ. MQ特点: M

发布-订阅模式

1.什么是发布订阅模式 发布订阅模式 又叫观察者模式,他是定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变,所有依赖他的对象都将得到通知. 在javascript开发中,我们一般用事件模型来替代传统的发布-订阅模式. 2.Dom事件 实际上,只要我们曾经在dom节点上绑定过事件函数,那么我们就曾经使用过发布-订阅模式. document.getElementById('test').addEventListener('click',function(){ alert(2)},fasle

C# 委托和事件 与 观察者模式(发布-订阅模式)讲解 by天命

使用面向对象的思想 用c#控制台代码模拟猫抓老鼠 我们先来分析一下猫抓老鼠的过程 1.猫叫了 2.所有老鼠听到叫声,知道是哪只猫来了 3.老鼠们逃跑,边逃边喊:"xx猫来了,快跑啊!我是老鼠xxx" 一  双向耦合的代码 首先需要一个猫类Cat 一个老鼠类Rat 和一个测试类Program 老鼠类的代码如下 //老鼠类 public class Rat { public string Name { get; set; } //老鼠的名字 public Cat MyCat { get;

JavaScript设计模式与开发实践---读书笔记(8) 发布-订阅模式

发布-订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知. 发布-订阅模式可以广泛应用于异步编程中,这是一种替代传递回调函数的方案. 可以取代对象之间硬编码的通知机制,一个对象不用再显式地调用另外一个对象的某个接口. 自定义事件 首先要指定好谁充当发布者: 然后给发布者添加一个缓存列表,用于存放回调函数以便通知订阅者: 最后发布消息时,发布者会遍历这个缓存列表,依次触发里面存放的订阅者回调函数. 另外,我们还可以往回调函数里填入

订阅模式

订阅模式 Javascript中理解发布--订阅模式 2015-07-30 00:39 by 龙恩0707, 358 阅读, 1 评论, 收藏, 编辑 Javascript中理解发布--订阅模式 阅读目录 发布订阅模式介绍 如何实现发布--订阅模式? 发布---订阅模式的代码封装 如何取消订阅事件? 全局--发布订阅对象代码封装 理解模块间通信 回到顶部 发布订阅模式介绍 发布---订阅模式又叫观察者模式,它定义了对象间的一种一对多的关系,让多个观察者对象同时监听某一个主题对象,当一个对象发生改

观察者模式和发布/订阅模式的区别

在事件总线(EventBus)的架构设计中,用到了发布/订阅模式,但发现和观察者模式挺接近,有时容易发生混淆,现试图分清一下他们的关系. 观察者模式的角色为观察者(observer)和主题(subject)对象,observer需要观察subject时,需先到subject里面进行注册(subject对象持有observer对象的集合句柄),然后,当subject对象的内部状态发生变化时,把这个变化通知所有的观察者. 发布.订阅模式的角色为发布者(publisher)和订阅者(subscribe