Rabbitmq 实现延时任务

1、需要用到插件 rabbitmq_delayed_message_exchange 来实现,插件下载地址:https://www.rabbitmq.com/community-plugins.html

2、下载后把插件放到 plugins 里面,然后到 sbin里面打开cmd,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令

3、插件装好后,重新启动mq,然后集成mq。

  首先,导包

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  

  然后,配置文件配置连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.acknowledge-mode=manual

  

  mq 配置:

  

package com.rrg.gz.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 *  mq配置
 * @author huangsz  2019/4/25 0025
 */
@Configuration
public class RabbitPluginConfig {

    /**
     * 延时队列交换机
     * 注意这里的交换机类型:CustomExchange
     * @return
     */
    @Bean
    public CustomExchange delayExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("rrg_delay_exchange","x-delayed-message",true, false,args);
    }

    /**
     * 延时队列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return new Queue("rrg_delay_queue",true);
    }

    /**
     * 给延时队列绑定交换机
     * @return
     */
    @Bean
    public Binding cfgDelayBinding(Queue cfgDelayQueue, CustomExchange cfgUserDelayExchange){
        return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with("rrg_delay_key").noargs();
    }
}

  发送消息类、接收类和信息类,信息类是我们自己时间业务封装需要消费的信息。

package com.rrg.gz.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息发送者
 *
 * @author huangsz  2019/3/7 0007
 */
@Component
public class Sender {
    private static Logger log = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(MqEntity entity, long time) {
        // 这里的消息可以是任意对象,无需额外配置,直接传即可
        log.info("延时队列生产消息");
        this.rabbitTemplate.convertAndSend(
                "rrg_delay_exchange",
                "rrg_delay_key",
                entity,
                message -> {
                    // 注意这里时间可以使long,而且是设置header
                    message.getMessageProperties().setHeader("x-delay",time);
                    return message;
                }
        );
        log.info("{}ms后执行", time);
    }

}

package com.rrg.gz.mq;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 接受者
 *
 * @author huangsz  2019/3/7 0007
 */

@Component
public class Receiver {
    private static Logger log = LoggerFactory.getLogger(Receiver.class);
    @Autowired
    private Sender sender;

    @RabbitListener(queues = "rrg_delay_queue")
    public void cfgUserReceiveDealy(MqEntity entity, Message message, Channel channel) throws Exception{
        log.info("开始接受消息!");
        // 通知 MQ 消息已被接收,可以ACK(从队列中删除)了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("接收消息并打印");
        System.out.println(entity);
    }
}

package com.rrg.gz.mq;

import java.io.Serializable;

/**
 *  一定要实现 Serializable
 * @author huangsz  2019/3/7 0007
 */
public class MqEntity implements Serializable {

    private Integer userId;
    private String msg;

    public MqEntity() {
    }

    public MqEntity(Integer userId, String msg) {
        this.userId = userId;
        this.msg = msg;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "MqEntity{" +
                "userId=" + userId +
                ", msg=‘" + msg + ‘\‘‘ +
                ‘}‘;
    }
}

4、写一个controller测试:

  

@RequestMapping("/test1")
    public void test(){
        MqEntity mqEntity = new MqEntity(1,"30秒后消费");
        sender.sendDelayMessage(mqEntity,30000);
    }

    @RequestMapping("/test2")
    public void test2(){
        MqEntity mqEntity = new MqEntity(1,"10秒后消费");
        sender.sendDelayMessage(mqEntity,10000);
    }

先执行test1,然后执行test2,这个时候,不需要等test1消费完之后,test2才消费信息。

原文地址:https://www.cnblogs.com/hsz-csy/p/11332418.html

时间: 2024-10-11 06:51:36

Rabbitmq 实现延时任务的相关文章

spring boot Rabbitmq集成,延时消息队列实现

本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现: spring boot提供对mq消息队列支持amqp相关包,引入即可: [html] view plain copy <!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-

基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景

前言 传统处理超时订单 采取定时任务轮训数据库订单,并且批量处理.其弊端也是显而易见的:对服务器.数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好 当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作 jdk延迟队列 DelayQueue 采取jdk自带的延迟队列能很好的优化传统的处理方案,但是该方案的弊.端也是非常致命的,所有的消息数据都是存于内存之中,一旦

springboot使用RabbitMQ实现延时任务

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费.那么,为什么需要延迟消费呢?我们来看以下的场景 订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单.短信通知: 下单成功后 60s 之后给用户发送短信通知.失败重试: 业务操作失败后,间隔一定的时间进行失败重试. 本文基于springboot,使用rabbitmq_delayed_message_exchange插件实现延时队列(RabbitMQ及其插件环境安装点此),具体实践如

RabbitMq 实现延时队列-Springboot版本

rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列: 原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列: 步骤: 1.创建带有时限的队列 dealLineQueue; 2.创建死信Faout交换机dealLineExchange; 3.创建消费队列realQueue,并和dealLineExchange绑定 4.配置dealLineQueue 的过期时间,消息过期后的死信交换机,重发的routing-key: 以下

RabbitMQ实现延时队列(死信队列)

基于队列和基于消息的TTL TTL是time to live 的简称,顾名思义指的是消息的存活时间.rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身. 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒.不同队列的过期时间互相之间没有影响,即使是对于同一条消息.队列中的消息存在队列中的时间超过过期时间则成为死信. 死信交换机DLX 队列中的消息在以下三种情况

Rabbitmq的延时队列的使用

配置: spring: rabbitmq: addresses: 192.168.108.128:5672 connection-timeout: 15000 username: guest password: guest publisher-confirms: true publisher-returns: true 依赖: <!--rabbitmq --> <dependency> <groupId>org.springframework.boot</grou

基于RabbitMQ实现分布式延时任务调度

一.分布式延时任务 传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成.这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行. 然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行).基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性.发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一

分布式 延时任务解决方案

在开发中,往往会遇到一些关于延时任务的需求.例如 生成订单30分钟未支付,则自动取消 生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务.那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别 定时任务有明确的触发时间,延时任务没有 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务 下面,我们以判断订单是否超时为例,进行方案分析 red

springboot-rabbitmq:实现延时队列

延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费.那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网)    系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会    系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个预约的