深入浅出 Java Concurrency (22): 并发容器 part 7 可阻塞的BlockingQueue (2)[转]

上一节中详细分析了LinkedBlockingQueue 的实现原理。实现一个可扩展的队列通常有两种方式:一种方式就像LinkedBlockingQueue一样使用链表,也就是每一个元素带有下一个元素的引用,这样的队列原生就是可扩展的;另外一种就是通过数组实现,一旦队列的大小达到数组的容量的时候就将数组扩充一倍(或者一定的系数倍),从而达到扩容的目的。常见的ArrayList就属于第二种。前面章节介绍过的HashMap确是综合使用了这两种方式。

对于一个Queue而言,同样可以使用数组实现。使用数组的好处在于各个元素之间原生就是通过数组的索引关联起来的,一次元素之间就是有序的,在通过索引操作数组就方便多了。当然也有它不利的一面,扩容起来比较麻烦,同时删除一个元素也比较低效。

ArrayBlockingQueue 就是Queue的一种数组实现。

ArrayBlockingQueue 原理

在没有介绍ArrayBlockingQueue原理之前可以想象下,一个数组如何实现Queue的FIFO特性。首先,数组是固定大小的,这个是毫无疑问的,那么初始化就是所有元素都为null。假设数组一段为头,另一端为尾。那么头和尾之间的元素就是FIFO队列。

    1. 入队列就将尾索引往右移动一个,新元素加入尾索引的位置;
    2. 出队列就将头索引往尾索引方向移动一个,同时将旧头索引元素设为null,返回旧头索引的元素。
    3. 一旦数组已满,那么就不允许添加新元素(除非扩充容量)
    4. 如果尾索引移到了数组的最后(最大索引处),那么就从索引0开始,形成一个“闭合”的数组。
    5. 由于头索引和尾索引之间的元素都不能为空(因为为空不知道take出来的元素为空还是队列为空),所以删除一个头索引和尾索引之间的元素的话,需要移动删除索引前面或者后面的所有元素,以便填充删除索引的位置。
    6. 由于是阻塞队列,那么显然需要一个锁,另外由于只是一份数据(一个数组),所以只能有一个锁,也就是同时只能有一个线程操作队列。

有了上述几点分析,设计一个可阻塞的数组队列就比较容易了。

上图描述的ArrayBlockingQueue的数据结构。首先有一个数组E[],用来存储所有的元素。由于ArrayBlockingQueue最终设置为一个不可扩展大小的Queue,所以这里items就是初始化就固定大小的数组(final类型);另外有两个索引,头索引takeIndex,尾索引putIndex;一个队列的大小count;要支持阻塞就必须需要一个锁lock和两个条件(非空、非满),这三个元素都是不可变更类型的(final)。

由于只有一把锁,所以任何时刻对队列的操作都只有一个线程,这意味着对索引和大小的操作都是线程安全的,所以可以看到这个takeIndex/putIndex/count就不需要原子操作和volatile语义了。

清单1 描述的是一个可阻塞的添加元素过程。这与前面介绍的消费者、生产者模型相同。如果队列已经满了就挂起等待,否则就插入元素,同时唤醒一个队列已空的线程。对比清单2 可以看到是完全相反的两个过程。这在前面几种实现生产者-消费者模型的时候都介绍过了。

清单1 可阻塞的添加元素

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

清单2 可阻塞的移除元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

需要注意到的是,尽管每次加入、移除一个元素使用的都是signal()通知,而不是signalAll()通知。我们参考上一节中notify替换notifyAll的原则:每一个await醒来的动作相同,每次最多唤醒一个线程来操作。显然这里符合这两种条件,因此使用signal要比使用signalAll要高效,并且是可靠的。

上图描述了take()/put()的索引位置示意图。

