confirm 机制分串行和并行两种.
串行
生产者
public class Producer { private const string QueueName = "test_confirm_queue"; public static void Send() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare(QueueName, false, false, false, null); //开启confirm机制.注意 : 一个队列如果之前已经设置成了事务机制,那么不能再设置为 confirm 机制.反之亦然 channel.ConfirmSelect(); string msg = "hello world "; //发送消息 for (int i = 0; i < 10; i++) { channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i)); } //设置confirm机制的串行机制,如果消息采用的是持久化,那么确认消息会在持久化后发出. //可以发一批后,调用该方法;也可以每发一条调用一次. //当然是发一批消息后调用好些!
if (channel.WaitForConfirms()) { Console.WriteLine("send is success"); } else { Console.WriteLine("send is failed"); } channel.Close(); connection.Close(); } }
并行(异步)
生产者
public class Producer { private const string QueueName = "test_confirm_queue"; //这里一定要用 SortedSet private static readonly SortedSet<ulong> ConfirmSort = new SortedSet<ulong>(); public static void Send() { using (IConnection connection = ConnectionHelper.GetConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, false, false, false, null); channel.ConfirmSelect(); //成功 channel.BasicAcks += (s, e) => { //多条 if (e.Multiple) { Console.WriteLine("最后成功的一条是 : " + e.DeliveryTag); ConfirmSort.RemoveWhere(r => r <= e.DeliveryTag); } //单条 else { Console.WriteLine(e.DeliveryTag + " 成功发送 "); ConfirmSort.Remove(e.DeliveryTag); } }; //失败 channel.BasicNacks += (s, e) => { //多条 if (e.Multiple) { Console.WriteLine("最后失败的一条是 : " + e.DeliveryTag); } //单条 else { Console.WriteLine(e.DeliveryTag + " 发送失败 "); } }; //发送消息 string msg = "hello world "; int i = 0; while (true) { ulong seqNo = channel.NextPublishSeqNo; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i)); ConfirmSort.Add(seqNo);//把编号加入到集合中 i++; } } } } }
原文地址:https://www.cnblogs.com/refuge/p/10351218.html
时间: 2024-11-29 12:07:45