C# 封装RabbitMQ消息队列处理

现在使用.net领域使用RabbitMQ有很多解决方案,我自己使用过的有两个,一个是EasyNetQ,一个是CAP,都挺好用的,尤其是CAP,懒人推荐使用,怎么使用的文章也很多,小伙伴可以自行搜索。

最近我自己尝试根据目前手头项目的需要,自行封装一下基于RabbitMQ的使用,下面开搞,贴上我自己写的代码。

首先定义消息发布者/生产者接口:

 1 using System.Threading.Tasks;
 2
 3 namespace fx.MQ
 4 {
 5     public interface IPublisher
 6     {
 7         /// <summary>
 8         /// 释放资源。
 9         /// </summary>
10         void Dispose();
11         /// <summary>
12         ///
13         /// </summary>
14         /// <typeparam name="T"></typeparam>
15         /// <param name="message"></param>
16         void Publish<T>(T message) where T : class;
17         /// <summary>
18         ///
19         /// </summary>
20         /// <param name="message"></param>
21         /// <param name="channelName"></param>
22         void Publish(string message, string channelName);
23         /// <summary>
24         ///
25         /// </summary>
26         /// <typeparam name="T"></typeparam>
27         /// <param name="message"></param>
28         /// <returns></returns>
29         Task PublishAsync<T>(T message) where T : class;
30     }
31 }

定义订阅者/消费者接口:

 1 using System;
 2 using System.Threading.Tasks;
 3
 4 namespace fx.MQ
 5 {
 6     public interface ISubscriber
 7     {
 8         /// <summary>
 9         ///
10         /// </summary>
11         void Dispose();
12         /// <summary>
13         ///
14         /// </summary>
15         /// <typeparam name="T"></typeparam>
16         /// <param name="channelName"></param>
17         /// <returns></returns>
18         void Subscribe(string channelName, Action<string> callback);
19         /// <summary>
20         ///
21         /// </summary>
22         /// <typeparam name="T"></typeparam>
23         /// <param name="channelName"></param>
24         /// <returns></returns>
25         Task<T> SubscribeAsync<T>(string channelName) where T : class;
26     }
27 }

定义RabbmitMQProvider

 1 using RabbitMQ.Client;
 2 using System;
 3 using System.Collections.Generic;
 4 using System.Text;
 5
 6 namespace fx.MQ
 7 {
 8     public class RabbitMQProvider
 9     {
10         private readonly string _ipAddress;
11         private readonly int? _port;
12         private readonly string _username;
13         private readonly string _password;
14
15         public RabbitMQProvider(string ipAddress, int? port, string username, string password)
16         {
17             _ipAddress = ipAddress ?? throw new ArgumentException("IP地址不能为空!");
18             _port = port ?? throw new ArgumentException("端口不能为空");
19             _username = username ?? throw new ArgumentException("用户名不能为空");
20             _password = password ?? throw new ArgumentException("密码不能为空");
21
22             ConnectionFactory = new ConnectionFactory//创建连接工厂对象
23             {
24                 HostName = _ipAddress,//IP地址
25                 Port = (int)_port,//端口号
26                 UserName = _username,//用户账号
27                 Password = _password//用户密码
28             };
29         }
30
31         public IConnectionFactory ConnectionFactory { get; }
32
33     }
34 }

