使用rabbitmq手动确认消息的,定时获取队列消息实现

描述问题

  最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互。

相关业务

  本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送

相关分析

  网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理、性能和效率等相关基础业务的工作量,望而却步。。。。。。

  还好spring有相关的 org.springframework.amqp 工具包,简化的大量麻烦>_> 让我们开始吧

  了解rabbit的相关几个概念

 了解了这几个概念的时候你可能已经关注到了我们今天的主题SimpleMessageListenerContainer

 我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写,通过SimpleRabbitListenerContainerFactory我们可以完成相关SimpleMessageListenerContainer容器的管理,

  但对于使用此容器批量消费的方式,官方并没有相关说明,网络上你可能只找到这篇SimpleMessageListenerContainer批量消息处理对于问题描述是很清晰,但是回答只是说的比较简单

  下面我们就对这个问题的答案来个coding

解决办法

  首先我们因为需要失败重试,使用spring的RepublishMessageRecoverer可以解决这个问题,这显然有一个缺点,即将在整个重试期间占用线程。所以我们使用了死信队列

  相关配置

  1     @Bean
  2     ObjectMapper objectMapper() {
  3         ObjectMapper objectMapper = new ObjectMapper();
  4         DateFormat dateFormat = objectMapper.getDateFormat();
  5         JavaTimeModule javaTimeModule = new JavaTimeModule();
  6
  7         SimpleModule module = new SimpleModule();
  8         module.addSerializer(new ToStringSerializer(Long.TYPE));
  9         module.addSerializer(new ToStringSerializer(Long.class));
 10         module.addSerializer(new ToStringSerializer(BigInteger.class));
 11
 12         javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
 13         javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
 14         javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
 15
 16         objectMapper.registerModule(module);
 17         objectMapper.registerModule(javaTimeModule);
 18         objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化扩展日期格式支持
 19         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
 20         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 21         return objectMapper;
 22 }
 23
 24
 25
 26   @Bean
 27   RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
 28     return new RabbitAdmin(aConnectionFactory);
 29   }
 30
 31   @Bean
 32   MessageConverter jacksonAmqpMessageConverter( ) {
 33     return new Jackson2JsonMessageConverter(objectMapper());
 34   }
 35
 36
 37   @Bean
 38   Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
 39     Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
 40     rabbitAdmin.declareQueue(queue);
 41     return queue;
 42   }
 43   @Bean
 44   Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
 45     Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
 46     rabbitAdmin.declareQueue(queue);
 47     return queue;
 48   }
 49   @Bean
 50   Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
 51     Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
 52     rabbitAdmin.declareQueue(queue);
 53     return queue;
 54   }
 55     /**
 56      * 声明一个交换机
 57      * @return
 58      */
 59   @Bean
 60   TopicExchange controlExchange () {
 61       return new TopicExchange(Exchanges.ExangeTOPIC);
 62   }
 63
 64
 65     /**
 66      * 延时重试队列
 67      */
 68     @Bean
 69     public Queue bcwPayControlRetryQueue() {
 70         Map<String, Object> arguments = new HashMap<>();
 71         arguments.put("x-message-ttl", 10 * 1000);
 72         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 73 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
 74         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 75         return new Queue("[email protected]@retry", true, false, false, arguments);
 76     }
 77     /**
 78      * 延时重试队列
 79      */
 80     @Bean
 81     public Queue bcwPushControlRetryQueue() {
 82         Map<String, Object> arguments = new HashMap<>();
 83         arguments.put("x-message-ttl", 10 * 1000);
 84         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 85 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
 86         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 87         return new Queue("[email protected]@retry", true, false, false, arguments);
 88     }
 89     /**
 90      * 延时重试队列
 91      */
 92     @Bean
 93     public Queue bcwPullControlRetryQueue() {
 94         Map<String, Object> arguments = new HashMap<>();
 95         arguments.put("x-message-ttl", 10 * 1000);
 96         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 97 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
 98 //        arguments.put("x-dead-letter-routing-key", "queue_bcw");
 99         return new Queue("[email protected]@retry", true, false, false, arguments);
100     }
101     @Bean
102     public Binding  bcwPayControlRetryBinding() {
103         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
104     }
105     @Bean
106     public Binding  bcwPushControlRetryBinding() {
107         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
108     }
109     @Bean
110     public Binding   bcwPullControlRetryBinding() {
111         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
112     }
113
114   /**
115    * 队列绑定并关联到RoutingKey
116    *
117    * @param queueMessages 队列名称
118    * @param exchange      交换机
119    * @return 绑定
120    */
121   @Bean
122   Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
123     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
124   }
125   /**
126    * 队列绑定并关联到RoutingKey
127    *
128    * @param queueMessages 队列名称
129    * @param exchange      交换机
130    * @return 绑定
131    */
132   @Bean
133   Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
134     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
135   }
136   /**
137    * 队列绑定并关联到RoutingKey
138    *
139    * @param queueMessages 队列名称
140    * @param exchange      交换机
141    * @return 绑定
142    */
143   @Bean
144   Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
145     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
146   }
147
148   @Bean
149   @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
150   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
151           SimpleRabbitListenerContainerFactoryConfigurer configurer,
152           ConnectionFactory connectionFactory) {
153     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
154     configurer.configure(factory, connectionFactory);
155     factory.setMessageConverter(jacksonAmqpMessageConverter());
156     return factory;
157   }

