BlockingQueu 阻塞队列

java.util.concurrent
public interface BlockingQueue<E> extends Queue<E>

简介

当阻塞队列插入数据时:
    如果队列已经满了,线程则会阻塞等待队列中元素被取出后在插入。
    当从阻塞队列中取数据时,如果队列是空的,则线程会阻塞等待队列中有新元素。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列:

    这两个附加的操作是:

        在队列为空时,获取元素的线程会等待队列变为非空。

        当队列满时,存储元素的线程会等待队列可用。

    阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
    阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列(实现的接口是 BlockingQueue)。

LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列(实现的接口是 BlockingQueue)。

PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列(实现的接口是 BlockingQueue)。

DelayQueue: 一个使用优先级队列实现的无界阻塞队列(实现的接口是 BlockingQueue)。

SynchronousQueue: 一个不存储元素的阻塞队列(实现的接口是 BlockingQueue)。

LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列(实现的接口是 BlockingQueue)。

LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列(特例:实现的接口是 BlockingDeque)。

处理方法

方法描述 抛出异常 返回特殊的值 一直阻塞 超时退出
插入数据 add(e) offer(e) put(e) offer(e,time,unit)
获取并移除队列的头 remove() poll() take() poll(time,unit)
获取但不移除队列的头 element() peek() 不可用 不可用
抛出异常:
    是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。
    当队列为空时,从队列里获取元素时会抛出NoSuchElementEx·ception异常 。

返回特殊值:
    插入方法会返回是否成功,成功则返回true。
    移除方法,则是从队列里拿出一个元素,如果没有则返回null

一直阻塞:
    当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。
    当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

超时退出:
    当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

抛出异常与返回特殊值方法的实现是一样的,只不过对失败的操作的处理不一样!
通过 AbstractQueue 的源码可以发现,add(e),remove(),element() 都是分别基于offer(),poll(),peek()实现的
public interface BlockingQueue<E> extends Queue<E> {

    //将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
    boolean add(E e);

    //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
    boolean offer(E e);

    //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
    void put(E e) throws InterruptedException;

    //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
    E take() throws InterruptedException;

    //在给定的时间里,从队列中获取值,时间到了直接调用普通的poll方法,为null则直接返回null。
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //获取队列中剩余的空间。
    int remainingCapacity();

    //从队列中移除指定的值。
    boolean remove(Object o);

    //判断队列中是否拥有该值。
    public boolean contains(Object o);

    //将队列中值,全部移除,并发设置到给定的集合中。
    int drainTo(Collection<? super E> c);

    //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
    int drainTo(Collection<? super E> c, int maxElements);
}

BlockingQueue特点

BlockingQueue不接受 null 元素。

BlockingQueue可以是限定容量的(默认:Integer.MAX_VALUE)。

BlockingQueue实现是线程安全的。

与BlockingQueue一样,BlockingDeque是线程安全的,但不允许 null 元素,并且可以有容量限制。

七个阻塞队列

ArrayBlockingQueue(需指定容量):
    ArrayBlockingQueue是一个用数组实现的有界阻塞队列。
    此队列按照先进先出(FIFO)的原则对元素进行排序。

LinkedBlockingQueue:
    是一个用链表实现的有界阻塞队列。
    此队列的默认和最大长度为Integer.MAX_VALUE。
    此队列按照先进先出的原则对元素进行排序。

PriorityBlockingQueue(不会阻塞生产者):
    是一个支持优先级的无界队列。
    默认情况下元素采取自然顺序排列(每个元素都必须实现 Comparable 接口),也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。

    其iterator()方法中提供的迭代器并不保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
    如果需要有序地进行遍历,则应考虑使用Arrays.sort(priorityBlockingQueue.toArray())。

DelayQueue:
    无界阻塞队列,只有在延迟期满时才能从中提取元素(如果元素没有达到延时时间,就阻塞当前线程)。
    注意 DelayQueue 的所有方法只能操作“到期的元素“,例如,poll()、remove()、size()等方法,都会忽略掉未到期的元素。

    我们可以将DelayQueue运用在以下应用场景:

        缓存系统的设计:
            可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,
            一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

        定时任务调度:
            使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,
            从比如TimerQueue就是使用DelayQueue实现的。

    DelayQueue 的实现是基于 PriorityQueue,是一个优先级队列,是以延时时间的长短进行排序的:
        所以,DelayQueue 需要知道每个元素的延时时间,而这个延时时间是由 Delayed 接口的 getDelay() 方法获取的。
        所以,DelayQueue 的元素必须实现 Delay 接口。

