.net平台的rabbitmq使用封装

原文:.net平台的rabbitmq使用封装

前言

  RabbitMq大家再熟悉不过,这篇文章主要整对rabbitmq学习后封装RabbitMQ.Client的一个分享。文章最后,我会把封装组件和demo奉上。

Rabbitmq的关键术语

  1、绑定器(Binding):根据路由规则绑定Queue和Exchange。

  2、路由键(Routing Key):Exchange根据关键字进行消息投递。

  3、交换机(Exchange):指定消息按照路由规则进入指定队列

  4、消息队列(Queue):消息的存储载体

  5、生产者(Producer):消息发布者。

  6、消费者(Consumer):消息接收者。

Rabbitmq的运作

  从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。

  那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。

  推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)

  拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)

  使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。

  可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。

  这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。

Publish(发布)的封装

  步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。

 1         /// <summary>
 2         /// 交换器声明
 3         /// </summary>
 4         /// <param name="iModel"></param>
 5         /// <param name="exchange">交换器</param>
 6         /// <param name="type">交换器类型:
 7         /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
 8         /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
 9         /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
10         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
11         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
12         /// 交换机转发消息是最快的。
13         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
14         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
15         /// 只会匹配到“audit.irs”。</param>
16         /// <param name="durable">持久化</param>
17         /// <param name="autoDelete">自动删除</param>
18         /// <param name="arguments">参数</param>
19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
20             bool durable = true,
21             bool autoDelete = false, IDictionary<string, object> arguments = null)
22         {
23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
25         }
26
27         /// <summary>
28         /// 队列声明
29         /// </summary>
30         /// <param name="channel"></param>
31         /// <param name="queue">队列</param>
32         /// <param name="durable">持久化</param>
33         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
34         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
35         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
36         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
37         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
38         /// <param name="autoDelete">自动删除</param>
39         /// <param name="arguments">参数</param>
40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
41             bool autoDelete = false, IDictionary<string, object> arguments = null)
42         {
43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
45         }
46
47         /// <summary>
48         /// 获取Model
49         /// </summary>
50         /// <param name="exchange">交换机名称</param>
51         /// <param name="queue">队列名称</param>
52         /// <param name="routingKey"></param>
53         /// <param name="isProperties">是否持久化</param>
54         /// <returns></returns>
55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
56         {
57             return ModelDic.GetOrAdd(queue, key =>
58             {
59                 var model = _conn.CreateModel();
60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
61                 QueueDeclare(model, queue, isProperties);
62                 model.QueueBind(queue, exchange, routingKey);
63                 ModelDic[queue] = model;
64                 return model;
65             });
66         }
67
68         /// <summary>
69         /// 发布消息
70         /// </summary>
71         /// <param name="routingKey">路由键</param>
72         /// <param name="body">队列信息</param>
73         /// <param name="exchange">交换机名称</param>
74         /// <param name="queue">队列名</param>
75         /// <param name="isProperties">是否持久化</param>
76         /// <returns></returns>
77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
78         {
79             var channel = GetModel(exchange, queue, routingKey, isProperties);
80
81             try
82             {
83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
84             }
85             catch (Exception ex)
86             {
87                 throw ex.GetInnestException();
88             }
89         }        

  下次是本机测试的发布速度截图:

  4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。

Subscribe(订阅)的封装

  发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的JOB进行定时重发,因此,finally是应答成功的。

        /// <summary>
        /// 获取Model
        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {
            return ModelDic.GetOrAdd(queue, value =>
             {
                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);

                 //每次消费的消息数
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;

                 return model;
             });
        }    

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {
            //队列声明
            var channel = GetModel(queue, isProperties);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();
                try
                {
                    handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }
                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }        

  下次是本机测试的发布速度截图:

  快的时候有1.9K/S,慢的时候也有1.7K/S

Pull(拉)的封装

  直接上代码:

        /// <summary>
        /// 获取消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {
            var channel = GetModel(exchange, queue, routingKey);

            var result = channel.BasicGet(queue, false);
            if (result.IsNull())
                return;

            var msg = result.Body.DeserializeUtf8().FromJson<T>();
            try
            {
                handler(msg);
            }
            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }
            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }    

  快的时候有1.8K/s,稳定是1.5K/S

