【EasyNetQ 教程】- 订阅

EasyNetQ订阅者订阅消息类型(消息类的.NET类型)。一旦通过调用Subscribe方法设置了对类型的订阅,就会在RabbitMQ代理上创建一个持久队列,并且该类型的任何消息都将被放置在队列中。只要连接,RabbitMQ就会将任何消息从队列发送给用户。

要订阅消息,我们需要为EasyNetQ提供在消息到达时执行的操作。我们通过传递订阅委托来做到这一点:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

现在每次发布MyMessage实例时,EasyNetQ都会调用我们的委托并将消息的Text属性打印到控制台。

您传递给Subscribe的订阅ID很重要。EasyNetQ将在RabbitMQ代理上为消息类型和订阅ID的每个唯一组合创建一个唯一的队列。

每次调用Subscribe都会创建一个新的队列使用者。如果使用相同的消息类型和订阅ID调用“订阅”两次,则将创建两个使用相同队列的消费者。然后RabbitMQ将依次循环连续消息给每个消费者。这非常适合扩展和工作共享。假设您已经创建了一个处理特定消息的服务,但它的工作负担过重。只需启动该服务的新实例(在同一台机器上,或在不同的机器上),无需配置任何内容,您就可以自动扩展。

如果使用不同的订阅ID(但具有相同的消息类型)两次调用“订阅”,则将创建两个队列,每个队列都有自己的使用者。给定类型的每条消息的副本将被路由到每个队列,因此每个消费者将获得所有消息(该类型的消息)。如果你有几个不同的服务都关心相同的消息类型,这是很好的。

编写订阅回调委托时的注意事项

当从通过EasyNetQ订阅的队列接收消息时,它们被放置在内存中队列中。单个线程位于一个循环中,从队列中获取消息并调用其Action代理。由于在单个线程上一次处理一个委托,因此应避免长时间运行的同步IO操作。尽快从代表处返回控制权。

使用SubscribeAsync

SubscribeAsync允许您的订阅者委托立即返回任务,然后异步执行长时间运行的IO操作。完成长期订阅后,只需完成任务即可。在下面的示例中,我们使用异步IO操作(DownloadStringTask)向Web服务发出请求。任务完成后,我们在控制台上写一行。

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message =>
    new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
        .ContinueWith(task =>
            Console.WriteLine("Received: ‘{0}‘, Downloaded: ‘{1}‘",
                message.Text,
                task.Result)));

另一个示例将导致在出现错误时抛出异常,然后导致消息被置于缺省错误队列中:

_bus.SubscribeAsync<MessageType>("Queue_Identifier",
            message => Task.Factory.StartNew(() =>
            {
                // Perform some actions here
                // If there is a exception it will result in a task complete but task faulted which
                // is dealt with below in the continuation
            }).ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsFaulted)
                {
                    // Everything worked out ok
                }
                else
                {
                    // Dont catch this, it is caught further up the heirarchy and results in being sent to the default error queue
                    // on the broker
                    throw new EasyNetQException("Message processing exception - look in the default error queue (broker)");
                }
            }));

取消订阅

所有subscribe方法都返回一个ISubscriptionResult。它包含描述底层IConsumerIExchangeIQueue使用的属性,如果需要,可以使用高级API进一步操作这些属性。IAdvancedBus

您可以通过在ISubscriptionResult实例或其ConsumerCancellation属性上调用Dispose来随时取消订阅者:

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

subscriptionResult.Dispose();
// this is equivalent to subscriptionResult.ConsumerCancellation.Dispose();

这将阻止EasyNetQ从队列中消耗并关闭消费者的频道。

请注意,处理IBusIAdvancedBus实例也将取消所有使用者并关闭与RabbitMQ的连接。

难道不叫subscriptionResult.Dispose()一个消息处理程序内。这将在EasyNetQ确认消费者频道上的消息subscriptionResult.Dispose()与关闭该频道的呼叫之间产生竞争条件。由于EasyNetQ的内部架构,这些调用将在不同的线程上调用,并且时序不确定。

原文地址:https://www.cnblogs.com/wangwust/p/9437361.html

时间: 2024-10-12 13:49:59

【EasyNetQ 教程】- 订阅的相关文章

微信公众平台开发教程 订阅号与服务号的区别

为了消除大家对订阅号与服务号的疑问,特总结如下: 功能点 介绍 订阅号 服务号 注册 注册账号 个人信息 个人信息和企业相关信息 展示 在手机端展现方式 显示在订阅号文件夹中 跟微信好友一样显示 收发消息 接受和发送消息,包括: 1 文本消息 2 图片消息 3 语音消息 4 视频消息 5 音乐消息 6 图文消息 有 有 事件响应 获取关注.取消关注.自定义菜单点击事件,并产生响应 有(自定义年菜单点击事件,取决于自定义菜单权限) 有 群发消息 向全部客户或指定客户发送消息 每天一条 每月一条 自

