rabbitMQ学习笔记(三) 消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者

 

一、消息确认

为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者

默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样



QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到消息后,就会自动反馈一个消息给服务器。

下面这个例子来测试消息确认的功能。

Sender03.java

 1 package com.zf.rabbitmq03;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8
 9 /**
10  * 发送消息
11  * @author zhoufeng
12  *
13  */
14 public class Sender03 {
15
16     public static void main(String[] args) throws IOException {
17
18
19         ConnectionFactory connFac = new ConnectionFactory() ;
20
21         //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
22         connFac.setHost("127.0.0.1");
23
24         //创建一个连接
25         Connection conn = connFac.newConnection() ;
26
27         //创建一个渠道
28         Channel channel = conn.createChannel() ;
29
30         //定义Queue名称
31         String queueName = "queue01" ;
32
33         //为channel定义queue的属性,queueName为Queue名称
34         channel.queueDeclare( queueName , false, false, false, null) ;
35
36         String msg = "Hello World!";
37
38         //发送消息
39         channel.basicPublish("", queueName , null , msg.getBytes());
40
41         System.out.println("send message[" + msg + "] to "+ queueName +" success!");
42
43         channel.close();
44         conn.close();
45
46     }
47
48 }

与Sender01.java一样,没有什么区别。

Recv03.java

 1 package com.zf.rabbitmq03;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.ConsumerCancelledException;
 9 import com.rabbitmq.client.QueueingConsumer;
10 import com.rabbitmq.client.QueueingConsumer.Delivery;
11 import com.rabbitmq.client.ShutdownSignalException;
12
13 /**
14  * 接收消息
15  * @author zhoufeng
16  *
17  */
18 public class Recv03 {
19
20     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
21
22         ConnectionFactory connFac = new ConnectionFactory() ;
23
24         connFac.setHost("127.0.0.1");
25
26         Connection conn = connFac.newConnection() ;
27
28         Channel channel = conn.createChannel() ;
29
30         String channelName = "channel01";
31
32         channel.queueDeclare(channelName, false, false, false, null) ;
33
34
35         //配置好获取消息的方式
36         QueueingConsumer consumer = new QueueingConsumer(channel) ;
37
38
39         //取消 autoAck
40         boolean autoAck = false ;
41
42         channel.basicConsume(channelName, autoAck, consumer) ;
43
44         //循环获取消息
45         while(true){
46
47             //获取消息,如果没有消息,这一步将会一直阻塞
48             Delivery delivery = consumer.nextDelivery() ;
49
50             String msg = new String(delivery.getBody()) ;
51
52             //确认消息,已经收到
53             channel.basicAck(delivery.getEnvelope().getDeliveryTag()
54                     , false);
55
56             System.out.println("received message[" + msg + "] from " + channelName);
57         }
58
59     }
60
61 }

注意:一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息

如果将上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);注释掉, Sender03.java只需要运行一次 , Recv03.java每次运行将都会收到HelloWorld消息

注意:

但是这样还是不够的,如果rabbitMQ-Server突然挂掉了,那么还没有被读取的消息还是会丢失 ,所以我们可以让消息持久化。 只需要在定义Queue时,设置持久化消息就可以了,方法如下:



boolean durable = true;
channel.queueDeclare(channelName, durable, false, false, null);

这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起的数据丢失了。  但是服务器如果刚收到消息,还没来得及写入到硬盘,就挂掉了,这样还是无法避免消息的丢失。

 

二、公平调度

上一个例子能够实现发送一个Message与接收一个Message

从上一个Recv01中可以看出,必须处理完一个消息,才会去接收下一个消息。如果生产者众多,那么一个消费者肯定是忙不过来的。此时就可以用多个消费者来对同一个Channel的消息进行处理,并且要公平的分配任务给多个消费者。不能部分很忙  部分总是空闲

实现公平调度的方式就是让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(1);可以设置

时间: 2024-11-08 09:19:57

