前言:
ArrayBlockingQueue类是一个阻塞队列,重要用于多线程操作的条件。
一,官方解释
一个建立在数组之上被BlockingQueue绑定的阻塞队列。这个队列元素顺序是先进先出。队列的头部是在队列中待的时间最长的元素。队列的尾部是再队列中待的时间最短的元素。新的元素会被插入到队列尾部,并且队列从队列头部获取元素。
这是一个典型的绑定缓冲,在这个缓冲区中,有一个固定大小的数组持有生产者插入的数据,并且消费者会提取这些数据。一旦这个类被创建,那么这个数组的容量将不能再被改变。尝试使用put操作给一个满队列插入元素将导致这个操作被阻塞;尝试从空队列中取元素也会被阻塞。
这个类推荐了一个可选的公平策略来排序等待的生产者和消费者线程。默认的,这个顺序是不确定的。但是队列会使用公平的设置true来使线程按照先进先出顺序访问。通常公平性会减少吞吐量但是却减少了可变性以及避免了线程饥饿。
这个类和它的迭代器实现了所有可选的Collection按Iterator接口的方法。
?二,源码分析
先来看看ArrayBlockingQueue类的特殊的一个构造器代码:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
这个构造器中得参数fair大家注意到没有,正好是传给ReentrantLock的参数,而这个参数正好就是ReentrantLock决定是否为公平或者非公平队列的参数,ReentrantLock参考我的这篇关于java中ReentrantLock类的源码分析以及总结与例子。
再往下看到,这两个notEmpty和notFull参数实际上是Condition,而Condition可以把它看做一个阻塞信号,Condition的子类ConditionObject(是AbstractQueuedSynchronizer的内部类)拥有两个方法signal和signalAll方法,前一个方法是唤醒队列中得第一个线程,而signalAll是唤醒队列中得所有等待线程,但是只有一个等待的线程会被选择,这两个方法可以看做notify和notifyAll的变体。
在这个阻塞队列的insert和remove方法中都会被调用signal来唤醒等待线程,在put方法中,如果队列已经满了,则会调用await方法来,直到队列有空位,才会调用insert方法插入元素。源代码如下:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
其实设计这个类的程序员哥哥已经想到了如果我们要用这个类,但是不想在队列满了之后,再插入元素被阻塞,提供了offer方法,这个offer方法有重载方法,调用offer(E e)方法时,如果队列已经满了,那么会直接返回一个false,如果没有满,则直接调用insert插入到队列中;调用offer(E e, long timeout, TimeUnit unit)方法时,会在队列满了之后阻塞队列,但是这里可以由开发人员设置超时时间,如果超时时队列还是满的,则会以false返回。源码如下所示:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } insert(e); return true; } finally { lock.unlock(); } }
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
插入数据有阻塞和非阻塞之分,那么提取数据也肯定就有阻塞与非阻塞之分了。
其中take方法是个阻塞方法,当队列为空时,就被阻塞,源码如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
方法poll是重载方法,跟offer相对,也有基础方法和超时方法之分。
在这个类中还提供了peek方法来提取数据,但是peek方法是从对了的tail提取,而pool是从队列的head提取,即peek提取的元素是进入队列最晚的,而pool提取的元素是进入队列最早时间最长的元素。
再来看看这个类中得迭代器。这个类中的迭代器是线程安全的,为什么会线程安全?因为在实现的next和remove方法中都加了lock了,安全性根本停不下来啊~上源码:
private class Itr implements Iterator<E> { private int remaining; // Number of elements yet to be returned private int nextIndex; // Index of element to be returned by next private E nextItem; // Element to be returned by next call to next private E lastItem; // Element returned by last call to next private int lastRet; // Index of last element returned, or -1 if none Itr() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { lastRet = -1; if ((remaining = count) > 0) nextItem = itemAt(nextIndex = takeIndex); } finally { lock.unlock(); } } public boolean hasNext() { return remaining > 0; } public E next() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); //锁定 try { if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; E x = itemAt(nextIndex); // check for fresher value if (x == null) { x = nextItem; // we are forced to report old value lastItem = null; // but ensure remove fails } else lastItem = x; while (--remaining > 0 && // skip over nulls (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) ; return x; } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; E x = lastItem; lastItem = null; // only remove if item still at index if (x != null && x == items[i]) { boolean removingHead = (i == takeIndex); removeAt(i); if (!removingHead) nextIndex = dec(nextIndex); } } finally { lock.unlock(); } } }
三,总结
对这个类的描述已经代码分析完成了,接下来我们来总结一下这个类的一些特点吧:
1.一旦创建,则容量不能再改动
2.这个类是线程安全的,并且迭代器也是线程安全的
3.这个类的put和take方法分别会在队列满了和队列空了之后被阻塞操作。
4.这个类提供了offer和poll方法来插入和提取元素,而不会在队列满了或者队列为空时阻塞操作。
5.这个队列的锁默认是不公平策略,即唤醒线程的顺序是不确定的。
java同步包种ArrayBlockingQueue类的分析与理解