一、程序使用 .NET Core,通过 NuGet 引入 RabbitMQ.Client:
Install-Package RabbitMQ.Client -Version 4.1.3
二、消息发布端:
using RabbitMQ.Client;
using System;
using System.Text;

namespace ClientDemo
{
    /// <summary>
    /// Console publisher: reads lines from standard input and publishes each one
    /// to a direct exchange, routed to a durable queue.
    /// </summary>
    public class Client
    {
        static string exchangeName = "my-exchange";
        static string queueName = "my-queue";

        /// <summary>
        /// Declares the exchange, queue and binding, then publishes every line typed
        /// on the console until the user enters "exit" or the input stream ends.
        /// </summary>
        public static void Main()
        {
            Console.InputEncoding = Encoding.Unicode;
            Console.OutputEncoding = Encoding.Unicode;

            ConnectionFactory factory = new ConnectionFactory();
            // NOTE(review): the URI in the original listing was mangled by e-mail
            // obfuscation. The default guest account on localhost is assumed here;
            // adjust user, password and host for your broker.
            factory.Uri = new Uri("amqp://guest:guest@localhost:5672/");

            // "using" guarantees that the connection and channel are released even
            // when a declare or publish call throws.
            using (var conn = factory.CreateConnection())
            using (IModel model = conn.CreateModel())
            {
                // Uncomment to drop a previously declared exchange first:
                // model.ExchangeDelete(exchangeName);
                model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
                model.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                // The queue name doubles as the routing key.
                model.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);

                var props = model.CreateBasicProperties();
                props.Persistent = true; // mark published messages as persistent

                while (true)
                {
                    Console.WriteLine("请输入要发送的消息:");
                    var line = Console.ReadLine();

                    // ReadLine returns null when the input stream ends (Ctrl+Z / Ctrl+D or
                    // redirected input); treat that like "exit" rather than passing null
                    // to GetBytes, which would throw.
                    if (line == null || line == "exit")
                    {
                        break;
                    }

                    model.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: Encoding.UTF8.GetBytes(line));
                }

                model.Close();
                conn.Close();
            }
        }
    }
}
三、消息消费端:
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServerDemo
{
    /// <summary>
    /// Console consumer: polls a queue with BasicGet, prints every message body
    /// as UTF-8 text and acknowledges it.
    /// </summary>
    public class Server
    {
        static string queueName = "my-queue";

        // Set from the Ctrl+C handler so the polling loop can end and the
        // channel and connection can be closed cleanly.
        static volatile bool stopRequested;

        /// <summary>
        /// Connects with the factory defaults and polls the queue until Ctrl+C is pressed.
        /// </summary>
        public static void Main()
        {
            Console.InputEncoding = Encoding.Unicode;
            Console.OutputEncoding = Encoding.Unicode;

            // No URI is set, so the factory's default connection settings are used.
            // Assign factory.Uri to target a different broker.
            ConnectionFactory factory = new ConnectionFactory();

            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true; // keep the process alive so cleanup below can run
                stopRequested = true;
            };

            using (var conn = factory.CreateConnection())
            using (IModel model = conn.CreateModel())
            {
                // The exchange, queue and binding are not declared here; the queue is
                // expected to exist already (the publisher program declares it).
                while (!stopRequested)
                {
                    var result = model.BasicGet(queue: queueName, autoAck: false);
                    if (result == null)
                    {
                        // Queue is empty: back off briefly before polling again.
                        Thread.Sleep(10);
                        continue;
                    }

                    var msg = Encoding.UTF8.GetString(result.Body);
                    Console.WriteLine(msg);

                    // With autoAck disabled the broker keeps the message as unacknowledged
                    // until it is acked; without this call every message would be
                    // redelivered once the channel closes.
                    model.BasicAck(result.DeliveryTag, false);
                }

                model.Close();
                conn.Close();
            }
        }
    }
}
时间: 2024-10-12 19:47:33