并发编程中的阻塞队列概述

1.简介

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

在阻塞队列不可用时,插入、移除这两个附加操作提供4种处理方式:

4种处理试的说明:

抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。

返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如 果是移除方法,则是从队列里取出一个元素,如果没有则返回null。

一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。

超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

注意:如果有无界阻塞队列,队列永不会满,put或offer方法永不会被阻塞,使用offer方法永远返回true.

2.阻塞队列分类

主要的几种阻塞队列

1)普通阻塞队列:ArrayBlockingQueue,基于数组的有界阻塞队列 ;LinkedBlockingQueue ,基于单向链表的有界阻塞队列;LinkedBlockingDeque, 基于双向链表的有界双向阻塞队列。

2)优先级阻塞队列:PriorityBlockingQueue, 基于优先级排序无界阻塞队列。

3)延时阻塞队列:DelayQueue,基于优先级队列实现的无界阻塞队列。

4)其他阻塞队列:SynchronousQueu,不存储元素的阻塞队列;LinkedTransferQueue,基于单向链表组成的无界阻塞队列。

阻塞队列实现的基本原理:

阻塞队列主要是利用并发编程中的“等待/通知”模型,使用显式锁ReentrantLock和条件ConditionObject实现的。如下自定义的阻塞队列BoundBlockQueue,它使用一把锁和两个条件来实现阻塞队列。一个把锁lock,可以保护所有的访问,一个条件notEmpty用来通知线程当前队列“不满”,另一个条件notFull用来通知线程当前队列“不空”。上面介绍的7种阻塞队列其实现原理大致与此类似。

package thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundBlockQueue<T> {
    private       int       addIndex    = 0;
    private       int       removeIndex = 0;
    private       int       count       = 0;
    private final Object[]  items;
    private final Lock      lock        = new ReentrantLock();
    private final Condition notEmpty    = lock.newCondition();
    private final Condition notFull     = lock.newCondition();

    public BoundBlockQueue(int size) {
        if (size < 0) throw new IllegalArgumentException("size must large than zero");
        this.items = new Object[size];
    }

    public void add(T o) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) notFull.await();
            items[addIndex++] = o;
            count++;
            if (addIndex == items.length) addIndex = 0;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();
            Object o = items[removeIndex];
            items[removeIndex++] = null;
            count--;
            if (removeIndex == items.length) removeIndex = 0;
            notFull.signal();
            return (T) o;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BoundBlockQueue<String> queue = new BoundBlockQueue<>(9);