Rpc(远程调用)的封装

  首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

  1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

  2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {
            var channel = GetModel(exchange, queue, routingKey, isProperties);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);

            try
            {
                var correlationId = Guid.NewGuid().ToString();
                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());

                var sw = Stopwatch.StartNew();
                while (true)
                {
                    var ea = consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        return ea.Body.DeserializeUtf8();
                    }

                    if (sw.ElapsedMilliseconds > 30000)
                        throw new Exception("等待响应超时");
                }
            }
            catch (Exception ex)
            {
                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {
            //队列声明
            var channel = GetModel(queue, isProperties);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();

                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    msg = handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }
                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

  可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

结尾

  本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 https://github.com/SkyChenSky/RabbitMq 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

  如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

原文地址:https://www.cnblogs.com/lonelyxmas/p/10309429.html

时间: 2024-10-12 05:33:25

.net平台的rabbitmq使用封装的相关文章

微信公众平台消息接口开发-封装weixin.class.php(转)

一.封装weixin.class.php 由于微信公众平台的通信使用的是特定格式的XML数据,每次接受和回复都要去做一大堆的数据处理. 我们就考虑在这个基础上做一次封装,weixin.class.php,代码如下: <?php class Weixin {     public $token = '';//token     public $debug =  false;//是否debug的状态标示,方便我们在调试的时候记录一些中间数据     public $setFlag = false;

微信公众平台消息接口开发-封装weixin.class.php

原文:微信公众平台消息接口开发-封装weixin.class.php 一.封装weixin.class.php 由于微信公众平台的通信使用的是特定格式的XML数据,每次接受和回复都要去做一大堆的数据处理. 我们就考虑在这个基础上做一次封装,weixin.class.php,代码如下: <?php class Weixin {     public $token = '';//token     public $debug =  false;//是否debug的状态标示,方便我们在调试的时候记录一

三维引擎设计-多线程渲染(平台API基础和封装大致框架)

第一部分: Linux线程API基础 一:线程创建与结束 (1)pthread_t //线程的标识符类型 (2)pthread_create //用来创建一个线程, 参数线程标识符, 线程属性, 线程运行函数地址 (3)pthread_join //用来等待一个线程的结束, 参数被等待线程标识符,用户自定义指针 (4)pthread_exit //线程非正常结束,参数线程返回代码 二:修改线程属性 (1)pthread_attr_t //线程属性结构类型 (2)pthread_attr_init

NET操作RabbitMQ组件EasyNetQ

NET操作RabbitMQ组件EasyNetQ使用中文简版文档. 本文出自EasyNetQ官方文档,内容为自己理解加翻译.文档地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Quick-Start EasyNetQ简介 EasyNetQ是基于官方.NET组件RabbitMQ.Client 的又一层封装,使用起来更加方便,开发者不用关心具体队列声明,路由声明等细节,几句简单代码即可发送消息到队列,接收消息也很简单,下面将简单介绍EasyNetQ的使用方法.

.NET操作RabbitMQ组件EasyNetQ使用中文简版文档。

EasyNetQ简介 EasyNetQ是基于官方.NET组件RabbitMQ.Client 的又一层封装,使用起来更加方便,开发者不用关心具体队列声明,路由声明等细节,几句简单代码即可发送消息到队列,接收消息也很简单,下面将简单介绍EasyNetQ的使用方法.不知道什么是RabbitMQ?您可以关闭网页了. 安装EasyNetQ 从NuGet上安装即可,由于EasyNetQ是依赖RabbitMQ.Client所以,会同时安装两个dll. PM> Install-Package EasyNetQ

Java异步消息平台

l  JAVA平台异步消息模块 JAVA平台异步消息模块,是一个针对RabbitMQ的消息发送及处理封装,包含消息的配置.发送.接收.失败重试.日志记录等,总共分为4个部分: 1)RabbitMQ访问封装:JAMQP(Jar包) 2)消息模块公共对象.配置读取及接口定义:JMSG(Jar包) 3)消息发送端:JMSG—Client(Jar包) 4)消息接收端:JMSG—Server(War包)   l  RabbitMQ简介 MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息

架构设计之NodeJS操作消息队列RabbitMQ

一. 什么是消息队列? 消息(Message)是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象. 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递.消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的.这样发布者和使用者都不用知道对方的存在. 二. 常用的消息队列有哪些? RabbitMQ.RocketMQ.ActiveMQ.Kafka

RabbitMQ-从基础到实战(1)— Hello RabbitMQ

转自:https://yq.aliyun.com/articles/589923 1.简介 本篇博文介绍了在windows平台下安装RabbitMQ Server端,并用JAVA代码实现收发消息 2.安装RabbitMQ RabbitMQ是用Erlang开发的,所以需要先安装Erlang环境,在这里下载对应系统的Erlang安装包进行安装 点击这里下载对应平台的RabbitMQ安装包进行安装 Windows平台安装完成后如图 3.启用RabbitMQ Web控制台 RabbitMQ提供一个控制台

【JavaScript】Hybrid App开发 四大主流移平台分析

转自http://dev.yesky.com/238/34657738.shtml Hybrid App在过去的两年中已经成为移动界的核心话题,但是作为一名Web开发者来说要如何站在移动互联网的浪潮之巅呢?是选择学习原生开发,研究Java.Object-C.C#等语言,还是选择继续使用网页开发,容忍HTML5功能的局限性?就在开发者左右为难的情况下Hybrid App作为一个折中的解决方案诞生了.那么究竟什么才是Hybrid App呢? Hybrid App概念 Hybrid App:Hybri