上篇文章讲了简单队列的使用,这其实就是RMQ给的demo,实际并没有什么用
本篇讲讲工作模式队列,也称之为任务队列
一个生产者发布了多条消息,消费者A可以接受消息,接受消息后该消息就消除,消费者B可以接受其他消息
使用场景,一些数据库操作比较缓慢的话可以分别给多个接口调用,降低压力,或者抢单场景也能考虑,
比如就10个商品,100个消费者来抢单,前10个抢到了后,消息队列就为空了,那么第11个以后的所有消费者都不会抢到
代码示例:
生产者
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明队列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 for (int i = 0; i < 50; i++) { 14 // 消息内容 15 String message = "" + i; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent ‘" + message + "‘"); 18 19 Thread.sleep(i * 10); 20 } 21 22 channel.close(); 23 connection.close(); 24 } 25 }
消费者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者 15 channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,手动返回完成 设置fasle代表需要手动返回消息的确认状态 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received ‘" + message + "‘"); 27 // 休眠 28 Thread.sleep(10); 29 // 手动确认 返回确认状态 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
消费者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者 15 channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,手动返回完成状态 设置fasle代表需要手动返回消息的确认状态 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received ‘" + message + "‘"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 // 手动确认 返回确认状态 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
时间: 2024-10-07 13:56:28