并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

Delayed

一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。

此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

下面例子是订单超时处理的具体代码:

重点是DelayOrderComponent 和OrderMessage

import com.concurrent.delayqueue.component.DelayOrderComponent;
import com.concurrent.delayqueue.model.OrderInfo;
import com.concurrent.delayqueue.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private OrderService orderService;

    //创建订单
    @RequestMapping("insert")
    public void insert() {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setCreateTime(new Date());
        orderInfo.setStatus(0);
        orderService.insert(orderInfo);
    }

    //取消订单
    @RequestMapping("cancel")
    public void cancel(Long orderId) {
        orderService.cancel(orderId);
    }

    //支付订单
    @RequestMapping("paysuccess")
    public void paysuccess(Long orderId) {
        orderService.paysuccess(orderId);
    }

    //查看队列中剩余处理数
    @RequestMapping("queuecount")
    public int queuecount() {
        return DelayOrderComponent.getDelayQueueCount();
    }
}
@Service
public class OrderService {
    @Autowired
    private OrderInfoMapper orderInfoMapper;
    @Autowired
    private DelayOrderComponent delayOrderComponent;

    /**
     * 插入
     * @param orderInfo
     */
    @Transactional
    public void insert(OrderInfo orderInfo){
        orderInfoMapper.insert(orderInfo);
        //加入到延时队列中,用于超时未支付
        boolean flag = delayOrderComponent.addDelayQueue(new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime()));
        if(!flag){
            throw new RuntimeException();
        }
    }

    /**
     * 取消
     */
    @Transactional
    public void cancel(Long orderId){
        orderInfoMapper.updateByStatus(orderId,0,-1);
        delayOrderComponent.removeDelayQueue(orderId);
    }

    /**
     * 用户支付成功
     */
    public void paysuccess(Long orderId){
        orderInfoMapper.updateByStatus(orderId,0,1);
        delayOrderComponent.removeDelayQueue(orderId);
    }

}
import com.concurrent.delayqueue.mapper.OrderInfoMapper;
import com.concurrent.delayqueue.message.OrderMessage;
import com.concurrent.delayqueue.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;

/**
 * 处理订单超时
 */
@Component
@Lazy(false)
public class DelayOrderComponent {
    @Autowired
    private OrderInfoMapper orderInfoMapper;

    private static DelayQueue<OrderMessage> delayQueue = new DelayQueue<OrderMessage>();
    public static int getDelayQueueCount(){
        return delayQueue.size();
    }

    /**
     * 系统启动时,预先加载的数据@PostConstruct
     */
    @PostConstruct
    public void init(){
        /**初始化时加载数据库中需处理超时的订单**/
        System.out.println("获取数据库中需要处理的超时的订单");
        List<OrderInfo> list = orderInfoMapper.selectByStatus(0);
        for(int i=0;i<list.size();i++){
            OrderInfo orderInfo = list.get(i);
            OrderMessage orderMessage = new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime());
            this.addDelayQueue(orderMessage);//加入队列
        }

        /**
         * 启动线程,取延时消息
         */
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        OrderMessage orderMessage = delayQueue.take();
                        //处理超时订单
                        orderInfoMapper.updateByStatus(orderMessage.getOrderId(),0,2);//订单状态改成超时订单
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    /**
     * 加入延时队列
     * 用户下单时,调用此方法
     */
    public boolean addDelayQueue(OrderMessage orderMessage){
        return delayQueue.add(orderMessage);
    }

    /**
     * 从延时队列中删除
     * 用户主动取消,或者支付成功后,调用此方法
     */
    public boolean removeDelayQueue(Long orderId){
        for (Iterator<OrderMessage> iterator = delayQueue.iterator(); iterator.hasNext();) {
            OrderMessage queue = iterator.next();
            if(orderId.equals(queue.getOrderId())){
                return delayQueue.remove(queue);
            }
        }
        return false;
    }

}
public class OrderMessage implements Delayed {
    private final static long DELAY = 15*60*1000L;//默认延迟15分钟

