java延迟队列

大多数用到定时执行的功能都是用任务调度来做的,单身当碰到类似订餐业务/购物等这种业务就不好处理了,比如购物的订单功能,在你的订单管理中

有N个订单,当订单超过十分钟未支付的时候自动释放购物车中的商品,订单失效。这种高频率的延迟任务再用任务调度(定时)实现就得不偿失了。推荐用Java延迟队列来实现,DelayQueue是java.util.concurrent中提供的一个类DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,

其中的对象只能在其到期时才能从队列中中取走。这种队列是有序的,即对头对象的延迟到期时间最长。注意:不能讲null元素放置到这种队列中。

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

//任务线程 实现delayed接口
public class DelayItem<T extends Runnable> implements Delayed {
//到期时间
private final long time;
//任务对象
private final T task;
//原子类
private static final AtomicLong atomic = new AtomicLong(0);
private final long n;

public DelayItem(long timeout, T t) {
    this.time = Systen,nanoTime() + timeout;
    this.task = t;
    this.n = atomic.getAndIncrement();
}
//返回与此对象相关的剩余延迟时间,以给定的时间单位表示
public long getDelay(TimeUnit unit) {
    return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
}

public int compareTo(Delayed other) {
    if(other == this) {
    return 0;
    }
    if(other instanceof DelayItem) {
        DelayItem<?> x = (DelayItem<?>)other;
        long diff = tiem - x.time;
        if(diff < 0) {
            return -1;
        }else if(diff > 0) {
            return 1;
        }else if( n < x.n){
            return -1;
        }else {
            return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -           other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
    public T getTask(){
        return this.task;
    }
    @Override
    public int hashCode(){
        return task.hashCode();
    }
    @Override
    public boolean equals(Object object){
        if(object instanceof DelayItem){
            return object.hashCode() == hashCode() ? true : false;
    }
    return false;
}
}

//管理延迟任务的类
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
//延迟队列存放有效期对象
public class ItemQueueThread {
private static final Logger logger = Logger.getLogger(this.class);
private ItemQueueThread(){}
//延迟加载(线程安全)
private static class LazyHolder{
    private static ItemQueueThread itemQueueThread = new ItemQueueThread();
}
public static ItemQueueThread getInstance(){
    return LazyHolder.itemQueueThread;
}
//缓存线程池
ExecutorService executor = Executors.newCacheThreadPool();
//线程
private Thread daemonThead;
//初始化线程
public void init() {
    daemonThread = new Thread(() -> {
        try{
            execute();
        }cathc(InterruptedException e){
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    });
    System.out.println("init......start");
    daemonThread.start();
}

private void execute() throws InterrupedException {
    while(true) {
        Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
        System.out.println("线程数...." + map.size());
        System.out.println(System.currentTimeMills());
        System.out.println(item.size());
        System.out.println("线程状态----" + Thread.currentThread().getState());
        try{
            //从延迟队列中取值,如果没有对象过期责队列一直等待
            DelayItem<?> t1 = item.take();
            if(t1 != null){
                Runnable task = t1.getTask();
                    if(task == null){
                        continue;
                }
                executor.execute(task);
            }
        }catch(Exception e) {
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    }
}
//创建空的延迟队列
private DelayQueue<DelayItem<?>> item = new DelayQueue<>();
//往队列中添加任务
public void put(long time, Runnable task, TimeUnit timeUnit){
    //转换成ns
    long nanoTime = TimeUnit.NANOSECONDS.convert(time,timeUnit);
    DelayItem<?> k = new DelayItem(nanoTime,task);
    item.put(k);_:
}
//结束任务
public boolean endTask(DelayItem<Runnable> task){
    return item.remove(task);
}
}

//把需要延迟的功能代码单独抽取出来作为一个类,继承Runnable实现run方法
public class DataDemo implements Runnable {
    int a = -1;
    public DataDemo(int i){
        this.a = i;
}
@Override
public void run(){
    System.out.println("超时,要撤销订单...." + a);
}
}

//test class
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class DelayTest{
public static void main(String[] args){
    ItemQueueThread  ith = ItemQueueThread.getInstance();
    ith.init();
    Random r = new Random();
    for(int i = 0; i < 5; i++){
        int a = r.nextInt(20);
        System.out.println("预先知道等待时间:" + a);
        DataDemo dd = new DataDemo(a);//创建一个任务对象
        ith.put(a,dd,TimeUnit.SECONDS);//将任务添加到队列中
    }
}
}
//注意ItemQueueThread的init方法,要在容器初始化的时候就要执行,或在第一次put延迟对象任务之前就要初始化完成,当设定的延迟时间到期时会执行任务对象中的run
}

  

原文地址:https://www.cnblogs.com/codechange/p/8367208.html

时间: 2024-11-11 09:30:27

java延迟队列的相关文章

java延迟队列DelayQueue使用及原理

概述 java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素.没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断.延时队列不能存放空元素. 延时队列实现了Iterator接口,但iterator()遍历顺序不保证是元素的实际存放顺序. 队列元素 DelayQueue<E extends Delayed>的队列元素需要实现Delayed接口,该接口类定义如下:

java DelayedQueue延迟队列

代码如下: package com.example.base.concurrent; import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import com.example.spring.MyLog; public class MyDelayedQueue { public

有赞延迟队列设计

延迟队列,顾名思义它是一种带有延迟功能的消息队列. 那么,是在什么场景下我才需要这样的队列呢? 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存? 如何定期检查处于退款状态的订单是否已经退款成功? 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?等等 为了解决以上问题,最简单直接的办法就是定时去扫表.每个业务都要维护一个自己的扫表逻辑. 当业务越来越多时,我们会发现扫表部分的逻辑会非常类似.我们可以考虑将这部分逻辑从具体的业务逻辑里面

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制. 例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的

Spring Boot(十四)RabbitMQ延迟队列

一.前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单:2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度:3.过1分钟给新注册会员的用户,发送注册邮件等. 实现延迟队列的方式有两种: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能: 使用rabbitmq-delayed-message-exchange插件实现延迟功能: 注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上

rabbitmq延迟队列demo

工程结构: 定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错: <properties> <springframework.version>4.2.7.RELEASE</springframework.version> <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version> <junit.version>4.12</junit.

SpringBoot:RabbitMQ 延迟队列

SpringBoot 是为了简化 Spring 应用的创建.运行.调试.部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程 初探RabbitMQ消息队列中介绍了RabbitMQ的简单用法,顺带提及了下延迟队列的作用.所谓延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 延迟队列 延迟队列能做什么? 订单业务: 在电商/点餐

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

本文口味:鱼香肉丝? ?预计阅读:10分钟 一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读. 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 二.本文大纲

RabbitMQ延迟队列简单示例

简介 延迟队列存储的消息是不希望被消费者立刻拿到的,而是等待特定时间后,消费者才能拿到这个消息进行消费.使用场景比较多,例如订单限时30分钟内支付,否则取消,再如分布式环境中每隔一段时间重复执行某操作. 下面举一个简单的例子,例子大概意思是分别在首次发送消息后的10秒.40秒.100秒后重新读取到消息.为了直观,不使用RabbitMQ其他多余的特性. 准备工作 在Centos7下安装RabbitMQ,版本为3.6.12单机版本(非集群),IP是127.0.0.1,端口是15672,使用web管理