4 交换机-fanout(订阅发布模式)

目录

  • 订阅发布模式

    • 1、交换器(Exchange)

      • 1.1、创建交换器
      • 1.2 、推送消息到交换器
    • 2、临时队列
    • 3、绑定(bingdings)
    • 5、代码例子
      • 5.1、生产者代码示例
      • 5.2、消费者代码示例

订阅发布模式

1、交换器(Exchange)

Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)

RabbitMQ的消息发送模型核心思想是生产者不直接把消息发送到消息队列中。事实上,生产者不知道自己的消息将会被缓存到哪个队列中。

其实生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义

1.1、创建交换器

有一些可用的exchange类型:direct, topic, headersfanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为logs、类型为fanoutexchange:

channel.exchangeDeclare("logs", "fanout");

fanout类型的exchange是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中。

  • 没有名字的exchange
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

如上面的代码我们没有指定exchagne的名字,采用的是“”,空字符串的符号指的是默认的或没有命名的exchange:消息会根据routingKey被路由到指定的消息队列中

// 申明交换器,第一个参数:交换器的名字;第二个参数:交换器的类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

1.2 、推送消息到交换器

现在我们来把消息推送到已命名的exchange上,原来的做法是推送到默认的交换器上面的;

  • 原来的做法
// 第一个参数:交换器的名称
// 第二个参数:队列名称
// 第三个参数:消息的属性
// 第四个参数:消息体
 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
  • 推送到交换器

// 第一个参数:交换器名称;
// 第二个参数:队列名称;
// 第三个参数:消息属性;
// 第四个参数:消息体
channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());

2、临时队列

之前的例子中,应该会发现我们都是使用了一个指定名字的消息队列。对应的生产者和消费者之间都要使用相同的消息队列名称

但是在我们的log系统中却不是这样,我们希望能够接收到所有的log消息,不只是其中的一部分。我们只要处理当前的log消息,不用管过去的历史log。为了实现,我们需要做以下两步:

  • 无论什么时候我们和RabbitMQ建立连接时,我们都要刷新、清空Queue。为了达到这一的目的,我们可以用一个随机的名字(随机性可由自己来定义)来创建Queue,也可以让服务器来自动建立一个随见的Queue
  • 当消费者断开连接时,Queue能自动被删除。

使用java客户端时,我们使用无参数的queueDeclare方法,就可以创建一个已经生成名字的排他性会自动删除Queue

String queueName = channel.queueDeclare().getQueue();

这里面我们就可以拿到一个随机名字的queue,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

3、绑定(bingdings)

现在已经创建好了一个fanout类型的exchange和一个队列。那么接下来我们就需要让exchange向我们的queue里发送消息,Exchangequeue之间的关系就是绑定(bindings

    channel.queueBind(queueName,exchangeName,"");

5、代码例子

现在的代码和之前的区别不是很大;

主要的区别就是:

  • 我们把消息推送到一个命名的exchange上,而不是之前未命名的默认exchange
  • 在我们发送消息时需要提供一个routingKey,但对于fanout类型的exchange可以忽略

5.1、生产者代码示例


/**
 * @author zhaodi
 * @description
 * @date 2018/9/28 16:50
 */
public class Producer {
    private static final String EXCHANGE_NAME = "my-exchange-1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 申明交换器,
        // 第一个参数:交换器的名字;
        // 第二个参数:交换器的类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 第一个参数:交换器名称;
        // 第二个参数:队列名称;
        // 第三个参数:消息属性;
        // 第四个参数:消息体
        channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());
        channel.close();
        connection.close();
}

正如你所见,在建立连接后我们声明了exchange。这一步是必须的,因为禁止向一个不存在的exchange推送消息。

如果没有对exchange负责的queue,那么消息将会被丢失,这是没有问题的;如果没有消费者监听的话,我们会安全的丢掉这些消息。

5.2、消费者代码示例


/**
 * @author zhaodi
 * @desc 发布订阅模式
 */
public class Consumer {

    private static final String EXCHANGE_NAME="my-exchange-1";
    public static void main(String[] args) throws IOException {

        Connection connection = MqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 申明消息路由的名称和类型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        // 申明一个随机的消息队列名称
        String queueName = channel.queueDeclare().getQueue();

        // 绑定消息路由和消息队列
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        // 创建消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("c1--->:"+new String(body));
                // 手动应答
                // 第一个参数:消息标志
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };
        // 监听,关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(queueName,autoAck,consumer);
    }
}