/*        new Thread(() -> {
            try {
                int i = 0;
                while(i<=5){
                    queue.add(String.valueOf(i));
                    System.out.println("add-digital: " + i);
                    Thread.sleep(1000);
                    i++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();*/
        new Thread(() -> {
            try {
                int i = 1;
                while (i <= 10) {
                    queue.add("0" + i);
                    System.out.println("add-digital: 0" + i);
                    Thread.sleep(500);
                    i++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "addItemThread1").start();
        new Thread(() -> {
            try {
                int i = 0;
                while (i <= 8) {
                    String s = queue.remove();
                    System.out.println("remove-digital:" + s);
                    Thread.sleep(1000);
                    i++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "removeItemThread1").start();
    }
}

BoundBlockQueue

3.阻塞队列详述

1)普通阻塞队列

ArrayBlockingQueue和LinkedBlockingQueue都实现了Queue接口,表示先进先出的队列,头出尾进,而LinkedBlockingDeque实现了Deque接口,它表示一个双端队列,头尾均可进出。

这三个阻塞队列内部都是使用显式锁ReentrantLock和显式条件CoditionObject实现的。

(1)ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列(构造方法必须指定容量) ,创建时指定容量大小,且运行时容量也不会变化。默认情况下不保证线程公平地访问队列。此队列按照先进先出(FIFO)的原则对元素进行排序。队列的容量和公平性选择均在构造方法中设定。

如下便创建了一个容量为50的公平阻塞队列。

ArrayBlockingQueue e = new ArrayBlockingQueue(50,true);

ArrayBlockingQueue的实现比较简单,其内部有一个数组存储元素,有两个索引分别表示头和尾,有一个变量表示当前元素个数,有一个锁保护所有的访问,有“不满‘和”不空“两个条件处理线程协作问题。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    //.....省略
}

(2)LinkedBlockingQueue

LinkedBlockingQueue是基于单向链表实现的,在创建是可指定最大长度,也可不指定,默认是无限大。LinkedBlockingQueue不支锁的公平性选择,只支持非公平锁。

LinkedBlockingQueue queue=new LinkedBlockingQueue();LinkedBlockingQueue queue2=new LinkedBlockingQueue(20);

LinkedBlockingQueue的实现与ArrayBlockingQueue有些不同,它是单向链表结构,尾进头出,为提高性能它将锁的粒度细化了,使用了两个锁,一个保护头部、一个保护尾部,每个锁绑定一个条件。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private final int capacity;
        private final AtomicInteger count = new AtomicInteger();
        transient Node<E> head;
        private transient Node<E> last;
        private final ReentrantLock takeLock = new ReentrantLock();//保护头部(出队)的锁
        private final Condition notEmpty = takeLock.newCondition();//"不空"条件
        private final ReentrantLock putLock = new ReentrantLock();//保护尾部(入队)的锁
        private final Condition notFull = putLock.newCondition();//“不满”条件
    //......省略
}

(3)LinkedBlockingDeque

LinkedBlockingDeque,基于双向链表实现,其最大长度可也是可选的,默认无限大。LinkedBlockingDeque不支锁的公平性选择,只支持非公平锁。

LinkedBlockingDeque deque = new LinkedBlockingDeque();LinkedBlockingDeque deque2 = new LinkedBlockingDeque(20);

LinkedBlockingDeque,与ArrayBlockingQueue类似,也是使用一个锁,两个条件,使用锁保护所有操作,使用“不满‘和”不空“两个条件进行线程协作。

public class LinkedBlockingDeque<E> extends AbstractQueue<E>
        implements BlockingDeque<E>, java.io.Serializable {
    transient Node<E> first;
    transient Node<E> last;
    private transient int count;
    private final int capacity;
    final ReentrantLock lock = new ReentrantLock(); //重入锁
    private final Condition notEmpty = lock.newCondition();//"非空"条件
    private final Condition notFull = lock.newCondition();//“非满”条件
    //......省略
}

LinkedBlockingDeque是双向队列,它可以在队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。因其有两个插入/移除的入口,双向阻塞队列常用在Jork/Join框架的“工作窃取”模式中。

2)优先级阻塞队列

ProrityBlockingQueue是一个支持优先级的无界阻塞队列,(内部元素排列不是完全有序)它是按照优先级出队的(不像普通阻塞队列那样先进先出)。其内部使用数组来存储元素(逻辑上是堆,物理上是数组),它是无界的,数组的长度会动态扩展。它要求元素类型实现了Comparable接口或在构造方法中传入一个Comparator比较器,另外它不能保证同优先级元素的顺序。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;
    private transient Object[] queue; //保存元素的数组
    private transient int size;
    private transient Comparator<? super E> comparator;
    private final ReentrantLock lock; //锁
    private final Condition notEmpty;//“不空”的条件
    private transient volatile int allocationSpinLock;
    private PriorityQueue<E> q;//只有序列化/反序列化会用到,主要目的是与以前版本兼容。
    //......省略
}

ProrityBlockingQueue不支锁的公平性选择,只支持非公平锁。ProrityBlockingQueue的实现,它有一个数组保存元素,它使用一把锁保护所有的操作,(因它是无界的,永不会“满”)只使用一个“不空”的条件进行线程协作。

3)延时阻塞队列

DelayQueue是一个支持延时获取元素的无界阻塞队列。它内部使用一个优先级队列来储存元素,使用一把锁来保护数据,用一个“可获取”的条件表示头部是否有元素可获取,当头部元素的延时未到时,take操作会根据延时计算需要休眠的时间,然后休眠,若此进程中有新的元素入队,且成为头部元素,则部位皮肤休眠的线程会被提前唤醒然后重新检查延时。DelayQueue不支锁的公平性选择,只支持非公平锁。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
    //.....省略
}

它要求元素类型要实现Delay接口,而Delay接口双继承Comparable接口,那么DelayQueue中的每一个元素都是可比较的,额外的getDelay方法返回一个再延迟多长时间的整数(小于等于零就不再延迟)。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

DelayQueue常用于定时任务,因为DelayQueue是按照延时时间出队的。元素只有在过期后才能出队列中拿走,若没过期就要阻塞等待。

4)其他阻塞队列

SynchronousQueue和LinkedTransferQueue是两个特殊的阻塞队列。

(1)SynchronousQueue

SynchronousQueue,与其他队列不同,它没有存储元素的空间,它的每入队操作必须等待另一个线程的出队操作,反之亦然。

它支持公平访问队列(通过构造方法的参数指定),默认情况下线程采用非公平性策略访问队列

SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

(2)LinkedTransferQueue