下面就是我们的主题,定时任务使用的是org.springframework.scheduling

  1 /**
  2  * 手动确认消息的,定时获取队列消息实现
  3  */
  4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
  5     protected final Logger logger = LoggerFactory.getLogger(getClass());
  6     private List<Message> body = new LinkedList<>();
  7     public long start_time;
  8     private Channel channel;
  9     @Autowired
 10     private ObjectMapper objectMapper;
 11     @Autowired
 12     private RabbitTemplate rabbitTemplate;
 13
 14     public QuartzSimpleMessageListenerContainer() {
 15         // 手动确认
 16         this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 17
 18         this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
 19             long current_time = System.currentTimeMillis();
 20             int time = (int) ((current_time - start_time)/1000);
 21             logger.info("====接收到{}队列的消息=====",message.getMessageProperties().getConsumerQueue());
 22             Long retryCount = getRetryCount(message.getMessageProperties());
 23             if (retryCount > 3) {
 24                 logger.info("====此消息失败超过三次{}从队列的消息删除=====",message.getMessageProperties().getConsumerQueue());
 25                 try {
 26                     channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
 27                 } catch (IOException ex) {
 28                     ex.printStackTrace();
 29                 }
 30                 return;
 31             }
 32
 33             this.body.add(message);
 34             /**
 35              * 判断数组数据是否满了,判断此监听器时间是否大于执行时间
 36              * 如果在最后延时时间段内没有业务消息,此监听器会一直开着
 37              */
 38             if(body.size()>=3 || time>60){
 39                 this.channel = channel;
 40                 callback();
 41             }
 42         });
 43
 44
 45
 46     }
 47     private void callback(){
 48 //         channel = getChannel(getTransactionalResourceHolder());
 49         if(body.size()>0 && channel !=null &&  channel.isOpen()){
 50             try {
 51                 callbackWork();
 52             }catch (Exception e){
 53                 logger.error("推送数据出错:{}",e.getMessage());
 54
 55                 body.stream().forEach(message -> {
 56                     Long retryCount = getRetryCount(message.getMessageProperties());
 57                     if (retryCount <= 3) {
 58                         logger.info("将消息置入延时重试队列,重试次数:" + retryCount);
 59                         rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
 60                     }
 61                 });
 62
 63             } finally{
 64
 65                 logger.info("flsher too data");
 66
 67                 body.stream().forEach(message -> {
 68                     //手动acknowledge
 69                     try {
 70                         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 71                     } catch (IOException e) {
 72                         logger.error("手动确认消息失败!");
 73                         e.printStackTrace();
 74                     }
 75                 });
 76
 77                 body.clear();
 78                 this.stop();
 79
 80             }
 81         }
 82
 83     }
 84     abstract void callbackWork() throws Exception;
 85     /**
 86      * 获取消息失败次数
 87      * @param properties
 88      * @return
 89      */
 90     private long getRetryCount(MessageProperties properties){
 91         long retryCount = 0L;
 92         Map<String,Object> header = properties.getHeaders();
 93         if(header != null && header.containsKey("x-death")){
 94             List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
 95             if(deaths.size()>0){
 96                 Map<String,Object> death = deaths.get(0);
 97                 retryCount = (Long)death.get("count");
 98             }
 99         }
100         return retryCount;
101     }
102
103     @Override
104     @Scheduled(cron = "0 0/2 * * * ? ")
105     public void start() {
106         logger.info("start push data scheduled!");
107         //初始化数据,将未处理的调用stop方法,返还至rabbit
108         body.clear();
109         super.stop();
110         start_time = System.currentTimeMillis();
111         super.start();
112
113         logger.info("end push data scheduled!");
114     }
115
116     public List<WDNJPullOrder> getBody() {
117
118         List<WDNJPullOrder> collect = body.stream().map(data -> {
119                     byte[] body = data.getBody();
120                     WDNJPullOrder readValue = null;
121                     try {
122                         readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
123                         });
124                     } catch (IOException e) {
125                         logger.error("处理数据出错{}",e.getMessage());
126                     }
127                     return readValue;
128                 }
129         ).collect(Collectors.toList());
130
131         return collect;
132
133
134     }
135
136 }