实现生产者:

  1 using Newtonsoft.Json;
  2 using RabbitMQ.Client;
  3 using System;
  4 using System.Text;
  5 using System.Threading.Tasks;
  6
  7 namespace fx.MQ
  8 {
  9     /// <summary>
 10     /// 消息发布者。
 11     /// </summary>
 12     public class RabbitMQPublisher : IPublisher
 13     {
 14
 15         private readonly RabbitMQProvider _provider;
 16         private IConnection _connection;
 17         public RabbitMQPublisher(RabbitMQProvider provider)
 18         {
 19             _provider = provider;
 20             _connection = _provider.ConnectionFactory.CreateConnection();
 21         }
 22
 23         public IConnection Connection
 24         {
 25             get
 26             {
 27                 if (_connection != null)
 28                     return _connection;
 29                 return _connection = _provider.ConnectionFactory.CreateConnection();
 30             }
 31         }
 32
 33         private IModel _channel;
 34         public IModel Channel
 35         {
 36             get
 37             {
 38                 if (_channel != null)
 39                     return _channel;
 40                 else
 41                     return _channel = _connection.CreateModel();
 42             }
 43         }
 44
 45         public void Dispose()
 46         {
 47             if (Channel != null)
 48             {
 49                 if (Channel.IsOpen)
 50                     Channel.Close();
 51                 Channel.Abort();
 52                 Channel.Dispose();
 53             }
 54
 55             if (Connection != null)
 56             {
 57                 if (Connection.IsOpen)
 58                     Connection.Close();
 59             }
 60         }
 61
 62         public void Publish<T>(T message) where T : class
 63         {
 64             var channelName = typeof(T).Name;
 65             Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, null);
 66
 67             var msgContent = JsonConvert.SerializeObject(message);
 68             var msgByte = Encoding.UTF8.GetBytes(msgContent);
 69             Channel.BasicPublish
 70             (
 71                 exchange: channelName,
 72                 routingKey: string.Empty,
 73                 mandatory: false,
 74                 basicProperties: null,
 75                 body: msgByte
 76             );
 77         }
 78
 79
 80         public void Publish(string message, string channelName)
 81         {
 82             Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, null);
 83
 84             var msgByte = Encoding.UTF8.GetBytes(message);
 85             Channel.BasicPublish
 86             (
 87                 exchange: channelName,
 88                 routingKey: string.Empty,
 89                 mandatory: false,
 90                 basicProperties: null,
 91                 body: msgByte
 92             );
 93         }
 94
 95         public Task PublishAsync<T>(T message) where T : class
 96         {
 97             throw new NotImplementedException();
 98         }
 99     }
100 }

实现消费者:

  1 using RabbitMQ.Client;
  2 using RabbitMQ.Client.Events;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.Text;
  6 using System.Threading.Tasks;
  7
  8 namespace fx.MQ
  9 {
 10     /// <summary>
 11     /// 消息订阅者/消费者。
 12     /// </summary>
 13     public class RabbitMQSubscriber : ISubscriber
 14     {
 15         private readonly RabbitMQProvider _provider;
 16         private IConnection _connection;
 17         public RabbitMQSubscriber(RabbitMQProvider provider)
 18         {
 19             _provider = provider;
 20             _connection = _provider.ConnectionFactory.CreateConnection();
 21         }
 22
 23         public IConnection Connection
 24         {
 25             get
 26             {
 27                 if (_connection != null)
 28                     return _connection;
 29                 return _connection = _provider.ConnectionFactory.CreateConnection();
 30             }
 31         }
 32
 33         private IModel _channel;
 34         public IModel Channel
 35         {
 36             get
 37             {
 38                 if (_channel != null)
 39                     return _channel;
 40                 else
 41                     return _channel = _connection.CreateModel();
 42             }
 43         }
 44
 45
 46         public void Dispose()
 47         {
 48             if (_channel != null)
 49             {
 50                 _channel.Abort();
 51                 if (_channel.IsOpen)
 52                     _channel.Close();
 53
 54                 _channel.Dispose();
 55             }
 56
 57             if (_connection != null)
 58             {
 59                 if (_connection.IsOpen)
 60                     _connection.Close();
 61
 62                 _connection.Dispose();
 63             }
 64         }
 65
 66         /// <summary>
 67         /// 消费消息,并执行回调。
 68         /// </summary>
 69         /// <param name="channelName"></param>
 70         /// <param name="callback"></param>
 71         public void Subscribe(string channelName, Action<string> callback)
 72         {
 73             //声明交换机
 74             Channel.ExchangeDeclare(exchange: channelName, type: "fanout");
 75             //消息队列名称
 76             var queueName = channelName + "_" + Guid.NewGuid().ToString().Replace("-", "");
 77             //声明队列
 78             Channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
 79             //将队列与交换机进行绑定
 80             Channel.QueueBind(queue: queueName, exchange: channelName, routingKey: "");
 81             //声明为手动确认,每次只消费1条消息。
 82             Channel.BasicQos(0, 1, false);
 83             //定义消费者
 84             var consumer = new EventingBasicConsumer(Channel);
 85             //接收事件
 86             consumer.Received += (eventSender, args) =>
 87             {
 88                 var message = args.Body;//接收到的消息
 89
 90                 callback(Encoding.UTF8.GetString(message));
 91                 //返回消息确认
 92                 Channel.BasicAck(args.DeliveryTag, true);
 93             };
 94             //开启监听
 95             Channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
 96
 97         }
 98
 99         public Task<T> SubscribeAsync<T>(string channelName) where T : class
100         {
101             throw new NotImplementedException();
102         }
103     }
104 }

