灵感来袭,基于Redis的分布式延迟队列

延迟队列

延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。

Redisson Delayed Queue

如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单。

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

Java DelayQueue

DelayQueue它本质上是一个队列,而这个队列里也只有存放Delayed的子类才有意义。

延迟队列demo

public class DelayTask implements Delayed {
    private long startDate;
    public DelayTask(Long delayMillions) {
        this.startDate = System.currentTimeMillis() + delayMillions;
    }

    @Override
    public int compareTo(Delayed o) {
        Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return this.startDate - System.currentTimeMillis();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<DelayTask> queue = new DelayQueue<>();
        DelayTask delayTask = new DelayTask(1000 * 5L);
        queue.put(delayTask);
        while (queue.size()>0){
            queue.take();
        }
    }
}

延迟队列消费原理

源码中出现了三次await字眼:

  • 第一次是当队列为空时,等待;
  • 第二次等待是因为,发现有任务,没有到执行时间,并且有准备执行的线程(leader),那不好意思,还得接续等待直到下一个可执行的任务。
  • 第三次是真正延时的地方了,available.awaitNanos(delay),此时也没有别的线程要执行,也就是我将要执行,等待剩下的延迟时间即可。

延迟队列生产原理

为保证消费者正常消费,如果优先队列头元素和当前放入元素相等,则说明当前元素消费的优先级高,重置准备消费的线程(leader)为null,唤醒消费者线程重新执行take方法逻辑。

手写一个Redis延迟队列

Redis延迟队列设计

延迟消息体设计

延迟消息体Message实现了Delayed接口,这样Java DelayQueue就知道什么时候取出消息体。

Redis延迟队列实现

RedisDelayQueue构造函数依赖redis操作缓存服务对象目标队列名称(redis key)。

offer方法传入member(具体消息),delay(延迟时间),timeUnit(时间单位),然后封装成延迟消息体Message对象,放入Java DelayQueue中。

run方法是一个循环体,不断的从Java DelayQueue对象中获取消息体,然后放入redis对应的目标队列里。

延迟队列测试demo

控制台打印效果

思考

这种方案实现比较简单,使用的时候一定要谨慎,应用于延迟小,消息量不大的场景是没问题的,毕竟Java DelayQueue是占用内存的。另外也可以考虑利用Redis的sorted set 结构实现延迟队列,使用TimeStamp作为score,比如你的任务是要延迟5分钟,那么就在当前时间上加5分钟作为 score ,轮询任务每秒只轮询 score 大于当前时间的 key即可,如果任务支持有误差,那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询。

原文地址:https://www.cnblogs.com/hujunzheng/p/12587572.html

时间: 2025-01-05 06:28:07

灵感来袭,基于Redis的分布式延迟队列的相关文章

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

[转载] 基于Redis实现分布式消息队列

转载自http://www.linuxidc.com/Linux/2015-05/117661.htm 1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.

基于Redis实现分布式消息队列(3)

1.Redis是什么鬼? Redis是一个简单的,高效的,分布式的,基于内存的缓存工具. 假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务. 简单,是Redis突出的特色. 简单可以保证核心功能的稳定和优异. 2.性能 性能方面:Redis是足够高效的. 和Memecached对比,在数据量较小大情况下,Redis性能更优秀. 数据量大到一定程度的时候,Memecached性能稍好. 简单结论:但总体上讲Redis性能已经足够好. // Ref: Redis性能测试

基于Redis实现分布式消息队列(2)

1.消息队列需提供哪些功能? 在功能设计上,我崇尚奥卡姆剃刀法则. 对于消息队列,只需要两个方法: 生产 和 消费. 具体的业务场景是任务队列,代码设计如下: public abstract class TaskQueue{ private final String name ; public String getName(){return this.name;} public abstract void addTask(Serializable taskId); public abstract

基于Redis实现分布式消息队列(4)

1.访问Redis的工具类 public class RedisManager { private static Pool<Jedis> pool; protected final static Logger logger = Logger.getLogger(RedisManager.class); static{ try { init(); } catch (Exception e) { e.printStackTrace(); } } public static void init()

基于Redis的分布式锁到底安全吗(上)?

网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现算法也看似合乎逻辑,但当我们着手去实现它们的时候,却发现如果你越是仔细推敲,疑虑也就越来越多. 实际上,大概在一年以前,关于Redis分布式锁的安全性问题,在分布式系统专家Martin Kleppmann和Redis的作者antirez之间就发生过一场争论.由于对这个问题一直以来比较关注,所以我前些日子仔细阅读了与这场争

转载:基于Redis实现分布式锁

转载:基于Redis实现分布式锁  ,出处: http://blog.csdn.net/ugg/article/details/41894947 背景在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等.大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系.其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制. Redis命令介绍使用Redis实现分

【redis】基于redis实现分布式并发锁

基于redis实现分布式并发锁(注解实现) 说明 前提, 应用服务是分布式或多服务, 而这些"多"有共同的"redis"; GitHub: https://github.com/vergilyn/SpringBootDemo 代码结构: 一.分布式并发锁的几种可行方案 (具体实现思路参考: 分布式锁的实现.如何用消息系统避免分布式事务?) 1.基于数据库 可以用数据库的行锁for update, 或专门新建一张锁控制表来实现. 过于依赖数据库, 且健壮性也不是特别好