对于DelayQueue的理解

今天看公司代码,发现里面使用了 DelayQueue,学习以后记录下来:

概念:DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。

我理解为:延迟队列用于需要延迟处理的场景:比如延迟会话关闭,如果某一会话1分钟后需要关闭,则可以使用延迟队列,再比如:订单超时取消

例子场景:订单超时

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>我们看到队列泛型需要继承 Delayed 接口。创建一个实体类,实现此接口

创建实体类:  OrderInfo

 1 package wyp.delayqueue;
 2
 3 import lombok.Data;
 4
 5 import java.io.Serializable;
 6 import java.text.ParseException;
 7 import java.text.SimpleDateFormat;
 8 import java.util.concurrent.Delayed;
 9 import java.util.concurrent.TimeUnit;
10
11 /**
12  * @author : miles wang
13  * @date : 2019/7/2  3:39 PM
14  * DelayQueue<T>
15  *      延迟队列的泛型必须实现 Delayed接口
16  */
17 @Data
18 public class OrderInfo implements Serializable , Delayed {
19     private static final long serialVersionUID = 1L;
20     private String orderNo;// 订单号
21     private String status;// 订单状态
22     private String expTime;// 订单过期时间
23     private String createTime;//订单创建时间
24
25     /**
26      * 用于延时队列内部比较排序:当前订单的过期时间 与 队列中对象的过期时间 比较
27      * 排序方法:
28      * 我理解为:从延迟队列中取出过期元素的顺序,就是由此排序方法控制
29      */
30     @Override
31     public int compareTo(Delayed o) {
32         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
33         long nowThreadtime = 0;
34         long queueThreadtime = 0;
35         try {
36             nowThreadtime = formatter.parse(this.expTime).getTime();
37             queueThreadtime = formatter.parse(((OrderInfo)o).expTime).getTime();
38         } catch (ParseException e) {
39             e.printStackTrace();
40         }
41         return Long.valueOf(nowThreadtime).compareTo(Long.valueOf(queueThreadtime));
42     }
43
44
45     /**
46      * 时间单位:秒
47      * 延迟关闭时间 = 过期时间 - 当前时间
48      * 跟你官方提供的DOC文档,我理解为:
49      * 当此方法的返回值为负数的时候,表示次方法可以从延迟队列中移出
50      * 然后此时就可以接受一系列的消费者处理
51      */
52     @Override
53     public long getDelay(TimeUnit unit) {
54         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
55         long time = 0;
56         try {
57             time = formatter.parse(this.expTime).getTime();
58         } catch (ParseException e) {
59             e.printStackTrace();
60         }
61         return time - System.currentTimeMillis();
62     }
63
64
65 }

创建延迟队列监听类

package wyp.delayqueue;

import java.util.Objects;
import java.util.concurrent.DelayQueue;

/**
 * @author : miles wang
 * @date : 2019/7/2  3:42 PM
 *
 * 使用延时队列DelayQueue实现订单超时关闭
 * 后台守护线程不断的执行检测工作
 * 双检查模式实现单例模式
 * 后面我把线程池改为了单线程,所有是否为单利模式不重要
 */
public class OrderOverTimeClose  {

    private volatile static OrderOverTimeClose oderOverTimeClose = null;

    private OrderOverTimeClose() {

    }

    /**
     * 守护线程
     */
    private Thread mainThread;

    /**
     * 创建空延时队列
     */
    private  DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();

    /**
     * 单例模式,双检查锁模式,在并发环境下对象只被初始化一次
     */
    public static OrderOverTimeClose getInstance(){
        if(oderOverTimeClose == null ){
            synchronized(OrderOverTimeClose.class){
                oderOverTimeClose =  new OrderOverTimeClose();
            }
        }
        return oderOverTimeClose;
    }

    /**
     * 启动方法
     */
    public void init(){
        mainThread =  new Thread(()->execute());
        mainThread.setDaemon(true);
        mainThread.setName("守护线程-->");
        mainThread.start();
    }