    private Long orderId;//订单号
    private Long expireTime;//过期时间
    public OrderMessage(Long orderId,Long createTime){
        this.orderId = orderId;
        this.expireTime = createTime + DELAY;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this){
            return 0;
        }
        if(other instanceof OrderMessage){
            OrderMessage otherRequest = (OrderMessage)other;
            long otherStartTime = otherRequest.expireTime;
            return (int)(this.expireTime - otherStartTime);
        }
        return 0;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }
}
import java.util.Date;

public class OrderInfo {
    private Long orderId;//订单状态
    private Date createTime;//创建时间
    private Integer status;//订单状态:0待支付1已支付-1取消2已超时

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }
}
import com.concurrent.delayqueue.model.OrderInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

@Mapper
public interface OrderInfoMapper {
    int deleteByPrimaryKey(Long orderId);

    int insert(OrderInfo record);

    int insertSelective(OrderInfo record);

    OrderInfo selectByPrimaryKey(Long orderId);

    int updateByPrimaryKeySelective(OrderInfo record);

    int updateByPrimaryKey(OrderInfo record);

    List<OrderInfo> selectByStatus(int status);
    int updateByStatus(@Param("orderId")Long orderId, @Param("oldstatus")Integer oldstatus,@Param("newstatus")Integer newstatus);
}
OrderInfoMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.concurrent.delayqueue.mapper.OrderInfoMapper" >
  <resultMap id="BaseResultMap" type="com.concurrent.delayqueue.model.OrderInfo" >
    <id column="order_id" property="orderId" jdbcType="BIGINT" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="status" property="status" jdbcType="INTEGER" />
  </resultMap>
  <sql id="Base_Column_List" >
    order_id, create_time, status
  </sql>
  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where order_id = #{orderId,jdbcType=BIGINT}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Long" >
    delete from t_order
    where order_id = #{orderId,jdbcType=BIGINT}
  </delete>
  <insert id="insert" parameterType="com.concurrent.delayqueue.model.OrderInfo"
          useGeneratedKeys="true" keyProperty="orderId">
    insert into t_order (order_id, create_time, status
      )
    values (#{orderId,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{status,jdbcType=INTEGER}
      )
  </insert>
  <insert id="insertSelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    insert into t_order
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="orderId != null" >
        order_id,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="status != null" >
        status,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="orderId != null" >
        #{orderId,jdbcType=BIGINT},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="status != null" >
        #{status,jdbcType=INTEGER},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    update t_order
    <set >
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=INTEGER},
      </if>
    </set>
    where order_id = #{orderId,jdbcType=BIGINT}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
    update t_order
    set create_time = #{createTime,jdbcType=TIMESTAMP},
      status = #{status,jdbcType=INTEGER}
    where order_id = #{orderId,jdbcType=BIGINT}
  </update>

  <select id="selectByStatus" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
    select
    <include refid="Base_Column_List" />
    from t_order
    where status = #{status,jdbcType=INTEGER}
  </select>
  <update id="updateByStatus">
    update t_order
    set status = #{newstatus,jdbcType=INTEGER}
    where order_id = #{orderId,jdbcType=BIGINT}
    and status = #{oldstatus,jdbcType=INTEGER}
  </update>
</mapper>
application.properties

spring.datasource.url = jdbc:mysql://localhost:3306/concurrent?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password =  123456