一开始takeIndex/putIndex都在E/0位置,然后每加入一个元素offer/put,putIndex都增加1,也就是往后边移动一位;每移除一个元素poll/take,takeIndex都增加1,也是往后边移动一位,显然takeIndex总是在putIndex的“后边”,因为当队列中没有元素的时候takeIndex和putIndex相等,同时当前位置也没有元素,takeIndex也就是无法再往右边移动了;一旦putIndex/takeIndex移动到了最后面,也就是size-1的位置(这里size是指数组的长度),那么就移动到0,继续循环。循环的前提是数组中元素的个数小于数组的长度。整个过程就是这样的。可见putIndex同时指向头元素的下一个位置(如果队列已经满了,那么就是尾元素位置,否则就是一个元素为null的位置)。

比较复杂的操作时删除任意一个元素。清单3 描述的是删除任意一个元素的过程。显然删除任何一个元素需要遍历整个数组,也就是它的复杂度是O(n),这与根据索引从ArrayList中查找一个元素的复杂度O(1)相比开销要大得多。参考声明的结构图,一旦删除的是takeIndex位置的元素,那么只需要将takeIndex往“右边”移动一位即可;如果删除的是takeIndex和putIndex之间的元素怎么办?这时候就从删除的位置i开始,将i后面的所有元素位置都往“左”移动一位,直到putIndex为止。最终的结果是删除位置的所有元素都“后退”了一个位置,同时putIndex也后退了一个位置。

清单3 删除任意一个元素