    private void execute() {
        while (true) {
            try {
                if(queue.size() > 0){
                    //从队列里获取超时的订单
                    OrderInfo orderInfo = queue.take();
                    // 检查订单状态,是否已经成功,成功则将订单从队列中删除。
                    if (Objects.equals(orderInfo.getStatus(), "成功")) {
                        System.out.println("线程:"+Thread.currentThread().getName()+",订单号:"
                                + orderInfo.getOrderNo() + ",订单状态:"
                                + orderInfo.getStatus() + ",订单创建时间:"
                                + orderInfo.getCreateTime()
                                + ",订单超时时间:" + orderInfo.getExpTime()+",当前时间:"+OrderPay.getTime(System.currentTimeMillis()));
                        Thread.sleep(2000);
                    } else {
                        System.out.println("线程:"+Thread.currentThread().getName()+",订单号:"
                                + orderInfo.getOrderNo() + ",变更订单状态为:超时关闭"
                                + ",订单创建时间:"
                                + orderInfo.getCreateTime()
                                + ",订单超时时间:" + orderInfo.getExpTime()+",当前时间:"+OrderPay.getTime(System.currentTimeMillis()));
                        Thread.sleep(2000);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 插入订单到超时队列中
     */
    public void orderPutQueue(OrderInfo orderInfo, String createTime,
                              String overTime) {
        System.out.println("订单号:" + orderInfo.getOrderNo() + ",订单创建时间:"
                + createTime + ",订单过期时间:" + overTime);
      //  queue.add(orderInfo);
          queue.put(orderInfo);
    }
}

测试 主要是向延迟队列中插入元素

package wyp.delayqueue;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/**
 * @author : miles wang
 * @date : 2019/7/2  3:46 PM
 */
public class OrderPay {
    static String[] str = new String[]{"成功","支付中","订单初始化"};

    public static String getTime(long time){
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date(time);
        String currentTime = formatter.format(date);
        return currentTime;
    }

    public static void main(String[] args) throws InterruptedException {
        OrderOverTimeClose.getInstance().init();
        for (int i = 0; i < 20; i++) {
            // 创建初始订单
            long createTime = System.currentTimeMillis();
            String currentTime = getTime(createTime);
            String overTime = getTime(createTime + 10000);// 十秒后超时
            String orderNo = String.valueOf(new Random().nextLong());
            OrderInfo order = new OrderInfo();
            order.setOrderNo(orderNo);
            order.setExpTime(overTime);
            int random_index = (int) (Math.random()*str.length);
            order.setStatus(str[random_index]);// 随机分配
            order.setCreateTime(currentTime);
            OrderOverTimeClose.getInstance().orderPutQueue(order, currentTime, overTime);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

以上仅仅是个人学习测试理解和记录,不一定对。

原文地址:https://www.cnblogs.com/wangpipi/p/11122416.html

时间: 2024-08-30 16:34:32

对于DelayQueue的理解的相关文章

java.util.concurrent.DelayQueue 源码学习

jdk1.8 DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间).DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,其内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间.DelayQueue不允许包含null元素. Leader/Followers模式,借用其他博客的话来解释下:

Java系列-集合框架理解

Java平台提供了一个全新的集合框架."集合框架"主要由一组用来操作对象的接口组成.不同接口描述一组不同数据类型. 日常比较常用的的集合框架关系如上图所示: (1).集合接口:短虚线表示 ,其中5个关键接口Iterator,Collection,Map,List,Set ,表示不同集合类型,是集合框架的基础. (2).抽象类:长虚线表示AbstractCollection,AbstractList ,AbstractSet ,AbstractMap ,AbstractSequentia

【Java并发编程】19、DelayQueue源码分析

DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间).DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间.DelayQueue不允许包含null元素. 领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听.

java中DelayQueue的一个使用陷阱分析

最近工作中有接触到DelayQueue,网上搜索资料的时候发现一篇文章谈到DelayQueue的坑.点击打开链接 文中已经总结了遇到坑的地方,还有解决方案.不过我第一眼看一下没弄明白为什么,所以翻了翻源码深究了一下,下面把这个坑的原因以及原理分析一下. 首先是DelayQueue的take()方法: 1 public E take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lock

深入理解java线程池—ThreadPoolExecutor

几句闲扯:首先,我想说java的线程池真的是很绕,以前一直都感觉新建几个线程一直不退出到底是怎么实现的,也就有了后来学习ThreadPoolExecutor源码.学习源码的过程中,最恶心的其实就是几种状态的转换了,这也是ThreadPoolExecutor的核心.花了将近小一周才大致的弄明白ThreadPoolExecutor的机制,遂记录下来. 线程池有多重要##### 线程是一个程序员一定会涉及到的一个概念,但是线程的创建和切换都是代价比较大的.所以,我们有没有一个好的方案能做到线程的复用呢

深入理解Kafka必知必会(3)

Kafka中的事务是怎么实现的? Kafka中的事务可以使应用程序将消费消息.生产消息.提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区. 生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应. 每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic,

Python——深入理解urllib、urllib2及requests(requests不建议使用?)

深入理解urllib.urllib2及requests            python Python 是一种面向对象.解释型计算机程序设计语言,由Guido van Rossum于1989年底发明,第一个公开发行版发行于1991年,Python 源代码同样遵循 GPL(GNU General Public License)协议[1] .Python语法简洁而清晰,具有丰富和强大的类库. urllib and urllib2 区别 urllib和urllib2模块都做与请求URL相关的操作,但

关于SVM数学细节逻辑的个人理解(三) :SMO算法理解

第三部分:SMO算法的个人理解 接下来的这部分我觉得是最难理解的?而且计算也是最难得,就是SMO算法. SMO算法就是帮助我们求解: s.t.   这个优化问题的. 虽然这个优化问题只剩下了α这一个变量,但是别忘了α是一个向量,有m个αi等着我们去优化,所以还是很麻烦,所以大神提出了SMO算法来解决这个优化问题. 关于SMO最好的资料还是论文<Sequential Minimal Optimization A Fast Algorithm for Training Support Vector

2.2 logistic回归损失函数(非常重要,深入理解)

上一节当中,为了能够训练logistic回归模型的参数w和b,需要定义一个成本函数 使用logistic回归训练的成本函数 为了让模型通过学习来调整参数,要给出一个含有m和训练样本的训练集 很自然的,希望通过训练集找到参数w和b,来得到自己得输出 对训练集当中的值进行预测,将他写成y^(I)我们希望他会接近于训练集当中的y^(i)的数值 现在来看一下损失函数或者叫做误差函数 他们可以用来衡量算法的运行情况 可以定义损失函数为y^和y的差,或者他们差的平方的一半,结果表明你可能这样做,但是实际当中