spring cloud延时队列的使用

  • 用户购买一笔订单,需要在订单的有效截止时间前一定时间,提醒用户去使用。到达有效结束时间,将订单设置为失效。这时候可以用延时队列可以很好的解决,用户下单之后,在有效期前发送一条提醒用户去使用的消息,和一条订单已经失效的消息。

    入口

    /**
     * 爆品助力状态提醒
     *
     * @param req 爆品助力失败
     */
    @RequestMapping(path = "/mq/product/sendProductHelpStatusMessage", method = RequestMethod.POST)
    Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req);

生产者

    @Override
    public Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        req.put("sendTime",sdf.format(new Date()));
        String beanToJson = JsonUtils.beanToJson(req);
        log.info("sendProductHelpStatusMessage:{}",beanToJson);
        productHelpStatusMessageChannel.productHelpStatusOutput().send(MessageBuilder.withPayload(beanToJson).setHeader("x-delay", req.get("delay")).build());
        return 1;
    }

将消息发送出去,延时delay毫秒,同时记录下消息发送的时间。这样就可以根据传递的参数来确定延时的具体时长。

消费者

package org.xxx.mq.provider.consumer.product;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.xxx.mq.api.channel.consumer.ProductHelpStatusMessageChannel;

import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@EnableBinding(value = {ProductHelpStatusMessageChannel.class})
public class ProductHelpStatusConsumer {
    @StreamListener(target = ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
    public void receiveProductHelpStatusWxMessage(String message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("receiveProductHelpStatusWxMessage:{},receiveTime,{}",message,sdf.format(new Date()));

    }
}

接受消息,同时记录下接受消息的时间。

通道

package org.xxx.mq.api.channel.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ProductHelpStatusMessageChannel {
    //爆品助力
    String PRODUCT_HELP_STATUS_OUTPUT = "productHelpStatusOutput";

    String PRODUCT_HELP_STATUS_WX_INPUT = "productHelpStatusWxInput";

    /**
     * 爆品助力消息发送通道
     *
     * @return
     */
    @Output(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_OUTPUT)
    MessageChannel productHelpStatusOutput();

    /**
     * 爆品助力消息订阅(微信消息)
     *
     * @return SubscribableChannel,消息订阅通道
     */
    @Input(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
    SubscribableChannel getProductHelpStatusWxInputChannel();
}

配置application.yml

spring:
  datasource:
    hikari:
      maximum-pool-size: 50
      minimum-idle: 50
  cloud:
    stream:
      rabbit:
        bindings:
          #订阅通道 测试通道
          productHelpStatusOutput:
            producer:
              delayed-exchange: true
          productHelpStatusWxInput:
            consumer:
              auto-bind-dlq: true
              republishToDlq: true
              requeueRejected: true
              delayed-exchange: true
              dlq-ttl: ${axd.queue.dlq.ttl}
              dlq-dead-letter-exchange:
      bindings:
        #生产者 爆品助力状态消息发送通道
        productHelpStatusOutput:
          destination: productHelpStatusExchange
          group: productHelpFailQueueGroup
        #消费者
        productHelpStatusWxInput:
          destination: productHelpStatusExchange
          group: productHelpStatusWxGroup
          consumer:
            max-attempts: 3
            backOffInitialInterval: 1000
            backOffMaxInterval: 10000
            backOffMultiplier: 2.0

需要配置等等。。。如上,
spring.cloud.stream.rabbit.bindings.productHelpStatusOutput.producer.delayed-exchange=true
spring.cloud.stream.bindings.productHelpStatusOutput.producer.delayed-exchange=true

测试

打开postman,请求接口,

{
    "delay": 10000,
    "orderSn":1
}

orderSn订单和delay延时时间(单位为毫秒)。请求orderSn=1的延时10秒delay=10000,orderSn=2的延时5秒delay=5000

2020-02-21 17:03:45.125  INFO [axd-mq,b4e13b1dc86b0d7f,b4e13b1dc86b0d7f,true] 90220 --- [0.0-1205-exec-3] o.a.m.p.controller.ProductMqController   : sendProductHelpStatusMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"}
2020-02-21 17:03:51.769  INFO [axd-mq,820007cc648d3e43,820007cc648d3e43,true] 90220 --- [0.0-1205-exec-4] o.a.m.p.controller.ProductMqController   : sendProductHelpStatusMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"}
2020-02-21 17:03:55.369  INFO [axd-mq,b4e13b1dc86b0d7f,2b113702a181d49b,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer    : receiveProductHelpStatusWxMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"},receiveTime,2020-02-21 17:03:55
2020-02-21 17:03:56.847  INFO [axd-mq,820007cc648d3e43,60f5a71795043ac2,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer    : receiveProductHelpStatusWxMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"},receiveTime,2020-02-21 17:03:56

如上订单1在 17:03:45发送消息,10秒后17:03:55消费者受到消息。订单2也在5秒后受到消息。

原文地址:https://www.cnblogs.com/mentalidade/p/12342111.html

时间: 2024-10-14 12:01:22

spring cloud延时队列的使用的相关文章

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率. 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复. 那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错

几种常见的微服务架构方案简述——ZeroC IceGrid、Spring Cloud、基于消息队列

微服务架构是当前很热门的一个概念,它不是凭空产生的,是技术发展的必然结果.虽然微服务架构没有公认的技术标准和规范草案,但业界已经有一些很有影响力的开源微服务架构平台,架构师可以根据公司的技术实力并结合项目的特点来选择某个合适的微服务架构平台,以此稳妥地实施项目的微服务化改造或开发进程.本文选自<架构解密:从分布式到微服务>一书,了解本书详情请点击阅读原文. 本文盘点了四种常用的微服务架构方案,分别是ZeroC IceGrid.Spring Cloud.基于消息队列与Docker Swarm 1

你真的了解微服务架构吗?听听八年阿里架构师怎样讲述Dubbo和Spring Cloud微服务架构

微服务架构是互联网很热门的话题,是互联网技术发展的必然结果.它提倡将单一应用程序划分成一组小的服务,服务之间互相协调.互相配合,为用户提供最终价值.虽然微服务架构没有公认的技术标准和规范或者草案,但业界已经有一些很有影响力的开源微服务架构框架提供了微服务的关键思路,例如Dubbo和Spring Cloud.各大互联网公司也有自研的微服务框架,但其模式都于这二者相差不大. 微服务主要的优势如下: 1.降低复杂度 将原来偶合在一起的复杂业务拆分为单个服务,规避了原本复杂度无止境的积累.每一个微服务专

Dubbo和Spring Cloud微服务架构‘

微服务架构是互联网很热门的话题,是互联网技术发展的必然结果.它提倡将单一应用程序划分成一组小的服务,服务之间互相协调.互相配合,为用户提供最终价值.虽然微服务架构没有公认的技术标准和规范或者草案,但业界已经有一些很有影响力的开源微服务架构框架提供了微服务的关键思路,例如Dubbo和Spring Cloud.各大互联网公司也有自研的微服务框架,但其模式都于这二者相差不大. 微服务主要的优势如下: 1.降低复杂度 将原来偶合在一起的复杂业务拆分为单个服务,规避了原本复杂度无止境的积累.每一个微服务专

听听八年阿里架构师怎样讲述Dubbo和Spring Cloud微服务架构

转自:https://baijiahao.baidu.com/s?id=1600174787011483381&wfr=spider&for=pc 微服务架构是互联网很热门的话题,是互联网技术发展的必然结果.它提倡将单一应用程序划分成一组小的服务,服务之间互相协调.互相配合,为用户提供最终价值.虽然微服务架构没有公认的技术标准和规范或者草案,但业界已经有一些很有影响力的开源微服务架构框架提供了微服务的关键思路,例如Dubbo和Spring Cloud.各大互联网公司也有自研的微服务框架,但

Dubbo和Spring Cloud微服务架构对比

微服务架构是互联网很热门的话题,是互联网技术发展的必然结果.它提倡将单一应用程序划分成一组小的服务,服务之间互相协调.互相配合,为用户提供最终价值. 虽然微服务架构没有公认的技术标准和规范或者草案,但业界已经有一些很有影响力的开源微服务架构框架提供了微服务的关键思路,例如 Dubbo 和 Spring Cloud. 各大互联网公司也有自研的微服务框架,但其模式都与这二者相差不大. 微服务主要的优势 降低复杂度 将原来耦合在一起的复杂业务拆分为单个服务,规避了原本复杂度无止境的积累. 每一个微服务

Spring Cloud Alibaba 新一代微服务解决方案

本篇是「跟我学 Spring Cloud Alibaba」系列的第一篇, 每期文章会在公众号「架构进化论」进行首发更新,欢迎关注. 1.Spring Cloud Alibaba 是什么 Spring Cloud Alibaba 是阿里巴巴提供的微服务开发一站式解决方案,是阿里巴巴开源中间件与 Spring Cloud 体系的融合. 马老师左手双十一,右手阿里开源组件,不仅占据了程序员的购物车,还要攻占大家的开发工具. 先说说 Spring Cloud 提起微服务,不得不提 Spring Clou

(一)整合spring cloud云服务架构 - Spring Cloud简介

Spring Cloud是一系列框架的有序集合.利用Spring Boot的开发模式简化了分布式系统基础设施的开发,如服务发现.注册.配置中心.消息总线.负载均衡.断路器.数据监控等(这里只简单的列了一部分),都可以用Spring Boot的开发风格做到一键启动和部署.Spring Cloud将目前比较成熟.经得起实际考验的服务框架组合起来,通过Spring Boot风格进行再封装,屏蔽掉了复杂的配置和实现原理,最终整合出一套简单易懂.易部署和易维护的分布式系统架构平台. Spring Clou

【译文】用Spring Cloud和Docker搭建微服务平台

by Kenny Bastani Sunday, July 12, 2015 转自:http://www.kennybastani.com/2015/07/spring-cloud-docker-microservices.html This blog series will introduce you to some of the foundational concepts of building a microservice-based platform using Spring Cloud