RabbitMQ Exchange类型详解

前言

在上一篇文章中,我们知道了RabbitMQ的消息流程如下:

但在具体的使用中,我们还需知道exchange的类型,因为不同的类型对应不同的队列和路由规则。

在rabbitmq中,exchange有4个类型:direct,topic,fanout,header。

direct exchange

此类型的exchange路由规则很简单:

exchange在和queue进行binding时会设置routingkey

channel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null);

然后我们在将消息发送到exchange时会设置对应的routingkey

channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body);

在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

具体的流程如下:

通过代码可以会理解好一点:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Direct类型的exchange, 名称 pdf_events
    channel.ExchangeDeclare(exchange: "pdf_events",
                            type: ExchangeType.Direct,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // 创建create_pdf_queue队列
    channel.QueueDeclare(queue: "create_pdf_queue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 pdf_log_queue队列
    channel.QueueDeclare(queue: "pdf_log_queue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //绑定 pdf_events --> create_pdf_queue 使用routingkey:pdf_create
    channel.QueueBind(queue: "create_pdf_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_create",
                        arguments: null);

    //绑定 pdf_events --> pdf_log_queue 使用routingkey:pdf_log
    channel.QueueBind(queue: "pdf_log_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_log",
                        arguments: null);

    var message = "Demo some pdf creating...";
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发送消息到exchange :pdf_events ,使用routingkey: pdf_create
    //通过binding routinekey的比较,次消息会路由到队列 create_pdf_queue
    channel.BasicPublish(exchange: "pdf_events",
                routingKey: "pdf_create",
                basicProperties: properties,
                body: body);

    message = "pdf loging ...";
    body = Encoding.UTF8.GetBytes(message);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发送消息到exchange :pdf_events ,使用routingkey: pdf_log
    //通过binding routinekey的比较,次消息会路由到队列 pdf_log_queue
    channel.BasicPublish(exchange: "pdf_events",
            routingKey: "pdf_log",
            basicProperties: properties,
            body: body);

}

topic exchange

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:‘*‘,‘#‘.

其中‘*‘表示匹配一个单词, ‘#‘则表示匹配没有或者多个单词

如上图第一个binding:

  • exchange: agreements
  • queue A:  berlin_agreements
  • binding routingkey: agreements.eu.berlin.#

第二个binding:

  • exchange: agreements
  • queue B: all_agreements
  • binding routingkey: agreements.#

第三个binding:

  • exchange: agreements
  • queue c: headstore_agreements
  • binding routingkey: agreements.eu.*.headstore

所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding,但最后一个不符合,具体的代码如下:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Topic类型的exchange, 名称 agreements
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Topic,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // 创建berlin_agreements队列
    channel.QueueDeclare(queue: "berlin_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 all_agreements 队列
    channel.QueueDeclare(queue: "all_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //创建 headstore_agreements 队列
    channel.QueueDeclare(queue: "headstore_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    //绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#
    channel.QueueBind(queue: "berlin_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.berlin.#",
                        arguments: null);

    //绑定 agreements --> all_agreements 使用routingkey:agreements.#
    channel.QueueBind(queue: "all_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.#",
                        arguments: null);

    //绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore
    channel.QueueBind(queue: "headstore_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.*.headstore",
                        arguments: null);

    var message = "hello world";
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    //发送消息到exchange :agreements ,使用routingkey: agreements.eu.berlin
    //agreements.eu.berlin 匹配  agreements.eu.berlin.# 和agreements.#
    //agreements.eu.berlin 不匹配  agreements.eu.*.headstore
    //最终次消息会路由到队里:berlin_agreements(agreements.eu.berlin.#) 和 all_agreements(agreements.#)
    channel.BasicPublish(exchange: "agreements",
                            routingKey: "agreements.eu.berlin",
                            basicProperties: properties,
                            body: body);

}

fanout exchange

此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。

header exchange

此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:

Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",
                    exchange: "agreements",
                    routingKey: string.Empty,
                    arguments: aHeader);

其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。

在发布消息的时候就需要传入header值:

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;

具体的规则可以看以下代码:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Headers类型的exchange, 名称 agreements
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Headers,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // 创建queue.A队列
    channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);

    //创建 queue.B 队列
    channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);

    //创建 queue.C 队列
    channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);

    //绑定 agreements --> queue.A 使用arguments (format=pdf, type=report, x-match=all)
    Dictionary<string, object> aHeader = new Dictionary<string, object>();
    aHeader.Add("format", "pdf");
    aHeader.Add("type", "report");
    aHeader.Add("x-match", "all");
    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: aHeader);

    //绑定 agreements --> queue.B 使用arguments (format=pdf, type=log, x-match=any)
    Dictionary<string, object> bHeader = new Dictionary<string, object>();
    bHeader.Add("format", "pdf");
    bHeader.Add("type", "log");
    bHeader.Add("x-match", "any");
    channel.QueueBind(queue: "queue.B",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: bHeader);

    //绑定 agreements --> queue.C 使用arguments (format=zip, type=report, x-match=all)
    Dictionary<string, object> cHeader = new Dictionary<string, object>();
    cHeader.Add("format", "zip");
    cHeader.Add("type", "report");
    cHeader.Add("x-match", "all");
    channel.QueueBind(queue: "queue.C",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: cHeader);

    string message1 = "hello world";
    var body = Encoding.UTF8.GetBytes(message1);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
    mHeader1.Add("format", "pdf");
    mHeader1.Add("type", "report");
    properties.Headers = mHeader1;
    //此消息路由到 queue.A 和 queue.B
    //queue.A 的binding (format=pdf, type=report, x-match=all)
    //queue.B 的binding (format = pdf, type = log, x - match = any)
    channel.BasicPublish(exchange: "agreements",
                            routingKey: string.Empty,
                            basicProperties: properties,
                            body: body);

    string message2 = "hello world";
    body = Encoding.UTF8.GetBytes(message2);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
    mHeader2.Add("type", "log");
    properties.Headers = mHeader2;
    //x-match 配置queue.B
    //queue.B 的binding (format = pdf, type = log, x-match = any)
    channel.BasicPublish(exchange: "agreements",
                    routingKey: string.Empty,
                    basicProperties: properties,
                    body: body);

    string message3= "hello world";
    body = Encoding.UTF8.GetBytes(message3);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
    mHeader3.Add("format", "zip");
    properties.Headers = mHeader3;
    //配置失败,不会被路由
    channel.BasicPublish(exchange: "agreements",
                    routingKey: string.Empty,
                    basicProperties: properties,
                    body: body);

}

