关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置
公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个 短信的winform的服务。那么到后期的话,登录服务器之后,全是
一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端
该案例的接收端,源自网上的代码片段 片内容,做了部分修改之后使用
日志中心的 功能要求使用注入解耦,所以,这里我也解耦了,如果日至那边使用的是 autofac,我只里使用的MEF实现注入 所以定义了相关的接口对象
IMQContextFactory:
using Ecostar.MQLogger.Core.Infrastructure; using System; namespace Ecostar.MQConsumer.Core.Infrastructure { /// <summary> /// 仅仅只有sender使用到 /// </summary> public interface IMQContextFactory { MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog); } }
对应的实现类:MQContextFactory
using Ecostar.MQLogger.Core.Infrastructure; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Security.Cryptography; namespace Ecostar.MQConsumer.Core.Infrastructure { /// <summary> /// 仅仅只有sender使用到 /// </summary> [Export(typeof(IMQContextFactory))] public class MQContextFactory : IMQContextFactory { /// <summary> /// 上下文字典 /// </summary> private static readonly Dictionary<string, MQContext> Contexts = new Dictionary<string, MQContext>(); /// <summary> /// 上下文操作锁字典,只创建一次 /// </summary> public static readonly Dictionary<string, object> contextLockers = new Dictionary<string, object>(); /// <summary> /// 更新上下文操作锁字典时的锁,只创建一次 /// </summary> private static readonly object contextLockersLocker = new object(); /// <summary> /// 获取指定的上下文 /// </summary> /// <param name="mqUri">mq地址</param> /// <param name="toLog">日志记录</param> /// <returns>上下文对象</returns> public MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog) { var key = MD5Encrypt(mqUri); var locker = GetFactoryLocker(key); lock (locker) { MQContext context; if (!Contexts.TryGetValue(key, out context)) { Guid contextId = Guid.NewGuid(); string logHeader = string.Format("[{0}]", contextId.ToString()); context = new MQContext() { ReceiveQueueName = "Logs", Id = contextId }; Console.WriteLine(logHeader + " 初始化发送上下文完毕"); // 获取连接 context.SendConnection = CreateConnection(mqUri); context.SendConnection.AutoClose = false; context.SendConnection.ConnectionShutdown += (o, e) => Console.WriteLine(" RabbitMQ错误,连接被关闭了:" + e.ReplyText); Console.WriteLine(logHeader + " 创建连接完毕", LogLevel.Trace); // 获取通道 context.SendChannel = CreateChannel(context.SendConnection); Console.WriteLine(logHeader + " 创建通道完毕", LogLevel.Trace); Contexts.Add(key, context); } return context; } } #region 私有方法 /// 创建连接 /// </summary> /// <param name="mqUrl"></param> /// <returns></returns> private static IConnection CreateConnection(string mqUrl) { const ushort heartbeta = 120; var factory = new ConnectionFactory() { Uri = mqUrl, RequestedHeartbeat = heartbeta, AutomaticRecoveryEnabled = true }; return factory.CreateConnection(); } /// <summary> /// 创建通道 /// </summary> /// <param name="connection"></param> /// <returns></returns> private static IModel CreateChannel(IConnection connection) { if (connection != null) return connection.CreateModel(); return null; } /// <summary> /// 获取上下文操作锁 /// </summary> /// <param name="contextKey">上下文工厂key</param> /// <returns></returns> private static object GetFactoryLocker(string contextKey) { lock (contextLockersLocker) { object locker; if (!contextLockers.TryGetValue(contextKey, out locker)) { locker = new object(); contextLockers.Add(contextKey, locker); } return locker; } } /// <summary> /// 获取字符的MD5值 /// </summary> /// <param name="str"></param> /// <returns></returns> private static string MD5Encrypt(string str) { MD5 md5 = new MD5CryptoServiceProvider(); byte[] result = md5.ComputeHash(System.Text.Encoding.Default.GetBytes(str)); return System.Text.Encoding.Default.GetString(result); } #endregion } }
注视我写的很明白,这部分的使用 是 生产者使用的类,也就是 发送消息
下面是消费者:
IReceiver.cs:
namespace Ecostar.MQConsumer.Core { public interface IReceiver { /// <summary> /// 初始化接收程序 /// </summary> /// <param name="mqUrls"></param> void InitialReceive(MQReceiverParam receiverParams); } }
对应的实现类:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Threading; namespace Ecostar.MQConsumer.Core { [Export(typeof(IReceiver))] public class Receiver : IReceiver { private MQContext _context; private const ushort Heartbeta = 60; private string _queueName; private bool _isAutoAck; private List<string> _mqUrls; private Func<byte[], bool> _processFunction; private Action<string> _mqActionLogFunc; private MQConnectionFactory _ConnectionFactoryParams; public void InitialReceive(MQReceiverParam receiverParams) { _queueName = receiverParams._queueName; _isAutoAck = receiverParams._isAutoAck; _mqUrls = receiverParams._mqUrls; _processFunction = receiverParams._processFunction; _mqActionLogFunc = receiverParams._mqActionLogFunc; _ConnectionFactoryParams = receiverParams.ConnectionFactoryParam; receiverParams._mqUrls.ForEach(url => InitReceive(_queueName, _isAutoAck, url)); } /// <summary> /// 初始化某个节点的接收 /// </summary> private void InitReceive(string queueName, bool isAutoAck, string mqUrl) { Guid contextId = Guid.NewGuid(); string logHeader = string.Format("[{0}, {1}]", queueName, contextId.ToString()); try { _context = new MQContext() { Id = contextId, ReceiveQueueName = queueName, IsAutoAck = isAutoAck, ReceiveConnection = new ConnectionFactory() { HostName = _ConnectionFactoryParams.HostName, UserName = _ConnectionFactoryParams.UserName, Password = _ConnectionFactoryParams.Password, VirtualHost = _ConnectionFactoryParams.VirtualHost }.CreateConnection() }; // 监听Shutdown事件,记录下LOG便于排查和监管服务的稳定性 _context.ReceiveConnection.ConnectionShutdown += (o, e) => { _mqActionLogFunc(" RabbitMQ错误,连接被关闭了:" + e.ReplyText); }; // 获取通道 _context.ReceiveChannel = _context.ReceiveConnection?.CreateModel(); // 创建事件驱动的消费者 var consumer = new EventingBasicConsumer(_context.ReceiveChannel); consumer.Received += (o, e) => { try { // 接受数据处理逻辑 // e.Body var result = _processFunction(e.Body); if (!isAutoAck) { if (!result) { Thread.Sleep(300); // 未能处理完成的话,将消息重新放入队列头 _context.ReceiveChannel.BasicReject(e.DeliveryTag, true); _mqActionLogFunc(" 消息未处理成功,将消息重新放入队列头"); } else if (!_context.ReceiveChannel.IsClosed) { // 处理成功并且通道未关闭时ack回去,删除队列中的消息 _context.ReceiveChannel.BasicAck(e.DeliveryTag, false); _mqActionLogFunc(" 消息处理成功,发送Ack完毕"); } } } catch (Exception ex) { Thread.Sleep(300); if (!isAutoAck) { // 将消息重新放入队列头 _context.ReceiveChannel.BasicReject(e.DeliveryTag, true); } _mqActionLogFunc(" 处理数据发生异常:" + ex.Message + ex.StackTrace); } }; // 一次只获取一条消息 _context.ReceiveChannel.BasicQos(0, 1, false); _context.ReceiveChannel.BasicConsume(_context.ReceiveQueueName, _context.IsAutoAck, consumer); _mqActionLogFunc(" 初始化队列完毕"); } catch (Exception ex) { _mqActionLogFunc(" 初始化RabbitMQ出错:" + ex.Message + ex.StackTrace); } } } }
使用到的参数 MQReceiverParam:
using System; using System.Collections.Generic; namespace Ecostar.MQConsumer.Core { /// <summary> /// 消费者入参 /// </summary> public class MQReceiverParam { public string _queueName { get; set; } public bool _isAutoAck { get; set; } public List<string> _mqUrls { get; set; } public Func<byte[], bool> _processFunction { get; set; } public Action<string> _mqActionLogFunc { get; set; } public MQConnectionFactory ConnectionFactoryParam { get; set; } } /// <summary> /// 服务配置 /// </summary> public class MQConnectionFactory { public string HostName {get;set;} public string UserName {get;set;} public string Password {get;set;} public string VirtualHost {get;set;} } }
重力要说明一下:
Func<byte[], bool> _processFunction { get; set; }
Action<string> _mqActionLogFunc { get; set; }
参数的对象中,有这么两个委托,原因是,如果你在学习 rabbitmq得这块内容的时候,你会发现,网上很多案例,以及官方提供的案例,写法都比较简单,而且,都是讲业务逻辑和 rabbitmq的消费的这跨功能 耦合到了一起
如果其他地方使用的时候,还是重复,创建 connection 创建queue,绑定,,,,,等相关动作,代码不仅不美观,而且显得繁琐,啰嗦,所以,这两个委托类型的参数,起到了接偶的作用,似的 具体的业务逻辑和 rabbitmq的消费逻辑 分离
使用如下:
(我是在窗体上直接放置了一个 richTextBox的控件,讲接收的信息打印出来,)
using Ecostar.MQConsumer.Core; using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.ComponentModel.Composition.Hosting; using System.IO; using System.Reflection; using System.Text; using System.Windows.Forms; namespace Ecostar.MQConsumer.UI { [Export] public partial class MQMainForm : Form { #region Fields private static CompositionContainer _container;//MEF 部件组合 管理 [Import] public IReceiver Receiver { get; set; } #endregion public MQMainForm() { InitializeComponent(); } private void MQMainForm_Load(object sender, EventArgs e) { InitForm(); InitialListener(); } public void InitForm() { AggregateCatalog catalog = new AggregateCatalog(); catalog.Catalogs.Add(new DirectoryCatalog(Directory.GetCurrentDirectory())); catalog.Catalogs.Add(new AssemblyCatalog(Assembly.GetExecutingAssembly())); _container = new CompositionContainer(catalog); } /// <summary> /// 初始化监听程序 /// </summary> void InitialListener() { MQMainForm form; try { form = _container.GetExportedValue<MQMainForm>(); } catch (Exception ex) { throw; } form.Receiver.InitialReceive(new MQReceiverParam() { _queueName = "testQueueName", _isAutoAck = false, _mqUrls = new List<string>() { "amqp://127.0.0.1:5672/" }, _processFunction = (buffer) => { string receiveMsg = Encoding.UTF8.GetString(buffer); this.rtb_receive.Invoke(new Action(() => { { this.rtb_receive.Text += receiveMsg + "\r\n"; } })); return true; }, _mqActionLogFunc = (msg) => { this.rtb_receive.Invoke(new Action(() => { this.rtb_receive.Text += "====MQ Action====" + msg + "\r\n"; })); }, ConnectionFactoryParam = new MQConnectionFactory() { HostName = "127.0.0.1", UserName = "CC", Password = "123qwe", VirtualHost = "/" } }); } } }
其中的 testQueueName,是客户端发送的 消息列队名称,也就是queue的名称,你也可以(如果是测试),在mq服务器上 人为的添加这个queue名称之后再测试。
这样一来,_processFunction 这个用于消费的方法,可以,写任意的处理方式,比如打印到控制台,输出到床体 控件显示,写入到日志,写入到数据库等等。
而且中的 _mqActionLogFunc,适用于记录mq的消费过程的日志,比如 mq消费操作执行过程中发生异常 ,那么直接找mq的问题即可。
截图中还一个:MQContext类,这是一个部分类,为了方便区分,我把消费者,生产者 公共部分分别放置到了三个部分类中:
MQContext.Consumer.cs
using RabbitMQ.Client; namespace Ecostar.MQConsumer.Core { /// <summary> /// MQ 消费者 /// </summary> public partial class MQContext { // <summary> /// 用户监听的Connection /// </summary> public IConnection ReceiveConnection { get; set; } /// <summary> /// 用于监听的Channel /// </summary> public IModel ReceiveChannel { get; set; } /// <summary> /// 监听队列名 /// </summary> public string ReceiveQueueName { get; set; } } }
MQContext.cs
namespace Ecostar.MQConsumer.Core { /// <summary> /// MQ 生产者消费者公共部分 /// </summary> public partial class MQContext { /// <summary> /// mq地址 /// </summary> public string MQUrl { get; set; } } }
MQContext.Producer.cs
using RabbitMQ.Client; using System; namespace Ecostar.MQConsumer.Core { /// <summary> /// MQ 生产者 /// </summary> public partial class MQContext { /// <summary> /// 用于发送消息的Connection /// </summary> public IConnection SendConnection { get; set; } /// <summary> /// 用于发送消息到Channel /// </summary> public IModel SendChannel { get; set; } /// <summary> /// 发送的Exchange /// </summary> public string Exchange { get; set; } /// <summary> /// 是否启用自动删除 /// </summary> public bool IsAutoAck { get; set; } /// <summary> /// 上下文ID /// </summary> public Guid Id { get; set; } /// <summary> /// 路由 /// </summary> public string RouteKey { get; set; } /// <summary> /// 是否正在运行,默认false /// </summary> public bool IsRunning { get; set; } /// <summary> /// 回收此上下文 /// </summary> public void Recovery() { IsRunning = false; } } }
到此,这个简单的消费案例就完成了。
下面的是 生产者,(发送消息的案例),为了造数据,所以写的随意些:(一个控制台程序),nuget引入 rabbitmq.client ,指令: install-package rabbitmq.Client
using RabbitMQ.Client; using System; using System.Text; namespace RubbitMQClient { /// <summary> /// 1.Routing (按路线发送接收) /// </summary> public class RoutingType { public static void RoutingProducer(string[] arguments) { arguments = new string[] { "0","" }; string serverAddress = "127.0.0.1"; string account = "CC"; string password = "123qwe"; ConnectionFactory factory = new ConnectionFactory() { HostName = serverAddress, UserName = account, Password = password, VirtualHost = "/" }; IConnection conn = factory.CreateConnection(); for (int i = 0; i < 1000; i++) { arguments[1] = i.ToString(); string queueName = "testQueueName"; using (var channel = conn.CreateModel()) { //---1.声明durable Exchange 和 Queue-------------------------------------------------------------------------------------------------------------- channel.ExchangeDeclare(Consts.EXCHANGE_NAME_DIRECT, "direct", durable: true, autoDelete: false, arguments: null); //arguments = new[] { "12321", "32432" }; channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queueName, Consts.EXCHANGE_NAME_DIRECT, routingKey: "");//queueName //---------------------------------------------------------------------------------------------------------------------- //---2.发布持久化消息到队列 --------------------------------------------------------------------------------------------------- var props = channel.CreateBasicProperties(); //props.Priority = 3;//控制优先级 props.DeliveryMode = 2;//将信息也持久化 props.Persistent = true;///SetPersistent方式提示已经过时,建议使用当前方式 string severity = getSeverity(arguments); string message = getMessage(arguments); byte[] buffer = Encoding.UTF8.GetBytes(message); channel.BasicPublish(Consts.EXCHANGE_NAME_DIRECT, routingKey: "", basicProperties: props, body: buffer); ////---消费消息 //BasicGetResult msgResponse = channel.BasicGet(queueName, noAck: true); //var msgBody = Encoding.UTF8.GetString(msgResponse.Body); //Console.WriteLine(msgBody); //3.1(发布方式还有一种 基于推送的事件订阅 )第二种方式(使用内置的 QueueingBasicConsumer 提供简化的编程模型,通过允许您在共享队列上阻塞,直到收到一条消息) //var consumer = new QueueingBasicConsumer(channel); //channel.BasicConsume(queueName, noAck: true, consumer: consumer); //var msgResponse = consumer.Queue.Dequeue(); //blocking //var msgBody = Encoding.UTF8.GetString(msgResponse.Body); } } conn.Close(); Console.ReadKey(); } private static String getSeverity(String[] strings) { if (strings.Length < 1) return "routing(direct) type info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.Length < 2) return "routing(direct) --> Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { return strings[1].ToString(); } } }
抽时间将上面涉及到的 mq一些相关 属性(常用的API的),在总结下,主要是零散,其实东西很简单,如何更好的,更灵活的组合到一起,是这个插件使用的 最主要一点。