由于公司业务需求,最近想上RabbitMQ,之前我研究了一段时间微软的MSMQ。开源队列有很多,各有优劣。就先拿RabbitMQ练练手吧。本篇着重代码部分,至于怎么安装,怎么配置不在赘述。而且代码是在RabbitMQ.NET Client 类库基础上实现。
假设阅读本文的人已经安装好RabbitMQ并且做了相应的用户配置。而且项目中已经从nuget安装了rabbitmq.client.dll.我们开始做一个简单的队列发送和接收消息。
- 将需要配置的东西放在配置文件里,例如主机地址,端口,用户名,密码等。
- 实现消息发送端:Product
- 实现消息接收端:Customer
- Demo测试
将以下内容作为可配置部分放在配置文件中
<appSettings> <!--RabbitMQ--> <add key="RabbitMQ_HostUri" value="amqp://192.168.1.119:5672/"/> <add key="RabbitMQ_HostName" value="192.168.1.119"/> <add key="RabbitMQ_UserName" value="test_user"/> <add key="RabbitMQ_Password" value="123456"/> <add key="RabbitMQ_VirtualHost" value="ms_mq"/> </appSettings>
由于只是对RabbitMQ.Client.dll中的又一次封装,所以代码不过多解释,其中要注意的就是某些配置问题,例如是否持久化,消息处理模式是怎么样的等等。
首先我们创建一个连接工厂:
public ConnectionFactory CreateFactory() { if (_factory == null) { const ushort heartbeat = 0; //主机地址 Uri uri = new Uri(RabbitMQConfig.HostUri); _factory = new ConnectionFactory(); //_factory.HostName = RabbitMQConfig.HostName; //用户名 _factory.UserName = RabbitMQConfig.UserName; //密码 _factory.Password = RabbitMQConfig.PassWord; //虚拟主机名 _factory.VirtualHost = RabbitMQConfig.VirtualHost; //连接终端 _factory.Endpoint = new AmqpTcpEndpoint(uri); _factory.RequestedHeartbeat = heartbeat; //自动重连 _factory.AutomaticRecoveryEnabled = true; } return _factory; }
一个简单的消息发布:(对代码研究不够透彻,只能一切从简~~)
public void Publish(string message, string queueName=null) { if (queueName == null) { queueName = _queueName; } var factory = RabbitMQFactory.Instance.CreateFactory(); using (var connection = factory.CreateConnection()) { using (var model = connection.CreateModel()) { //消息持久化,防止丢失 model.QueueDeclare(queueName, RabbitMQConfig.IsDurable, false, false, null); var properties = model.CreateBasicProperties(); properties.Persistent = RabbitMQConfig.IsDurable; properties.DeliveryMode = 2; //消息转换为二进制 var msgBody = Encoding.UTF8.GetBytes(message); //消息发出到队列 model.BasicPublish("", queueName, properties, msgBody); } } }
消息接收:
public void Consume() { var factory = RabbitMQFactory.Instance.CreateFactory(); var connection = factory.CreateConnection(); connection.ConnectionShutdown += Connection_ConnectionShutdown; ListenChannel = connection.CreateModel(); bool autoDeleteMessage = false; var queue = ListenChannel.QueueDeclare(_queueName, RabbitMQConfig.IsDurable, false, false, null); //公平分发,不要同一时间给一个工作者发送多于一个消息 ListenChannel.BasicQos(0, 1, false); //创建事件驱动的消费者类型,不要用下边的死循环来消费消息 var consumer = new EventingBasicConsumer(ListenChannel); consumer.Received += Consumer_Received; //消费消息 ListenChannel.BasicConsume(_queueName, autoDeleteMessage, consumer); }
我在Customer中定义了一个 ReceiveMessageCallback Func回调,这里就是当客户端从队列接收到消息之后,怎么处理由客户端来决定
public Func<string, bool> ReceiveMessageCallback { get; set; }
处理消息:
private void Consumer_Received(object sender, BasicDeliverEventArgs args) { try { var body = args.Body; var message = Encoding.UTF8.GetString(body); //将消息业务处理交给外部业务 bool result = ReceiveMessageCallback(message); if (result) { if (ListenChannel != null && !ListenChannel.IsClosed) { ListenChannel.BasicAck(args.DeliveryTag, false); } } else { } } catch (Exception ex) { throw ex; } }
基本代码已经完成,我们写一个测试,消息发送端:
static void Main(string[] args) { var testQueueName = "test"; IMessageProduct product = new MessageProduct(testQueueName); for (int i = 0; i < 10000; i++) { Console.WriteLine("正在发送第" + i + "条消息..."); product.Publish("消息体" + i); } Console.Read(); }
消息接收端:(开多个口接收)
static void Main(string[] args) { Parallel.For(0, RabbitMQConfig.ThreadCount, i => { IMessageCustomer customer = new MessageCustomer("test"); //开始监听 customer.StartListening(); customer.ReceiveMessageCallback = message => { //客户端处理消息(打印) Console.WriteLine("接收到消息:" + message); return true; }; }); Console.Read(); }
打开发送消息端:
打开消息接收端:
到此为止,RabbitMQ队列的简单测试就完成了,没有介绍什么新知识,基本就是套DLL中的方法,不过也有很多不合理的地方,如果真正应用到项目中,还需要多加测试和修改。
DEMO地址:https://github.com/fanpan26/RabbitMQ.NETClient
时间: 2024-11-07 22:16:52