【EasyNetQ 教程】- 发布

EasyNetQ支持的最简单的消息传递模式是发布/ 订阅.这种模式是消除消费者信息提供者的绝佳方式.出版商简单地向全世界说,“这已经发生了”或“我现在有了这些信息”.它不关心是否有人正在倾听,他们可能是谁或他们所在的位置.我们可以添加和删除特定消息类型的订阅者,而无需重新配置发布者.我们还可以让许多发布商发布相同的消息,再次添加和删除发布者,而无需重新配置任何其他发布者或订阅者. 要使用EasyNetQ发布(假设您已经创建了IBus实例): 创建消息的实例,它可以是任何可序列化的.NET类型.

【EasyNetQ 教程】- 连接RabbitMQ

如果您习惯于处理与SQL Server等关系数据库的连接,那么您可能会发现EasyNetQ处理连接的方式有点奇怪.与关系数据库的通信始终由客户端启动.客户端打开连接,发出SQL命令,在必要时处理结果,然后关闭连接.一般的建议是,您应该在尽可能短的时间内保持打开连接,并将连接池保留给API. 与RabbitMQ等消息代理进行交谈有点不同,因为连接往往会持续应用程序的生命周期.通常,您将打开连接,创建订阅,然后等待任何消息到达打开的连接.EasyNetQ不假设经纪人随时可用.相反,它采用延迟连接方法

【EasyNetQ 教程】- 使用SSL连接

EasyNetQ可以通过SSL连接.戈登·库尔特(Gordon Coulter)撰写的这本指南最初是针对一个提出的问题而写的. 首先,您必须仔细按照https://www.rabbitmq.com/ssl.html上的步骤操作.我花了很多时间试图让openssl部分工作,然后花更多的时间让它按照我需要的方式工作而不仅仅是罐装演示. 即使你让EasyNetQ使用SSL,他们在该页面上显示的那些DotNet示例代码可用于测试也是一个很大的帮助.我有一个简单的控制台应用程序,其中包含Rabbit和下面

微信开发系列教程:(1)订阅号和服务号深入分析

微信开发系列教程,将以一个实际的微信平台项目为案例,深入浅出的讲解微信开发.应用各环节的实现方案和技术细节. 原创内容,欢迎转载,转载请注明出处. 首先在第1章节中,我们先理清什么是订阅号,什么又是服务号,以及两者的关键性区别. 订阅号和服务号统称为微信公众号. 订阅号强调媒体资讯属性,为会员提供信息订阅,适用于新闻传媒类企业.订阅号从某种意义上来讲,是拿来做内容的,如果你没有原创作品,对用户没有任何意义. 服务号强调服务和应用属性,旨在为会员提供服务并与之互动.支持所有微信高级接口,几乎所有的

RabbitMQ官网教程---发布/订阅

(使用python客户端pika 0.9.8) 在前面的教程中我们创建了一个工作队列.假设在一个工作队列后面是每一个被传递给正确的工作者的任务.在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息.这种模式被称为"发布/订阅". 为了阐明这个模式,我们将构建一个简单的日志系统.它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们. 在我们的日志系统中每个运行的接收程序副本将获得这个消息.用这种方式我们将可以运行一个接收器并且直接日志到磁盘:而且同时我

redis 实战教程、redis缓存教程、redis消息发布、订阅、redis消息队列教程

一:本教程使用环境: ubuntu12.x .jdk1.7 .Intellij idea.spring3.2.8 .redis服务端3.0,jedis客户端2.7.3 spring-data-redis 1.6.0 二:redis 服务端安装教程 这里不详解 三:redis 缓存特性 示例如下: spring配置: <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

(本教程是使用Net客户端,也就是针对微软技术平台的) 在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅". 为了说明这种模式,我们将构建一个简单的日志系统.它将包括两个程序,第一个将发出日志消息,第二个将接收并打印它们. 在我们的日志系统中每个接收程序的运行副本都会得到消息.这样我们就可以运行一个接收者程序,将日志记录到磁盘:同时我们可以运行另

redis学习教程三《发送订阅、事务、连接》

redis学习教程三<发送订阅.事务.连接> 一:发送订阅      Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息.Redis 发布订阅(pub/sub)实现了消息系统,发送者(在redis术语中称为发布者)在接收者(订阅者)接收消息时发送消息.传送消息的链路称为信道. 示例 以下示例说明了发布用户概念的工作原理. 在以下示例中,一个客户端订阅名为"redisChat"的信道. redis 127.0.0.1:6