RabbitMQ (七) : 订阅者模式之主体模式 ( topic )

主体模式和路由模式很像

路由模式是精确匹配

主体模式是模糊匹配

依然先通过管理后台添加一个交换机.

生产者

    public class Producer
    {

        private const string ExchangeName = "test_exchange_topic";

        public static void Send()
        {
            //获取一个连接
            IConnection connection = ConnectionHelper.GetConnection();

            //从连接中获取一个通道
            IModel channel = connection.CreateModel();

            //声明交换机
            //channel.ExchangeDeclare(ExchangeName, "topic", false, false, null);

            //每次只向消费者发送一条消息,消费者使用后,手动确认后,才会发送另外一条
            channel.BasicQos(0, 1, false);

            string msg = "hello world ";

            //发送消息,只发送到路由键为"product.delete" 或者 "product.#"的队列.
            //# 匹配一个或多个
            //* 匹配一个
            //上限为 255 个字节
            channel.BasicPublish(ExchangeName, "product.delete", null, Encoding.Default.GetBytes(msg));
            Console.WriteLine($"send {msg}");

            channel.Close();
            connection.Close();
        }
    }

消费者1

    public class Consumer1
    {
        private const string QueueName = "test_exchange1_queue";
        private const string ExchangeName = "test_exchange_topic";

        public static void Receive()
        {
            //获取连接
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //创建通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //声明队列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //将队列绑定到交换机上
            channel.QueueBind(QueueName, ExchangeName, "product.add", null);
            channel.QueueBind(QueueName, ExchangeName, "product.update", null);
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注册事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + str);
                channel.BasicAck(e.DeliveryTag, false);//手动应答
            };

            //监听队列
            //bool autoAck = true;//自动确认,一旦mq将消息分发给了消费者,就会从内存中删除该消息
            bool autoAck = false;//手动应答.
            channel.BasicConsume(QueueName, autoAck, "", false, false, null, consumer);
        }
    }

消费者2

    public class Consumer2
    {
        private const string QueueName = "test_exchange2_queue";
        private const string ExchangeName = "test_exchange_topic";
        public static void Receive()
        {
            //获取连接
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //创建通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //声明队列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //将队列绑定到交换机上
            channel.QueueBind(QueueName, ExchangeName, "product.#", null);         //添加消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注册事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("         consumer2 : " + str);
                channel.BasicAck(e.DeliveryTag, false);//手动应答
            };

            //监听队列
            //bool autoAck = true;//自动确认,一旦mq将消息分发给了消费者,就会从内存中删除该消息
            bool autoAck = false;//手动应答.
            channel.BasicConsume(QueueName, autoAck, "", false, false, null, consumer);
        }
    }

运行结果:

由于消费者1的路由键只有 "product.add" 和 "product.update" ,不包含"product.delete",

而消费者2的路由键是"product.#",可以模糊匹配上"product.delete",

所以交换机只会把消息转发到消费者2声明的队列中.

原文地址:https://www.cnblogs.com/refuge/p/10350933.html

时间: 2024-11-02 17:12:58

RabbitMQ (七) : 订阅者模式之主体模式 ( topic )的相关文章

【RabbitMQ】4、几种Exchange 模式

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中. RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header . header模式在实际使用中较少,本文只对前三种模式进行比

linux的bond模式绑定及模式区别