mybatis.mapper-locations=classpath:/mybatis/*Mapper.xml

源码地址:https://github.com/qjm201000/concurrent_delayqueue.git

数据库sql文件:到源码里面查看readme,按照步骤来就行。

原文地址:https://www.cnblogs.com/qjm201000/p/10142004.html

时间: 2024-10-10 18:08:10

并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue的相关文章

并发编程-concurrent指南-ConcurrentMap

ConcurrentMap 是个接口,你想要使用它的话就得使用它的实现类之一. ConcurrentMap,它是一个接口,是一个能够支持并发访问的java.util.map集合: 在原有java.util.map接口基础上又新提供了4种方法,进一步扩展了原有Map的功能: public interface ConcurrentMap<K, V> extends Map<K, V> { //插入元素 V putIfAbsent(K key, V value); //移除元素 bool

并发编程-concurrent指南-Lock-可重入锁(ReentrantLock)

可重入和不可重入的概念是这样的:当一个线程获得了当前实例的锁,并进入方法A,这个线程在没有释放这把锁的时候,能否再次进入方法A呢? 可重入锁:可以再次进入方法A,就是说在释放锁前此线程可以再次进入方法A(方法A递归). 不可重入锁(自旋锁):不可以再次进入方法A,也就是说获得锁进入方法A是此线程在释放锁钱唯一的一次进入方法A. ,具体区别查看可重入锁和不可重入锁区别. ReentrantLock,意思是"可重入锁".ReentrantLock是唯一实现了Lock接口的类,并且Reent

并发编程 17—— 使用内置条件队列实现简单的有界缓存

并发编程 01—— ConcurrentHashMap 并发编程 02—— 阻塞队列和生产者-消费者模式 并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程 04—— Callable和Future 并发编程 05—— CompletionService : Executor 和 BlockingQueue 并发编程 06—— 任务取消 并发编程 07—— 任务取消 之 中断 并发编程 08—— 任务取消 之 停止基于线程的服务 并发编程 09——

并发编程中的阻塞队列概述

1.简介 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作支持阻塞的插入和移除方法. 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满. 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空. 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程.阻塞队列就是生产者用来存放元素.消费者用来获取元素的容器. 在阻塞队列不可用时,插入.移除这两个附加操作提供4种处理

python并发编程基础之守护进程、队列、锁

并发编程2 1.守护进程 什么是守护进程? 表示进程A守护进程B,当被守护进程B结束后,进程A也就结束. from multiprocessing import Process import time ? def task(): print('妃子的一生') time.sleep(15) print('妃子死了') ? if __name__ == '__main__': fz = Process(target=task) fz.daemon = True #将子进程作为主进程的守护进程.必须在

并发编程【四】锁和队列及生产者消费模型

1.锁multiprocessing-Lock 锁的应用场景:当多个进程需要操作同一个文件/数据的时候: 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题. 为保证数据的安全性,多进程中只有去操作一些进程之间可以共享的数据资源的时候才需要进行加锁: 枷锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务进行修改,即串行的修改,没错速度式慢了,但牺牲了速度却保证了数据的安全: 模拟查票抢票: import json import time from multiprocess

深入理解java:2.3.4. 并发编程concurrent包 之容器ConcurrentLinkedQueue

1.    引言 在并发编程中我们有时候需要使用线程安全的队列. 如果我们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法. 使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现, 而非阻塞的实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的. 2.    ConcurrentLinkedQueue的介绍 Concurre

python并发编程(守护进程,进程锁,进程队列)

进程的其他方法 P = Process(target=f,) P.Pid 查看进程号  查看进程的名字p.name P.is_alive()  返回一个true或者False P.terminate()  给操作系统发送一个结束进程的信号 验证进程之间是空间隔离的 from multiprocessing import Process num = 100 def f1(): global num num = 3 print(num) # 结果 3 if __name__ == '__main__

深入理解java:2.3.1. 并发编程concurrent包 之Atomic原子操作

java中,可能有一些场景,操作非常简单,但是容易存在并发问题,比如i++, 此时,如果依赖锁机制,可能带来性能损耗等问题, 于是,如何更加简单的实现原子性操作,就成为java中需要面对的一个问题. 在backport-util-concurrent没有被引入java1.5并成为JUC之前, 这些原子类和原子操作方法,都是使用synchronized实现的. 不过JUC出现之后,这些原子操作 基于JNI提供了新的实现, 比如AtomicInteger,AtomicLong,AtomicBoole