RabbitMQ 原文译05--路由

前一篇文章中我们构建了一个简单的日志系统,我们可以向多个接受者广播消息。

在这篇文章我,我们将要添加一些功能使得针对部分消息的接受成为可能,例如我们只对错误的消息进行磁盘记录,同时又可以把所有的消息打印到屏幕上。

绑定

在之前的案例中,我们已经创建了一个绑定,可以重新调用如下的代码:

channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");

绑定是交换机和队列之间的关系,可以简单的理解为队列对该交换机上的消息感兴趣。

绑定可以设定参数routingKey,为了避免和BasicPublish 方法的参数混淆,我们暂且称之为binding key,下面是我们创建一个带有指定binding key的绑定:

channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: "black");

Direct exchange

我们之前的日志系统,把接受到的消息广播给所有的接受者,我们将要扩展它使得其能够根据消息的级别来过滤发送消息,例如我们想让记录日志的接受者仅仅接受严重性级别的错误消息,而不用在警告和信息级别的消息上浪费磁盘空间。

fanout没有办法提供给我们这样的灵活性,它只能对接受到的消息进行直接广播,而不去关心 routing key.

这里我们使用direct类型的交换机去代替,direct类型的交换机的实现思想非常简单--消息将会被发送到其Binding key 和消息的routing key 完全匹配的队列上。

为了说明这个问题,考虑下面的设定:

在这个交换机上绑定了两个队列,第一个队列绑定的bingding key 为"orange",第二个队列绑定了两个binding key,一个是"black",另外一个是"green",在这样的配置下,如果一个带有routing key为"orange"的消息,竟会被路由到队列Q1上,而带有routing key为"black" 或 "green"的消息将会被路由到队列Q2上。

多绑定

使用一个routing key 绑定多个队列完全是合法的,在我们案例中我们可以在 路由X 和 队列Q1,Q2中同时添加一个binding key为"black"的绑定,在这种情况下路由 X 将会像 fanout交换机一样帮匹配到的消息发送给所有的接受者:路由会把routing key 为"black"的消息发送给Q1和Q2。

嵌入日志

我们使用这种模式来构建我们的日志系统,代替fanout我们将会把我们的消息发送到direct类型的交换机,我们把日志级别作为路由的rouing key,这样接受者就可以根据日志级别选择接受其感兴趣的日志。

像往常一样,首先我们需要创建一个交换机:

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

然后我们准备发送消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null, body: body);

我们简单的假设 severity 分为以下几种:"info","warning","error".

订阅

接受消息和之前的一样,唯一的区别就是我们将会为我们感兴趣的每一个级别的消息新建绑定:

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);
}

汇总

EmitLogDirect.cs:

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

ReceiveLogsDirect.cs:

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

编译执行代码:

例如:如果你仅仅想保存"warning"和"error"的消息,打开控制台输入:ReceiveLogsDirect.exe warning error

发送一个Error消息, 控制台输入 EmitLogDirect.exe error "Run. Run. Or it will explode."

时间: 2024-10-15 04:41:54

RabbitMQ 原文译05--路由的相关文章

RabbitMQ 原文译04--发布和订阅

发布/订阅 在之前的案例中我们创建了一个工作队列,这个工作队列的实现思想就是一个把每一个任务平均分配给每一个执行者,在这个篇文章我们会做一些不一样的东西,把一个消息发送给多个消费者,这种模式就被称作"发布/订阅". 为了说明这个模式,我们将要创建一个简单的日志系统,一个负责发布消息,另外一个负责接收打印他们. 在我们的日志系统中,每一个运行中的接收者副本将都会获得消息,这种方式可以让我们在运行一个接收者直接把消息保存在磁盘的同时,另外一个消费者可以把消息打印到屏幕上. 本质上,发布一个

RabbitMQ(四) ——路由

RabbitMQ(四) --路由 (转载请附上本文链接--linhxx) 一.概述 路由模式(routing)是交换机不将消息广播到全部的队列中,而是采用交换机的另一种模式--direct.该模式下,交换机会精准的将消息发送到某个与其绑定的队列,而不是发送给全部队列. 如果没有队列绑定交换机,消息会丢失. 路由模式如下图所示: 二.绑定方式(binding) 在交换机的fanout模式下,不需要routing key,但是在此模式下,由于交换机需要精准的将消息发送给某个(某些)队列,则需要队列与

RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)

在上一章中,我们构建了一个简单的日志系统,我们可以把消息广播给很多的消费者.在本章中我们将增加一个特性:我们可以订阅这些信息中的一些信息.例如,我们希望只将error级别的错误存储到硬盘中,同时可以将所有级别(error.info.warning等)的日志都打印在控制台上. 1.绑定(Bindings) 在上一章中,我们已经创建了绑定关系,回顾一下代码: 1 channel.queueBind(queueName, EXCHANGE_NAME, ""); 一个绑定是一个交换器与队列之间

rabbitmq系列四 之路由

1.路由 在上一个的教程中,我们构建了一个简单的日志记录系统.我们能够向许多接收者广播日志消息. 在本次教程中,我们向该系统添加一些特性,比如,我只需要严重错误(erroe级别)的部分日志打印到磁盘文件中,但是同时仍然把所有的日志打印到控制台. 2.绑定 在前面的例子中.我们已经用以下的代码创建了绑定. 1 channel.queueBind(queueName, EXCHANGE_NAME, ""); 绑定是指交换机(exchange)与队列(queue)之间的联系,也可以理解为,当

python使用rabbitMQ介绍四(路由模式)

一.模式介绍 路由模式,与发布-订阅模式一样,消息发送到exchange中,消费者把队列绑定到exchange上. 这种模式在exchange上添加添加了一个路由键(routing-key),生产者发布消息的时候添加路由键(routing-key),消费者绑定队列到交换机时添加键值(routing-key),这样就可以接收到对应的消息. 路由模式的direct exchange. 队列模型: 与发布-订阅模式不同的是,每个消费者队列接收的消息不同,根据消息的routing-key把消息发送到不同

RabbitMQ官网教程---路由

(使用python客户端pika 0.9.8) 在前面的教程中我们构建了一个简单的日志系统.我们可以给许多接收者广播日志消息. 在这个教程中我们将添加一个特性给它-我们将订阅仅仅一种消息子集成为可能.例如,我们可以指挥仅仅错误消息到日志文件(保存到磁盘空间),它任然可以在控制台打印所有的日志消息. 绑定 在前面的例子中我们已经创建了绑定,你可以重新调用像这样的代码: channel.queue_bind(exchange=exchange_name, queue=queue_name) 绑定是e

RabbitMQ使用场景_004_路由模式

路由模式 queue只感兴趣这个的exchange.只希望接收交换机中的关键信息,或者说指定内容的信息, 而忽略交换机中的其他消息. direct exchange背后的路由算法很简单--即一个消息的队列binding_key完全匹配message的routing_key. 交换机绑定不同key的队列: 在上图中: exchange X和两个queue绑定在一起.queue Q1的binding key是orange.queue Q2的binding key是black和green. 当P pu

RabbitMQ (消息队列)专题学习05 routing(路由)

(使用Java客户端) 一.概述 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 二.路由功能实现 2.1.绑定(bindings) 在前面的学习中已经创建了绑定(bindings),代码如下: channel.queueBind

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re