rabbitMQ学习笔记(三) 消息确认与公平调度消费者的相关文章

RabbitMQ 消息确认与公平调度消费者

一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer = new QueueingConsumer(channel

rabbitMQ学习笔记(五) 消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉. 这时候就可以对消息进行过滤了. 在消费者端设置好需要接收的消息类型. 如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型. channel.basicPublish( exchangeName , messageType , null , msg.getBytes()); 示例

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

rabbitmq学习(三) —— 工作队列

工作队列,又称任务队列,主要思想是避免立即执行资源密集型任务,并且必须等待完成.相反地,我们进行任务调度,我们将一个任务封装成一个消息,并将其发送到队列.工作进行在后台运行不断的从队列中取出任务然后执行.当你运行了多个工作进程时,这些任务队列中的任务将会被工作进程共享执行. 这个概念在 Web 应用程序中特别有用,在短时间 HTTP 请求内需要执行复杂的任务. 准备工作 现在,假装我们很忙,我们使用 Thread.sleep 来模拟耗时的任务. 发送端 public class NewTask

Caliburn.Micro学习笔记(三)----事件聚合IEventAggregator和 Ihandle&lt;T&gt;

Caliburn.Micro学习笔记(三)----事件聚合IEventAggregator和 Ihandle<T> 今天 说一下Caliburn.Micro的IEventAggregator和IHandle<T>分成两篇去讲这一篇写一个简单的例子 看一它的的实现和源码 下一篇用它们做一个多语言的demo 这两个是事件的订阅和广播,很强大,但用的时候要小心发生不必要的冲突. 先看一下它的实现思想 在Caliburn.Micro里EventAggregator要以单例的形式出现这样可以

NFC学习笔记——三(在windows操作系统上安装libnfc)

本篇翻译文章: 这篇文章主要是说明如何在windows操作系统上安装.配置和使用libnfc. 一.基本信息 1.操作系统: Windows Vista Home Premium SP 2 2.硬件信息: System: Dell Inspiron 1720 Processor: Intel Core 2 Duo CPU T9300 @ 2.5GHz 2.5GHz System type: 32-bit Operating System 3.所需软件: 在windows操作系统上安装软件需要下列

加壳学习笔记(三)-简单的脱壳思路&amp;调试思路

首先一些windows的常用API: GetWindowTextA:以ASCII的形式的输入框 GetWindowTextW:以Unicaode宽字符的输入框 GetDlgItemTextA:以ASCII的形式的输入框 GetDlgItemTextW:以Unicaode宽字符的输入框 这些函数在使用的时候会有些参数提前入栈,如这函数要求的参数是字符串数目.还有大小写啦之类的东西,这些东西是要在调用该函数之前入栈,也就是依次push,就是说一般前面几个push接着一个call,那前面的push可能

马哥学习笔记三十二——计算机及操作系统原理

缓存方式: 直接映射 N路关联 缓存策略: write through:通写 write back:回写 进程类别: 交互式进程(IO密集型) 批处理进程(CPU密集型) 实时进程(Real-time) CPU: 时间片长,优先级低IO:时间片短,优先级高 Linux优先级:priority 实时优先级: 1-99,数字越小,优先级越低 静态优先级:100-139,数据越小,优先级越高 实时优先级比静态优先级高 nice值:调整静态优先级   -20,19:100,139   0:120 ps

Spring Batch学习笔记三:JobRepository

此系列博客皆为学习Spring Batch时的一些笔记: Spring Batch Job在运行时有很多元数据,这些元数据一般会被保存在内存或者数据库中,由于Spring Batch在默认配置是使用HSQLDB,也就是说在Job的运行过程中,所有的元数据都被储存在内存中,在Job结束后会随着进程的结束自动消失:在这里我们推荐配置JobRepository去使用MySQL. 在这种情况下,Spring Batch在单次执行或者从一个执行到另外一个执行的时候会使用数据库去维护状态,Job执行的信息包