原文地址:https://www.cnblogs.com/zhaod/p/11391258.html

时间: 2024-08-27 11:49:21

4 交换机-fanout(订阅发布模式)的相关文章

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

Publisher/Subscriber 订阅-发布模式

Publisher/Subscriber 订阅-发布模式 本博后续将陆续整理这些年做的一些预研demo,及一些前沿技术的研究,与大家共研技术,共同进步. 关于发布订阅有很多种实现方式,下面主要介绍WCF中的发布订阅,主要参考书籍<Programming WCF Services>,闲话不多说进入正题.使用传统的双工回调(例子 http://www.cnblogs.com/artech/archive/2007/03/02/661969.html)实现发布订阅模式存在许多缺陷,主要问题是,它会引

Spring基于事件驱动模型的订阅发布模式代码实例详解

代码下载地址:http://www.zuidaima.com/share/1791499571923968.htm 原文:Spring基于事件驱动模型的订阅发布模式代码实例详解 事件驱动模型简介 事件驱动模型也就是我们常说的观察者,或者发布-订阅模型:理解它的几个关键点: 首先是一种对象间的一对多的关系:最简单的如交通信号灯,信号灯是目标(一方),行人注视着信号灯(多方): 当目标发送改变(发布),观察者(订阅者)就可以接收到改变: 观察者如何处理(如行人如何走,是快走/慢走/不走,目标不会管的

JS实现观察者模式(订阅/发布模式)

实现 /*  * js 观察者模式 又称 订阅/发布模式  * 通过创建"可观察"对象,当发生一个感兴趣的事件时可将该事件通告给  * 所有观察者,从而形成松耦合 */ // 通用的发布者 EventPublisher = Base.extend({ publish: function(data, type) { EventPublisher.publish(data, type); } }, { subscribers : {         any : []    // 事件类型:

AngularJS的简单订阅发布模式例子

控制器之间的交互方式广播 broadcast, 发射 emit 事件 类似于 js中的事件 , 可以自己定义事件 向上传递直到 document 在AngularJs中 向上传递直到 rootScope 观察者模式, 订阅发布模式 类似于js中的事件机制 订阅者.on('xx发布博客', function([内容]){ 通知我, 接收到博客的[内容] }) 发布者.emit('xxx发布博客', {内容}) 优点: 业务和实际触发者分离, 代码维护性相对好 缺点: 代码复杂性更高 Angular

订阅发布模式

场景概述: 有时需要将多个应用程序集成到一个框架中,这些应用程序常见的基础通信方式包含总线模式.代理模式. 或者点对点模式.一些应用程序发送多种类型的消息,其他应用程序可能更关注这些消息类型的组合. 例如,在一个金融系统存在多个应用程序管理同一客户信息的情况,存在一个客户关系管理程序(CRM)掌握客户信息. 一种典型的情况:客户信息存在于其他系统中,且这些系统执行各自客户信息管理函数来处理客户信息. 当某个面向客户的应用程序生成更新客户信息的消息,例如客户地址的修改时,CRM和其他管理客户信息的

Node中EventEmitter以及如何实现JavaScript中的订阅/发布模式

1.EventEmitter Node中很多模块都能够使用EventEmitter,有了EventEmitter才能方便的进行事件的监听.下面看一下Node.js中的EventEmitter如何使用. (1)基本使用 EventEmitter是对事件触发和事件监听功能的封装,在node.js中的event模块中,event模块只有一个对象就是EventEmitter,下面是一个最基本的使用方法: var EventEmitter = require('events').EventEmitter;

Java里观察者模式(订阅发布模式)

创建主题(Subject)接口 创建订阅者(Observer)接口 实现主题 实现观察者 测试 总结 在公司开发项目,如果碰到一些在特定条件下触发某些逻辑操作的功能的实现基本上都是用的定时器 比如用户注册完后,发送邮件,为了防止邮件发送失败或者发送邮件比较耗时,一般也都是通过定时器去扫库里注册没有发邮件的用户数据 再比如一个订单,在改变状态后,要归档,这也是通过定时器来实现的,扫描订单的数据,通过判断状态来做相对应的处理 但这样处理的话,定时器就会越来越多,总觉得不太好 然后,从一些资讯网站上的

【并发】9、借助redis 实现生产消费,消息订阅发布模式队列

这个就是一个消息可以被多次消费的范例了 其实这个实现的方式可以参考我之前的设计模式,观察者模式 https://www.cnblogs.com/cutter-point/p/5249780.html 不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象 不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些