LinkedTransferQueue实现了TransferQueue接口,TransferQueue扩展了BlockingQueue接口,增加了一此其他功能.生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于消息传递类型的应用。

public interface TransferQueue<E> extends BlockingQueue<E> {
    //如果有消费者在等待,直接转给消费者,返回true,否则返回false,不入队
    boolean tryTransfer(E e);
    //如果有消费者在等待,直接转给消费者,否则入队,阻塞等待直到被消费者接口后再返回
    void transfer(E e) throws InterruptedException;
    //如果有消费者在等待,直接转给消费者,返回true,否则入队,阻塞等待限定时间,若最后被消费者接收,返回true
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    //是否有消费者在等待
    boolean hasWaitingConsumer();
    //在等待的消费者个数
    int getWaitingConsumerCount();
}

LinkedTransferQueue,与其他阻塞队列相比,主要是多了transfer系列方法, 且这几个方法核心逻辑都是委托给xfer方法实现的。

public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }
   public boolean tryTransfer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

transfer(E)方法: 如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回.

tryTransfer(E)方法 : 与transfer类似,只这里只是“尝试”,若没有消费者在等待,它不会阻塞,会直接返回。

transfer(E,long,TimeUnit)方法: 与transfer类似,只是这里是个限时版本。

原文地址:https://www.cnblogs.com/gocode/p/introduce-blockingqueue.html

时间: 2024-10-06 14:26:16

并发编程中的阻塞队列概述的相关文章

161212、并发编程中的关于队列

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者消费者模式 生产者

并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对象的延迟到期时间最长.注意:不能将null元素放置到这种队列中. Delayed 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象. 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序. 下面例子是订单超时处理的具体代码: 重点是DelayOrderCompo

多线程编程学习六(Java 中的阻塞队列).

介绍 阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满:当队列空时,队列会阻塞获得元素的线程,直到队列变非空.阻塞队列就是生产者用来存放元素.消费者用来获取元素的容器. 当线程 插入/获取 动作由于队列 满/空 阻塞后,队列也提供了一些机制去处理,或抛出异常,或返回特殊值,或者线程一直等待... 方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put(e) offer(e, timeout, unit

聊聊并发(七)Java中的阻塞队列

什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue

理解并发编程中的重要概念:指令重排序和指令乱序执行

看过了很多介绍指令重排序的文章,可惜由于自己硬件和计算机理论知识缺乏,很难理解深层次的奥秘和实现原理.不过也有很多帖子,讲的浅显易懂,使用的例子很形象.大牛就是能用简单的解释和通俗的比喻,给我们讲明白很高深的东西.这里做个摘抄和总结,和大家分享下,希望大家能够对指令重排序有个形象的认识,不至于在并发编程中犯一些简单的错误.如果理解有错误,希望看到的大神指正. 从源码变成可以被机器(或虚拟机)识别的程序,至少要经过编译期和运行期.重排序分为两类:编译期重排序和运行期重排序(乱序执行),分别对应编译

【Java并发编程】6、volatile关键字解析&amp;内存模型&amp;并发编程中三概念

转自:http://www.cnblogs.com/dolphin0520/p/3920373.html volatile这个关键字可能很多朋友都听说过,或许也都用过.在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果.在Java 5之后,volatile关键字才得以重获生机. volatile关键字虽然从字面上理解起来比较简单,但是要用好不是一件容易的事情.由于volatile关键字是与Java的内存模型有关的,因此在讲述volatile关键之前,我们先来

java并发编程中常用的工具类 Executor

/***************************************************  * TODO: description .  * @author: gao_chun  * @since:  2015-4-17  * @version: 1.0.0  * @remark: 转载请注明出处  **************************************************/ java.util.concurrent.Executor 使用 Execut

Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过start方法启动线程--->线程变为可运行可执行状态,然后通过数据产生共享,线程产生互斥---->线程状态变为阻塞状态---->阻塞状态想打开的话可以调用notify方法. 这里Java5中提供了封装好的类,可以直接调用然后构造阻塞状态,以保证数据的原子性. 2.如何实现? 主要是实现Blo

并发编程中.net与java的一些对比

Java在并发编程中进行使用java.util.concurrent.atomic来处理一些轻量级变量 如AtomicInteger AtomicBoolean等 .Net中则使用Interlocked来实现类似功能 Java中使用object的wait和notify方法来实现线程间的写作 .Net中可以使用Semaphore(信号量).mutex.和EventWaitHandle来实现 但是Semaphore类的构造函数需要指定初始入口数和最大入口数