到这里为止,简单的实现消息队列的接受,发送,已经满足我自己当前项目的需要了。这里我用的exchange进行消息队列的生产消费,并且用fanout模式,就是一个生产者对应多个消费者,有点类似于消息广播,另外还有两种模式,可以根据需要修改。

下面是测试代码:

 1 using System;
 2 using System.Windows.Forms;
 3
 4 namespace fx.MQ.TestForm
 5 {
 6     public partial class Form1 : Form
 7     {
 8         private readonly RabbitMQProvider _provider;
 9         private readonly RabbitMQPublisher _publisher;
10         private readonly RabbitMQSubscriber _subscriber;
11         delegate void Callback(string msg);
12
13         public Form1()
14         {
15             _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin");
16             _publisher = new RabbitMQPublisher(_provider);
17             _subscriber = new RabbitMQSubscriber(_provider);
18             //callback = new Callback(ShowMessage);
19             InitializeComponent();
20         }
21
22         private void button1_Click(object sender, EventArgs e)
23         {
24             _publisher.Publish(textBox1.Text, "public");
25         }
26
27         private void Form1_Load(object sender, EventArgs e)
28         {
29
30             _subscriber.Subscribe("public", c=> {
31                 ShowMessage(c);
32             });
33         }
34
35
36         private void ShowMessage(string msg)
37         {
38             if (this.richTextBox1.InvokeRequired)
39             {
40                 var cb = new Callback(ShowMessage);
41                 this.Invoke(cb, new object[] { msg });
42             }
43             else
44             {
45                 this.richTextBox1.Text = msg;
46             }
47         }
48     }
49 }

运行效果如图所示:

OK,没有问题。

另外注意,退出程序时消息发布者和订阅者都需要Dispose()来释放连接。

原文地址:https://www.cnblogs.com/my85016629/p/12072401.html

时间: 2024-08-09 05:59:52

C# 封装RabbitMQ消息队列处理的相关文章

Golang调用Rabbitmq消息队列和封装

前言 介绍Rabbimq Rabbitmq消息队列是干嘛的? 简单的说,消息队列,引申一下就是传递消息用的队列,也可以称为传递消息的通信方法.用争抢订单的快车举个例子,假如,A用户发送了一个用车的消息,那么消息队列要做的就是把A用户用车的这个消息广而告之,发送到一个公用队列当中,司机只管取到消息,而不管是谁发布的,这就是一个简单的消息队列例子,Rabbitmq其实就是消息队列的一种,用的比较多的还可能有Redis,kafka,ActiceMq等等,这个后面的博文里面我会说,这次我们只说Rabbi

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列1: Detailed Introduction 详细介绍

1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco .Redhat.iMatix 等联合制定了 AMQP 的公开标

rabbitmq 消息队列

rabbitmq 消息队列: 解耦:降低一个程序降低耦合性 异步: 优点:--解决排队的问题. --解决资源浪费的问题.   --讲要处理的事物,进行存放,集中处理. 缺点:--不能保证任务被及时执行 应该场景:--去哪儿网 --12306 同步: 优点:--可以保证任务被及时执行 缺点:--排队问题,占用资源,造成资源浪费 大并发: web环境: --Nginx (epoll模式)   10000-20000 --Apache(epoll模式)1000-2000 pv = page visit