总计

以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。

header类型用的比较少,但还是知道一点好。

时间: 2024-10-04 14:52:56

RabbitMQ Exchange类型详解的相关文章

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

RabbitMQ基础知识详解

RabbitMQ基础知识详解 2017年08月28日 20:42:57 dreamchasering 阅读数:41890 标签: RabbitMQ 什么是MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息. RabbitMQ是MQ的一种.下面详细介绍一下RabbitMQ的基本概念. 1.队列.生产者.消费者 队列是RabbitMQ的内部对象,用于存

赋值运算符函数的返回值类型详解

在c++赋值运算符函数的学习中,对于返回值类型的问题,一直非常费解,今天彻底总结一些每种不同返回值类型的结果: 1.当返回值为空时: <span style="font-size:14px;">void hasptr::operator=(const hasptr& s)</span> 这个时候如果只有一个'='(a = b)运算那就没问题,但是如果存在'='(a = b = c)的链式操作时,编译器就会报错 我们看:a = b = c: 程序会先运行

【DataBase】sqlserver字段类型详解

bit    整型 bit数据类型是整型,其值只能是0.1或空值.这种数据类型用于存储只有两种可能值的数据,如Yes 或No.True 或False .On 或Off. 注意:很省空间的一种数据类型,如果能够满足需求应该尽量多用. tinyint   整型 tinyint 数据类型能存储从0到255 之间的整数.它在你只打算存储有限数目的数值时很有用.这种数据类型在数据库中占用1 个字节. 注意:如果bit类型太单调不能满足您的需求,您可以考虑用tinyint类型,因为这个类型相对也是比较安全的

c#分部类型详解

一.先看代码来理解 代码一 1 class ClassA 2 { 3 void A(){;} 4 void B(){;} 5 } 代码二 1 partial class ClassA 2 { 3 void A(){;} 4 } 5 partial class ClassA 6 { 7 void B(){;} 8 } 代码一和代码二效果是一样的,引用类的成员完全一样,只是声明不不同而已.主要partial在这里是分部类型的关键词 编译过程图解 二.应用场景 1.嵌套在一个类里面 1 class C

[转]C#进阶系列——WebApi 接口返回值不困惑:返回值类型详解

本文转自:http://www.cnblogs.com/landeanfen/p/5501487.html 阅读目录 一.void无返回值 二.IHttpActionResult 1.Json(T content) 2.Ok(). Ok(T content) 3.NotFound() 4.其他 5.自定义IHttpActionResult接口的实现 三.HttpResponseMessage 四.自定义类型 五.总结 正文 前言:已经有一个月没写点什么了,感觉心里空落落的.今天再来篇干货,想要学

JavaScript对象类型详解

JavaScript对象类型详解 JavaScrtip有六种数据类型,一种复杂的数据类型(引用类型),即Object对象类型,还有五种简单的数据类型(原始类型):Number.String.Boolean.Undefined和Null.其中,最核心的类型就是对象类型了.同时要注意,简单类型都是不可变的,而对象类型是可变的. 什么是对象 一个对象是一组简单数据类型(有时是引用数据类型)的无序列表,被存储为一系列的名-值对(name-value pairs).这个列表中的每一项被称为 属性(如果是函

C++ string类型详解

C++ string类型详解 string是非常强大的类型,很好的封装了字符串的操作,有些时候我们可以把string当做字符的容器,string也 支持大多数容器操作,下面就列出string类型所支持的所有操作,本文并不是为了讲解string的用法和应用, 而是希望作为string类型的参考文档,每个函数皆在注释后有详细说明,需要用时查阅即可. 1.构造函数 string();//空串 string(size_type length,char ch);//以length为长度的ch的拷贝(即le

网络互联技术(四)-LSA的第四和第五种类型详解

LSA的第四和第五种类型详解 一.External LSA:第五种LSA 我们前面已经详细介绍了前面三种LSA,今天就接着介绍第四种和第五种LSA.因为理解ASBR Summary LSA--第四种LSA需要涉及External LSA的一些知识,所以我们先介绍第五种LSA,然后再回过头来看第四种LSA. External LSA由ASBR(Autonomous System Border Router,自治系统边界路由器,我们前面说过了OSPF网络就是一个自治系统)产生.它是用来通告OSPF网