后续

当然定时任务的启动,你可以写到相关rabbit容器实现的里面,但是这里并不是很需要,所以对于这个的小改动,同学你可以自己实现

 @Scheduled(cron = "0 0/2 * * * ? ")

public void start()


原文地址:https://www.cnblogs.com/dmeck/p/12207284.html

时间: 2024-10-08 08:51:49

使用rabbitmq手动确认消息的,定时获取队列消息实现的相关文章

SpringBoot集成RabbitMQ(注解+手动确认)

1.pom文件 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2.yml配置文件 spring: #MQ配置 rabbitmq: addresses: 127.0.0.1 port: 5672 username: adminmq passwo

解决RabbitMQ消息丢失问题和保证消息可靠性(一)

原文链接(作者一个人):https://juejin.im/post/5d468591f265da03b810427e 工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消息可靠性如何解决的? 本文分三部分说明 RabbitMQ 消息丢失场景有哪些? 如何避免消息丢失? 如何设计部署消息中间件保证消息可靠性? RabbitMQ 消息丢失场景有哪些?

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

微信定时获取token

为了使第三方开发者能够为用户提供更多更有价值的个性化服务,微信公众平台开放了许多接口,包括自定义菜单接口.客服接口.获取用户信息接口.用户分组接口.群发接口等,开发者在调用这些接口时,都需要传入一个相同的参数access_token,它是公众账号的全局唯一票据,它是接口访问凭证. access_token的有效期是7200秒(两小时),在有效期内,可以一直使用,只有当access_token过期时,才需要再次调用接口获取access_token.在理想情况下,一个7x24小时运行的系统,每天只需

利用RabbitMQ、MySQL实现超大用户级别的消息在/离线收发

由于RabbitMQ中只有队列(queue)才能存储信息,所以用RabbitMQ实现超大用户级别(百万计)的消息在/离线收发需要对每一个用户创建一个永久队列. 但是RabbitMQ节点内存有限,经测试后发现节点集群也无法满足数百万用户队列收发数据的要求,所以最终决定采用数据库辅助实现该功能. 一.数据库结构 user_list数据库下有4张表:user_info.group_info.groupmember_info.message_info. user_info表中含有username(主键,

.Net Core 商城微服务项目系列(十五): 构建定时任务调度和消息队列管理系统

一.系统描述 嗨,好久不见各位老哥,最近有点懒,技术博客写的太少了,因为最近在写小说,写的顺利的话说不定就转行了,哈哈哈哈哈哈哈哈哈. 今天要介绍的是基于.Net Core的定时任务调度和消息队列管理系统.相信大家对这两个肯定都已经很熟悉了,在开发过程中,这两个组件扮演了不可或缺的角色: 消息队列帮助我们进行 ”解耦“.”异步“.”削峰“ 定时任务帮助我们进行 "后台".”监控".“补偿" 定时任务调度系统大家都介绍过很多次了,园子里的很多文章我也都拜读过,我相信大

XMPP系列(四)---发送和接收文字消息,获取历史消息功能

今天开始做到最主要的功能发送和接收消息.获取本地历史数据. 先上到目前为止的效果图:              首先是要在XMPPFramework.h中引入数据存储模块: //聊天记录模块的导入 #import "XMPPMessageArchiving.h" #import "XMPPMessageArchivingCoreDataStorage.h" #import "XMPPMessageArchiving_Contact_CoreDataObje

SAP RFC 获取BDC 消息文本的实现

SAP RFC 获取BDC 消息文本的实现 最近做RFC时 ,有调用一个BDC 用于 信息录入 调试程序时,单纯的处理正确.处理异常不能满足,希望获取具体的类似GUI上的提示消息 消息文本该如何获取呢? messtab内容: 初步猜测  可能是存放表 或者 用什么函数 转化,找到下面的内容: SE11  TABLE:T100 SE37  Function module:WRITE_MESSAGE  另外,需要注意一点,   messtab里有一个参数MSGNR不能直接被函数接口,需要类型转化处理

C#中使用RabbitMQ收发队列消息

一.程序使用NetCore.引入Nuget: Install-Package RabbitMQ.Client -Version 4.1.3 二.消息发部端: using RabbitMQ.Client; using System; using System.Text; namespace ClientDemo { public class Client { static string exchangeName = "my-exchange"; static string queueNa