1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者。
2.实际操作:
这里使用一个生产者产生多条数据提供给3个消费者
生产者代码:
public class Producter { //队列名称 private final public static void main(String[] //配置rabbitmq服务器地址 ConnectionFactory factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("starktan"); factory.setPassword("starktan"); factory.setVirtualHost("/"); //建立连接和通道 Connection Channel channel = //声明队列,可以手动在mq中创建 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //写入10条数据(直接循环写入) for (int i = 0; i < 10; i++) { System.out.println("发送第" + i + "信息!"); String message = "WorkQueue channel.basicPublish("", QUEUE_NAME, true, null, } channel.close(); connection.close(); } } |
消费者代码
public class Consumer {
//队列名称 private final static String QUEUE_NAME = "Work_Queue"; public static void main(String[] args) throws IOException, //创建连接和通道 ConnectionFactory factory = factory.setHost("localhost"); final Connection connection = factory.newConnection(); ExecutorService service = for(int i=0;i<3;i++){ final int cur = i; service.submit(new Runnable() { Channel channel = connection.createChannel(); public void run() { //创建队列消费者 QueueingConsumer consumer = //指定消费队列 try { channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery String System.out.println("线程 "+cur+" 获取到消息 " + message + "开始处理"); Thread.sleep(1000*(cur+5)*2); System.out.println("线程 "+cur+" "+message + "处理完成"); } } catch (Exception e) { e.printStackTrace(); } } }); } service.shutdown(); } } |
运行效果:(消费者循环调度)
3.消息确认:
在处理一个比较耗时的任务的时候,如果消费者在中途崩溃掉,则对应的这条数据就丢失了,为了避免消息丢失的情况,RabbitMQ提供了消息确认
使用两个消费者进行演示,调用方法
public void getConsum() throws IOException, TimeoutException, InterruptedException { //创建连接和通道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //创建队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消费队列,且改为手动确认 channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 获取到消息 " + message + "开始处理"); try{ Thread.sleep(10000); }catch (InterruptedException e){} finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag() , false); } System.out.println( message + "处理完成"); } } |
手动关掉一个消费者
消息被另一个消费者继续进行处理;
4.公平调度:
channel.basicQos(1);//保证一次只分发一个
5.持久化: 保证当RabbitMQ服务器崩溃关机也不会造成消息丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第二个参数改为true