.NET Core RabbitMQ探索(2)——RabbitMQ的Exchange

  实际上,RabbitMQ的生产者并不会直接把消息发送给队列,甚至生产者都不知道消息是否会被发送给一个队列。对于生产者而言,它们只能把消息发送到Exchange,一个Exchange所完成的工作相当简单,一方面,它从生产者那里接收消息;另一方面它将消息存入队列中。一个Exchange需要准确的知道它要如何处理它接收到的消息,例如,它需要把消息转发到特定的队列,还是进行广播处理,或者直接将它丢弃。可以通过exchange type来定义Exchange处理消息的规则。
  整个框架结构图如图所示。

  Exchange types有以下几种:direct、topic、headers和fanout。如果我们没有定义Exchange,那么系统就会默认使用一个默认的Exchange,名为:"",就像我们入门篇里的一样,它会自己创建一个""的默认Exchange,然后将消息转发给特定routingKey的队列。

  • Direct Exchange

  使用direct exchange时,会将exchange与特定的队列进行绑定,转发时由routingkey进行队列的匹配,如图所示。

  在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binding进行消息路由,代码示例如下所示:

  1. 首先我们需要将exchange和queue进行binding
channel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null);

  绑定时需要设置:队列名、exchange名和它们的routingkey。

  1. 在发送消息到exchange时会设置对应的routingkey
channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body);

  生产者发布消息时,需要设置exchange名和routingKey,如果exchange名和routingKey都与上述绑定的完全一致,那么该exchange就会将这条消息路由到队列。

  • Topic Exchange

  此类exchange与direct类似,唯一不同的地方是,direct类型要求routingKey完全一致,而这里可以可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“JiangYuZhou.#”能够匹配到“JiangYuZhou.pets.cat”,但是“JiangYuZhou.*” 只会匹配到“JiangYuZhou.money”。
  所以,Topic Exchange 使用非常灵活,topic exchange如图所示。

  例如,我们首先声明一个topic exchange,它的名称为"agreements":

// Topic类型的exchange, 名称 agreements
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Topic,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

  然后,我们声明三个队列,它们分别如下:

// 创建berlin_agreements队列
    channel.QueueDeclare(queue: "berlin_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 all_agreements 队列
    channel.QueueDeclare(queue: "all_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 headstore_agreements 队列
    channel.QueueDeclare(queue: "headstore_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

  最后,我们将agreements exchange分别与上面的三个队列以不同通配符的routingKey进行绑定:

//绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#
    channel.QueueBind(queue: "berlin_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.berlin.#",
                        arguments: null);

    //绑定 agreements --> all_agreements 使用routingkey:agreements.#
    channel.QueueBind(queue: "all_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.#",
                        arguments: null);

    //绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore
    channel.QueueBind(queue: "headstore_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.*.headstore",
                        arguments: null);

  这时我们如果发送下列消息:

 var message = "hello world";
 var body = Encoding.UTF8.GetBytes(message);
 var properties = channel.CreateBasicProperties();
 properties.Persistent = true;

channel.BasicPublish(exchange: "agreements",
    routingKey: "agreements.eu.berlin",
    basicProperties: properties,
    body: body);

  该消息设置的exchange为"agreements",routingKey为"agreements.eu.berlin",所以它可以匹配上面的"agreements.eu.berlin.#"和"agreements.#",消息被转发到了"berlin_agreements"和"all_agreements"队列。

  • Fanout Exchange

  该exchange无需对routingKey进行匹配操作,而是很简单的直接将消息路由到所有绑定的队列中,如图所示。

  • Header Exchange

  此类型的路由规是根据header来判断的,首先需要以键值对的形式设置header的参数,在绑定exchange的时候将header以arguments的形式传递进去,传递参数时,键为"x-match"的header可以设置它的值为all或any,其中,all表示只有当发布的消息匹配该header中除"x-match"以外的所有值时,消息才会被转发到该队列;any表示当发布的消息匹配该header种除"x-match"外的任意值时,该消息会被转发到匹配队列。

代码操练

  最后我们以header exchange为例,演示我们的Exchange。首先我们创建四个项目,其中一个作为生产者,另作三个均作为消费者,并且使用:

dotnet add package RabbitMQ.Client

  给四个项目均安装上RabbitMQ的.NET包,并进行restore,项目结构如图所示:

  开始编写Send端的代码,其中,RabbitMQ还是使用我们在上一章种使用的Docker中RabbitMQ,程序如下:

using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //声明Headers类型的exchange,名称为agreements
                    channel.ExchangeDeclare(exchange: "agreements",
                        type: ExchangeType.Headers,
                        autoDelete: false,
                        arguments: null);

                    //创建队列queue.A
                    channel.QueueDeclare(queue: "queue.A",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    //创建队列queue.B
                    channel.QueueDeclare(queue: "queue.B",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    //创建队列queue.C
                    channel.QueueDeclare(queue: "queue.C",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    //绑定agreements=>queue.A,使用arguments(format=pdf、type=report、x-match=all)
                    //只有当header中同时满足format=pdf、type=report时,消息才会被转发到队列A
                    Dictionary<string, object> aHeader = new Dictionary<string, object>();
                    aHeader.Add("format", "pdf");
                    aHeader.Add("type", "report");
                    aHeader.Add("x-match", "all");
                    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: aHeader);

                    //绑定agreements=>queue.B,使用arguments(format=pdf、type=log、x-match=any)
                    //当header中满足format=pdf或type=log任意一个时,消息就会被转发到队列B
                    Dictionary<string, object> bHeader = new Dictionary<string, object>();
                    bHeader.Add("format", "pdf");
                    bHeader.Add("type", "log");
                    bHeader.Add("x-match", "any");
                    channel.QueueBind(queue: "queue.B",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: bHeader);

                    //绑定agreements=>queue.C,使用arguments(format=zip、type=report、x-match=all)
                    //当header中同时满足format=zip和type=report时,消息会被转发到队列C
                    Dictionary<string, object> cHeader = new Dictionary<string, object>();
                    cHeader.Add("format", "zip");
                    cHeader.Add("type", "report");
                    cHeader.Add("x-match", "all");
                    channel.QueueBind(queue: "queue.C",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: cHeader);

                    string message1 = "hello world From 1";
                    var body = Encoding.UTF8.GetBytes(message1);
                    var properties1 = channel.CreateBasicProperties();
                    properties1.Persistent = true;
                    Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
                    mHeader1.Add("format", "pdf");
                    mHeader1.Add("type", "report");
                    properties1.Headers = mHeader1;

                    //这条消息会被转发到queue.A和queue.B
                    //queue.A 的binding (format=pdf, type=report, x-match=all)
                    //queue.B 的binding (format=pdf, type=log, x-match=any)
                    channel.BasicPublish(exchange: "agreements",
                                    routingKey: string.Empty,
                                    basicProperties: properties1,
                                    body: body);

                    string message2 = "hello world From 2";
                    body = Encoding.UTF8.GetBytes(message2);
                    var properties2 = channel.CreateBasicProperties();
                    properties2.Persistent = true;
                    Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
                    mHeader2.Add("type", "log");
                    properties2.Headers = mHeader2;

                    //这条消息会被转发到queue.B
                    //queue.B 的binding (format = pdf, type = log, x-match = any)
                    channel.BasicPublish(exchange: "agreements",
                                    routingKey: string.Empty,
                                    basicProperties: properties2,
                                    body: body);

                    string message3 = "hello world From 3";
                    body = Encoding.UTF8.GetBytes(message3);
                    var properties3 = channel.CreateBasicProperties();
                    properties3.Persistent = true;
                    Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
                    mHeader3.Add("format", "zip");
                    properties3.Headers = mHeader3;

                    //这条消息不会被路由
                    //队列C要求同时满足两个条件,这里只满足了一个,没有匹配的队列
                    channel.BasicPublish(exchange: "agreements",
                                    routingKey: string.Empty,
                                    basicProperties: properties3,
                                    body: body);
                }
            }
        }
    }
}

  运行程序后,可以看到,queue.A中匹配了三条消息、queue.B中匹配了两条、queue.C中没有匹配到任何消息。

  可以看到,队列A中匹配了一条信息,即Message 1,队列B中匹配了两条信息,即Message 1和Message2,队列C中没有匹配信息,符合我们程序的编写,下面用接收端进行接收。
  接收端分别写了三个程序,分别接收队列A、B、C的消息,它们除了绑定队列名称不同外,其余全部相同,下面是绑定队列A的接收程序:

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Recieve1
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //注意要与发送端的声明一致
                    channel.ExchangeDeclare(exchange: "agreements",
                        type: ExchangeType.Headers,
                        autoDelete: false,
                        arguments: null);

                    //绑定了queue.C和agreements Exchange
                    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty);

                    Console.WriteLine("Waiting for messages");

                    var consumer = new EventingBasicConsumer(channel);

                    //绑定接收完成事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"Recieve Message:{message}");
                    };

                    channel.BasicConsume(queue: "queue.A",
                        autoAck: true,
                        consumer: consumer);

                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

  最后,我们分别运行这三个接收程序:

  符合程序设计。

  参考:JulyLuo——https://www.cnblogs.com/julyluo/p/6265775.html

原文地址:https://www.cnblogs.com/cquptjyz/p/10888923.html

时间: 2024-10-13 18:29:44

.NET Core RabbitMQ探索(2)——RabbitMQ的Exchange的相关文章