[linux的bond模式配置] 原理: 多块网卡虚拟成一张,实现冗余:多张网卡对外显示一张,具有同一个IP: 工作在网卡是混杂模式的情况下: 对于多物理网卡的 Bond 网卡而言,其中一块物理网卡会被设置为 Master,其他的网卡都是 Slave,Bond 网卡的 MAC 地址取自标志为 Master 的物理网卡,然后将这个 MAC 地址复制到其他物理网卡上: 工作模式: 在主备模式下 , 只有主网卡 eth0 工作,eth1 作为备份网卡是不工作的,只有当一个网络接口失效时 ( 例如主交换

对于观察者模式,Reactor模式,Proactor模式的一点理解

最近就服务器程序IO效率这一块了解一下设计模式中的Reacotr模式和proactor模式,感觉跟观察者模式有些类似的地方,网上也看了一些其他人对三者之间区别的理解,都讲得很仔细,在此根据自己的理解做一点简单的记录和总结,如果理解不对的地方,以后再慢慢深入和更新. 观察者模式: 也可以称为为 发布-订阅 模式,主要适用于多个对象依赖某一个对象的状态并,当某对象状态发生改变时,要通知其他依赖对象做出更新.是一种1对多的关系.当然,如果依赖的对象只有一个时也是一种特殊的一对一关系.通常,观察者模式适

Java设计模式(四) 装饰器模式 代理器模式

(七)装饰器模式 Decorator 装饰器模式是为了动态的给一个对象增加一些新功能.装饰对象与被装饰的对象需要实现同一个接口,装饰对象持有被装饰对象的实例. interface DecoratorSourceable{ public void method(); } //被装饰类 class DecoratorSource implements DecoratorSourceable{ public void method(){ System.out.println("Source"

4-11第二课:yum使用、单用户模式、救援模式

一.YUM使用 yum (全称为 Yellow dog Updater, Modified)是一个在Fedora和RedHat以及CentOS中的Shell前端软件包管理器.基于RPM包管理,能够从指定的服务器自动下载 RPM包并且安装,可以自动处理依赖性关系,并且一次安装所有依赖的软件包,无须繁琐地一次次下载.安装.    yum list                 列出指定yum源中所有本地已安装或未安装的包    yum grouplist        列出指定yum源中所有本地已

ThreadLocal模式与OSIV模式(53)

ThreadLocal: 维护线程局部的变量. ThreadLocal 不是线程.它就是一个Map.可以保存对象. 它保存的对象,只与当前线程相关. 当一个线程还没有运行完成时,如果不想传递数据,可以通过ThreadLocal来保存与这个Thread相关数据. 用ThreadLocal保存和获取数据的示例: public class BaseDemo { public static void main(String[] args) { //声明Map<Object key,Object valu

策略模式和工厂模式的区别

工厂模式和策略模式看着很像,经常让人混淆不清; 它们的区别在哪里,需要细细体味: 相似点 在模式结构上,两者很相似: 差异 用途不一样 工厂是创建型模式,它的作用就是创建对象: 策略是行为型模式,它的作用是让一个对象在许多行为中选择一种行为; 关注点不一样 一个关注对象创建 一个关注行为的封装 解决不同的问题 工厂模式是创建型的设计模式,它接受指令,创建出符合要求的实例:它主要解决的是资源的统一分发,将对象的创建完全独立出来,让对象的创建和具体的使用客户无关.主要应用在多数据库选择,类库文件加载

JS魔法堂:浏览器模式和文档模式怎么玩?

一.前言 从IE8开始引入了文档兼容模式的概念,作为开发人员的我们可以在开发人员工具中通过“浏览器模式”和“文档模式”(IE11开始改为“浏览器模式”改成更贴切的“用户代理字符串”)品味一番,它的出现极大地方便了苦逼的前端攻城狮们适配各版本的IE,但jser们也不能完全信任它,因为它只是提供尽可能的文档模式模拟而已. 本篇大部分内容来源于官方解说:http://msdn.microsoft.com/library/cc288325(v=vs.85).aspx,并尽量融入个人平常工作中踩过的坑加以

存储器的保护(三)——《x86汇编语言:从实模式到保护模式》读书笔记20

存储器的保护(三) 改动本章代码清单,使之能够检測1MB以上的内存空间(从地址0x0010_0000開始,不考虑快速缓存的影响).要求:对内存的读写按双字的长度进行.并在检測的同一时候显示已检測的内存数量.建议对每一个双字单元用两个花码0x55AA55AA和0xAA55AA55进行检測. 上面的文字选自原书第12章的习题1. 这篇博文就讨论一下这道题.由于是初学,我不正确自己做太高的要求.仅仅要实现功能就可以. 代码清单 ;文件说明:第12章习题-1 ;创建日期:2016-3-7 ;------