安装环境
1.下载安装 Erlang 运行时环境
2.下载安装 RabbitMQ Server 应用程序
3.启动 RabbitMQ 服务(默认启动)
4.安装管理平台插件并打开远程访问权限
4.1.打开 RabbitMQ Comman Prompt
4.2.执行 rabbitmq-plugins enable rabbitmq_management
4.3.访问 http://localhost:15672 查看 RabbitMQ Server 相关信息(默认账密为guest)
4.4.新增远程访问用户
4.5.点击 Set permission 授权给新用户
4.6.停止 RabbitMQ 服务
4.7.修改 ...\rabbitmq_server-3.6.10\etc\rabbitmq.config.example 文件(新增第18行代码)
4.8.启动 RabbitMQ 服务
4.9.访问 http://IP:15672 测试是否成功
基本概念
单个消息生产者 VS 单个消息消费者
在 .NET 中使用 RabbitMQ 需要下载它的客户端程序集来获取并引用 RabbitMQ 相关的组件。
1 using RabbitMQ.Client; 2 using System; 3 using System.Text; 4 5 //消息生产者控制台 6 namespace Producer 7 { 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 //创建连接工厂对象 13 var factory = new ConnectionFactory 14 { 15 //目标主机IP或名称 16 HostName = "10.202.228.107", 17 //RabbitMQ Server的用户名称 18 UserName = "Tua", 19 //RabbitMQ Server的密码 20 Password = "Tua", 21 //RabbitMQ Server的默认端口号是5672可以不用指定 22 Port = 5672 23 }; 24 //创建连接对象 25 using (var connection = factory.CreateConnection()) 26 { 27 //创建消息信道对象 28 using (var channel = connection.CreateModel()) 29 { 30 //创建消息队列,只有在该消息队列不存在时才会创建 31 channel.QueueDeclare 32 ( 33 //消息队列名称 34 queue: "Tua", 35 //是否开启持久,true:即不会因为RabbitMQ服务崩溃重启而丢失消息队列 36 durable: false, 37 //是否开启反外,true:即只允许在当前连接中被访问,当连接断开时会自动清除该消息队列 38 exclusive: false, 39 //是否开启自动删除,true:即当无任何消息消费者时,也就是说最后一个连接断开时会自动清除该消息队列 40 autoDelete: false, 41 //用于消息队列的其它属性(构造参数) 42 arguments: null 43 ); 44 string msg = "Mr.Tua"; 45 //将字符串消息转换成二进制数组 46 var body = Encoding.UTF8.GetBytes(msg); 47 //发送消息,将消息发布到消息队列中 48 channel.BasicPublish 49 ( 50 //消息交换机名称 51 exchange: string.Empty, 52 //路由键名称 53 routingKey: "Tua", 54 //消息的其它属性 55 basicProperties: null, 56 //消息内容 57 body: body 58 ); 59 Console.WriteLine("Producer sent message: {0}", msg); 60 Console.ReadLine(); 61 } 62 } 63 } 64 } 65 }
1 using RabbitMQ.Client; 2 using RabbitMQ.Client.Events; 3 using System; 4 using System.Text; 5 6 //消息消费者控制台 7 namespace Consumer 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var factory = new ConnectionFactory 14 { 15 //目标主机为本地 16 HostName = "localhost" 17 }; 18 using (var connection = factory.CreateConnection()) 19 { 20 using (var channel = connection.CreateModel()) 21 { 22 //创建消息队列,用于确保不受发送端和接收端先后启动顺序影响 23 channel.QueueDeclare 24 ( 25 queue: "Tua", 26 durable: false, 27 exclusive: false, 28 autoDelete: false, 29 arguments: null 30 ); 31 //创建消息消费者对象 32 var consumer = new EventingBasicConsumer(channel); 33 //异步接收消息时的回调 34 consumer.Received += (sender, e) => 35 { 36 var body = e.Body; 37 var msg = Encoding.UTF8.GetString(body); 38 Console.WriteLine("Consumer received message: {0}", msg); 39 }; 40 //启动消息消费者 41 //消息消费者处理完消息任务时需要回应消息生产者,使其删除该消息 42 //如果消息消费者没有回应,那么消息生产者会将该消息重新发送给其它消息消费者 43 channel.BasicConsume 44 ( 45 queue: "Tua", 46 //是否自动回应,false:即需要手动进行消息回应 47 noAck: true, 48 consumer: consumer 49 ); 50 Console.ReadLine(); 51 } 52 } 53 } 54 } 55 }
运行结果
启动消息生产者控制台:
启动消息消费者控制台:
在 RabbitMQ Comman Prompt 中执行 rabbitmqctl list_queues 查看消息队列:
从本示例的运行结果可以看出消息生产者在 RabbitMQ Server 中创建了一个名为 Tua 的消息队列并含有一条 Mr.Tua 的消息,当消息消费者接收到该消息后就会立即删除该消息。
时间: 2024-10-13 06:31:25