RabbitMQ inequivalent arg &#39;durable&#39; for exchange &#39;csExchange&#39; in vhost &#39;/&#39;: received

错误:inequivalent arg 'durable' for exchange 'csExchange' in vhost '/': received 使用不同的MQ客户端时,常常会出现以上错误信息. 如使用php作为product,使用java, rubby, python作为consume. 最常见的原因是: durable, auto_delete,passive参数不一致,保持参数一致性就ok了 RabbitMQ inequivalent arg 'durable' for exc

Github开源:Sheng.RabbitMQ.CommandExecuter (RabbitMQ 的命令模式实现)

[Github]:https://github.com/iccb1013/Sheng.RabbitMQ.CommandExecuter Sheng.RabbitMQ.CommandExecuter 是使用 .Net 对 RabbitMQ 的一个简单封装. 它通过XML配置文件定义Exchange及队列等信息,根据此配置文件自动声明及初始化相关队列信息,方便 .Net 开发人员使用 RabbitMQ. 并实现了一个基于 MQ 的命令执行器,将 MQ 消息抽象化为命令,发布端和订阅端通过命令进行交互

.NET文件并发与RabbitMQ(初探RabbitMQ)

本文版权归博客园和作者吴双本人共同所有.欢迎转载,转载和爬虫请注明原文地址:http://www.cnblogs.com/tdws/p/5860668.html 想必MQ这两个字母对于各位前辈们和老司机们并不陌生.本文初探RabbitMQ的简单分享可能值得学习之处不怎么多,本人对于RabbitMQ的研究目前也很初级,这个月打算按照好的学习线路提高一下,欢迎新老司机留下你们的见解. 首先提到第一个简单的场景,文件并发.我先手动实现一下文件并发,引发异常,请看如下代码. 1 static void

Rabbitmq学习(一) Rabbitmq初探

Rabbitmq学习(一) Rabbitmq初探 理论 定义 消息队列:在消息的传输过程中保存消息的的容器. 这是一个较为经典的消费-生产者模型,说起来比较抽象,打个比方:A线程需要给B线程发送消息(A.B线程不一定是在同一台机器上的),A线程先把消息发送到消息队列服务器上,然后B线程去读取或是订阅消息服务器上消息队列中的消息,线程A和B之间并没有进行直接通信.MQ服务器在中间起到中继的作用. 适用的应用场景 比较适合异步传输,这里解释一下什么是异步和同步. 异步:发送方不关心消息有没有发送成功

RabbitMq学习(一) Exchange的四种类型和属性

一.什么是Exchange 首先我们要了解到,RabbitMQ 是 AMQP(高级消息队列协议)的标准实现: 从 AMQP 协议可以看出,Queue.Exchange 和 Binding 构成了 AMQP 协议的核心 Producer:消息生产者,即投递消息的程序. Broker:消息队列服务器实体. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列. Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来. Queue:消息队列载体,每个

消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!

前言 来了解RabbitMQ一个重要的概念:Exchange交换机 1. Exchange概念 Exchange:接收消息,并根据路由键转发消息所绑定的队列. 蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列. 黄色框:交换机和队列通过路由键有一个绑定的关系. 绿色框:消费端通过监听队列来接收消息. 2. 交换机属性 Name:交换机名称 Type:交换机类型--direct.topic.fanout.headers.sharding(此篇不讲) Durability:是否需要持久化,

RabbitMQ八:交换机类型Exchange Types--Topic介绍

前言 上一章节,我们说了两个类型,本章我们说一下其三:Topic Exchange Topic Exchange  Topic Exchange – 将路由键和某模式进行匹配.此时队列需要绑定要一个模式上.符号"#"匹配一个或多个词,符号"*"匹配不多不少一个词.因此"audit.#"能够匹配到"audit.irs.corporate",但是"audit.*" 只会匹配到"audit.irs&qu

linux安装rabbitmq ssm集成rabbitmq

rabbitmq 3.8.0 & erlang 22.1 源码编译安装 摘自 https://www.cnblogs.com/amosli/p/11765483.html 1.安装erlang编译依赖 sudo yum install -y gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git 1.2.下载erlang22.1源码包 wget http://er

RabbitMQ交换机、RabbitMQ整合springCloud

目标 1.交换机 2.RabbitMQ整合springCloud 交换机 蓝色区域===生产者 红色区域===Server:又称Broker,接受客户端的连接,实现AMQP实体服务 绿色区域===消费者 黄色区域===就是我们的交换机以及队列 由生产者投递信息到RabbitMQ Server里面某一个交换机对应的队列中,消费者则是从对应的队列中获取信息 交换机属性: Name:交换机名称 Type:交换机类型 direct.topic.fanout.headers Durability:是否需要