public boolean remove(Object o) {
    if (o == null) return false;
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = takeIndex;
        int k = 0;
        for (;;) {
            if (k++ >= count)
                return false;
            if (o.equals(items[i])) {
                removeAt(i);
                return true;
            }
            i = inc(i);
        }

} finally {
        lock.unlock();
    }
}
void removeAt(int i) {
    final E[] items = this.items;
    // if removing front item, just advance
    if (i == takeIndex) {
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else {
        // slide over all others up through putIndex.
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
    --count;
    notFull.signal();
}

对于其他的操作,由于都是带着Lock的操作,所以都比较简单就不再展开了。

下一篇中将介绍另外两个BlockingQueue, PriorityBlockingQueue和SynchronousQueue 然后对这些常见的Queue进行一个小范围的对比。

时间: 2024-10-13 16:14:09

深入浅出 Java Concurrency (22): 并发容器 part 7 可阻塞的BlockingQueue (2)[转]的相关文章

深入浅出 Java Concurrency (21): 并发容器 part 6 可阻塞的BlockingQueue (1)[转]

在<并发容器 part 4 并发队列与Queue简介>节中的类图中可以看到,对于Queue来说,BlockingQueue是主要的线程安全版本.这是一个可阻塞的版本,也就是允许添加/删除元素被阻塞,直到成功为止. BlockingQueue相对于Queue而言增加了两个操作:put/take.下面是一张整理的表格. 看似简单的API,非常有用.这在控制队列的并发上非常有好处.既然加入队列和移除队列能够被阻塞,这在实现生产者-消费者模型上就简单多了. 清单1 是生产者-消费者模型的一个例子.这个

深入浅出 Java Concurrency (23): 并发容器 part 8 可阻塞的BlockingQueue (3)[转]

在Set中有一个排序的集合SortedSet,用来保存按照自然顺序排列的对象.Queue中同样引入了一个支持排序的FIFO模型. 并发队列与Queue简介 中介绍了,PriorityQueue和PriorityBlockingQueue就是支持排序的Queue.显然一个支持阻塞的排序Queue要比一个非线程安全的Queue实现起来要复杂的多,因此下面只介绍PriorityBlockingQueue,至于PriorityQueue只需要去掉Blocking功能就基本相同了. 排序的Blocking

深入浅出 Java Concurrency (17): 并发容器 part 2 ConcurrentMap (2)

本来想比较全面和深入的谈谈ConcurrentHashMap的,发现网上有很多对HashMap和ConcurrentHashMap分析的文章,因此本小节尽可能的分析其中的细节,少一点理论的东西,多谈谈内部设计的原理和思想. 要谈ConcurrentHashMap的构造,就不得不谈HashMap的构造,因此先从HashMap开始简单介绍. HashMap原理 我们从头开始设想.要将对象存放在一起,如何设计这个容器.目前只有两条路可以走,一种是采用分格技术,每一个对象存放于一个格子中,这样通过对格子

深入浅出 Java Concurrency (16): 并发容器 part 1 ConcurrentMap (1)[转]

从这一节开始正式进入并发容器的部分,来看看JDK 6带来了哪些并发容器. 在JDK 1.4以下只有Vector和Hashtable是线程安全的集合(也称并发容器,Collections.synchronized*系列也可以看作是线程安全的实现).从JDK 5开始增加了线程安全的Map接口ConcurrentMap和线程安全的队列BlockingQueue(尽管Queue也是同时期引入的新的集合,但是规范并没有规定一定是线程安全的,事实上一些实现也不是线程安全的,比如PriorityQueue.A

深入浅出 Java Concurrency (27): 并发容器 part 12 线程安全的List/Set[转]

本小节是<并发容器>的最后一部分,这一个小节描述的是针对List/Set接口的一个线程版本. 在<并发队列与Queue简介>中介绍了并发容器的一个概括,主要描述的是Queue的实现.其中特别提到一点LinkedList是List/Queue的实现,但是LinkedList确实非线程安全的.不管BlockingQueue还是ConcurrentMap的实现,我们发现都是针对链表的实现,当然尽可能的使用CAS或者Lock的特性,同时都有通过锁部分容器来提供并发的特性.而对于List或者

深入浅出 Java Concurrency (17): 并发容器 part 2 ConcurrentMap (2)[转]

本来想比较全面和深入的谈谈ConcurrentHashMap的,发现网上有很多对HashMap和ConcurrentHashMap分析的文章,因此本小节尽可能的分析其中的细节,少一点理论的东西,多谈谈内部设计的原理和思想. 要谈ConcurrentHashMap的构造,就不得不谈HashMap的构造,因此先从HashMap开始简单介绍. HashMap原理 我们从头开始设想.要将对象存放在一起,如何设计这个容器.目前只有两条路可以走,一种是采用分格技术,每一个对象存放于一个格子中,这样通过对格子

深入浅出 Java Concurrency (25): 并发容器 part 10 双向并发阻塞队列 BlockingDeque[转]

这个小节介绍Queue的最后一个工具,也是最强大的一个工具.从名称上就可以看到此工具的特点:双向并发阻塞队列.所谓双向是指可以从队列的头和尾同时操作,并发只是线程安全的实现,阻塞允许在入队出队不满足条件时挂起线程,这里说的队列是指支持FIFO/FILO实现的链表. 首先看下LinkedBlockingDeque的数据结构.通常情况下从数据结构上就能看出这种实现的优缺点,这样就知道如何更好的使用工具了. 从数据结构和功能需求上可以得到以下结论: 要想支持阻塞功能,队列的容量一定是固定的,否则无法在

深入浅出 Java Concurrency (18): 并发容器 part 3 ConcurrentMap (3)[转]

在上一篇中介绍了HashMap的原理,这一节是ConcurrentMap的最后一节,所以会完整的介绍ConcurrentHashMap的实现. ConcurrentHashMap原理 在读写锁章节部分介绍过一种是用读写锁实现Map的方法.此种方法看起来可以实现Map响应的功能,而且吞吐量也应该不错.但是通过前面对读写锁原理的分析后知道,读写锁的适合场景是读操作>>写操作,也就是读操作应该占据大部分操作,另外读写锁存在一个很严重的问题是读写操作不能同时发生.要想解决读写同时进行问题(至少不同元素

深入浅出 Java Concurrency (26): 并发容器 part 11 Exchanger[转]

可以在对中对元素进行配对和交换的线程的同步点.每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象.Exchanger 可能被视为 SynchronousQueue 的双向形式. 换句话说Exchanger提供的是一个交换服务,允许原子性的交换两个(多个)对象,但同时只有一对才会成功.先看一个简单的实例模型. 在上面的模型中,我们假定一个空的栈(Stack),栈顶(Top)当然是没有元素的.同时我们假定一个数据结构Node,包含一个要交换的元