SynchronousQueue:
    一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作(只有一个元素)。
    SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。

LinkedTransferQueue(队列不满时也可以阻塞):
    是一个由链表结构组成的无界阻塞TransferQueue队列 。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。

    transfer方法(阻塞):
        如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),
        transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。
        如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并阻塞到该元素被消费者消费了才返回。

    tryTransfer方法:
        则是用来试探下生产者传入的元素是否能直接传给消费者。
        如果没有消费者等待接收元素,则返回false。
        和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。

        对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法 ,
        则是试图把生产者传入的元素直接传给消费者,
        但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

LinkedBlockingDeque:
    是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。
    有界的阻塞队列,默认长度以及最大长度是 Integer.MAX_VALUE。可在创建时,指定容量。

    LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法。
    另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。
    但是take方法却等同于takeFirst。

ArrayBlockingQueue源码分析

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public boolean remove(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final Object[] items = this.items;
                for (int i = takeIndex, end = putIndex,
                         to = (i < end) ? end : items.length;
                     ; i = 0, to = end) {
                    for (; i < to; i++)
                        if (o.equals(items[i])) {
                            removeAt(i);
                            return true;
                        }
                    if (to == end) break;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return e;
    }
}

原文地址:https://www.cnblogs.com/loveer/p/11518687.html

时间: 2024-10-20 18:53:06

BlockingQueu 阻塞队列的相关文章

caffe数据读取的双阻塞队列说明

caffe的datareader类中 class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); BlockingQueue<T*> free_; BlockingQueue<T*> full_; DISABLE_COPY_AND_ASSIGN(QueuePair); }; 这个就是双阻塞队列,先将free队列填充到最大长度,然后按照如下规则: 1,每当生产者push时,先将full队列pop,如果fu

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

BlockingQueue(阻塞队列)详解

一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由

Java多线程-新特征-阻塞队列ArrayBlockingQueue

阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止.同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止. 有了这样的功能,就为多线程的排队等候的模型实现开辟了便捷通道,非常有用. java.util.concurrent.BlockingQueue继承了java.util.Queue接口,可

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深入理解Java类加载器(ClassLoader) 深入理解Java并发之synchronized实现原理 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic 深入理解Java内存模型(JMM)及volatile关键字 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理 剖析基于并发AQS的共

Java里的阻塞队列

JDK7提供了7个阻塞队列,如下: ArrayBlockingQueue  : 一个数组结构组成的有界阻塞队列. LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列 . PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列 . DelayQueue : 一个使用优先级队列实现的无界阻塞队列 . SynchronousQueue : 一个不存储元素的阻塞队列 . LinkedTransferQueue : 一个由链表结构组成的无界阻塞队列 .

9.并发包非阻塞队列ConcurrentLinkedQueue

jdk1.7.0_79  队列是一种非常常用的数据结构,一进一出,先进先出. 在Java并发包中提供了两种类型的队列,非阻塞队列与阻塞队列,当然它们都是线程安全的,无需担心在多线程并发环境所带来的不可预知的问题.为什么会有非阻塞和阻塞之分呢?这里的非阻塞与阻塞在于有界与否,也就是在初始化时有没有给它一个默认的容量大小,对于阻塞有界队列来讲,如果队列满了的话,则任何线程都会阻塞不能进行入队操作,反之队列为空的话,则任何线程都不能进行出队操作.而对于非阻塞无界队列来讲则不会出现队列满或者队列空的情况

LinkedBlockingQueue(lbq)阻塞队列

最近开发中,经常使用这个类LinkedBlockingQueue,它是BlockingQueue这个子类. 并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列.该类主要提供了两个方法put(),offer()和take(),poll(),前者将一个对象放 到队列尾部,如果队列已经满了,就等待直到有空闲节点:后者从head取一个对象,如果没有对象,就等待直到有可取的对象. 反正都是开发中常用的.记哈