RabbitMQ是一个消息代理,基本思路很简单:接收和推送消息。你可以把它当做是邮局,当你把信件放进邮箱里时,你可以非常的确信邮递员一定会把信件交给收件人。RabbitMQ就是一个邮箱,邮局以及快递员。
RabbitMQ就是一个邮箱,邮局以及快递员。
和邮局的不同的是,RabbitMQ处理的不是信纸,而是接收,存储,转发二进制的消息数据。
以下是RabbitMQ中的一些术语:
- 生产(Producing)就是发送。发送消息的程序就是生产者(Producer)。生产者用“P”代表,如下图
- 队列(queue)是信箱在RabbitMQ中的名称。虽然消息是RabbitMQ和应用程序之间进行传递,但它们也可以被存储在队列中。队列的大小没有任何限制,你可以存储任意多的消息进队列到无限缓存当中。可以多个生产者发送消息到同一个队列,也可以多个消费者从同一个队列接收消息。队列表示如下图
- 消费(Consuming)的意思是接收。等待接收消息的程序称为消费者(Consumer),用一个“C“代表,如下图
注意:生产者,消费者,代理一般放在不同的服务器上。
Hello World
这部分实现两个程序,生产者发送一个简单的消息,然后接收者接收消息并显示在屏幕上,发送的消息内容是”Hello world”。
如下图示,”P”是生产者,”C”是消费者。中间的长方体是队列,用作RabbitMQ的消息缓存,代消费者使用。
可以通过Nuget安装RabbitMQ的客户端程序包
发送
命名消息发送器为Sende.cs,消息接收器为Receive.cs。发送器连接RabbitMQ,发送一个消息,然后退出。
在Send.cs中创建连接,代码如下
using System; using RabbitMQ.Client; using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { ... } } } }
这个连接封装了Socket连接,提供协议版本制定和认证等。这里我们连接本机上的代理,如果要连接其它服务器,只要指定机器名或IP地址。
接下来是创建信道,大部分的API都已经封装。
为了发送消息,需要定义一个队列,然后发送消息到这个队列。
using System; using RabbitMQ.Client; using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
定义的队列如果不存的话就会创建。消息的内容是字节数组,所以你可以编码任何的你想发送的消息。
上边的代码运行结束后,信道和连接会被释放。
接收
接收器接收RabbitMQ推送过来的消息,所以不像发送器只发送单一的消息,接收器需要监听消息并显示。
Receive.cs中的代码和发送者类似,建立一个连接和信道,并定义要使用的队列。注意队列名要和发送器里的队列一样。
class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); ... } } } }
注意这里我们定义了队列。因为可能在发送器之前先启动接收器,我们需要确保需要使用的队列已经存在。
我们要告诉服务器从队列里推送消息,因为消息是民步发送的,所以我们需要提供一个回调事件EventingBasicConsumer,用于处理接收到的消息。
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
运行
建立两个控制台项目Send和Receive,分别运行以上代码。
接收器通过RabbitMQ接收来自发送器的消息并显示,接收器会一直运行等待消息。
如果要检查队列,可以使用命令 rabbitmqctl list_queues