LinkedBlockingQueue
概述
LinkedBlockingQueue是基于链表的阻塞FIFO队列,可以指定一个最大的长度限制以防止过度扩展,未指定情况下其大小为Integer.MAX_VALUE;提供比ArrayBlockingQueue更高的吞吐量但是在高并发条件下可预测性降低。
LinkedBlockingQueue使用了两个锁,doc介绍是“two lock queue”算法的变种,但是我没找着这种算法。
同步策略
putlock只负责添加,takelock只负责删除。这样添加和删除操作就可以分开做,这样也是其吞吐量高于ArrayBlockingQueue的原因,而对于count则使用一个AtomicInteger来进行同步。但是对于迭代器操作和remove(Object)操作则是不同的,需要同时加上两种所才行。对于notFull和notEmpty的条件变量这里实际也是分开的。doc里提到这里使用级联通知模式。我没找到对应的翻译。我理解的话就是每次进行put或者take类似的操作如果满足条件都对两个条件变量进行notify。
可见性
读者和写者之间的可见性问题,当元素是队尾元素时,获取put锁,并且count更新,随后的读者通过fullLock获得put锁或获得take锁来保证其对于队尾元素的可见性。然后读取n=count.get(),这个保证获得对第n个元素的可见性。
迭代器策略
为了实现若一致性的迭代器。我们显然需要保持所有节点的是GC可达的(我理解的话就是在创建迭代器的时候当前的所有节点在迭代器存在期间不会被GC回收),这会导致两个问题:
错误的使用迭代器会导致无效元素无限制的存在于内存中。
当一个节点在存在期间本删除,GC将很难处理这种情况,将导致重复收集界定啊,并导致新旧节点的跨代连接。(我不是很理解这里到底想表达什么意思。)
源码部分
Node
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
Node是一个静态内部类,单向链表。
域
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private基础方法
sign方法
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
这个方法但看起来似乎没有什么特别的。但是细想起来有很多很多有意思的地方。这是并发编程难而有趣的地方:
- 首先一个是为什么好好的notEmpty.signal()要用takesignal来包裹起来?
我们来看条件变量的意义是什么?而通常来说条件变量可以说是用来表现当前维护的共享可变资源的状态的,在来而同步组件的意义在于控制访问共享可变资源的访问权限的。那这两者有什么关系呢?通常在java里来说条件变量需要通过同步组件比如说来生成。这是一条java语言的属性。还有更关键的一条条件变量的唤醒通常在生成器的同步组件的同步区域内进行。道理很简单如果没有检查权限就操作状态资源显然是不合理的。而通常来说条件变量的阻塞在同步区的首位,而其状态的唤醒操作在同步区的尾部。操作系统曾简要介绍过这里。
- signalNotEmpty的使用
单独锁是很好操作的,比如ArrayBlockingQueue,但是两个锁的话就会出现很多问题,比如死锁比如锁的获取顺序。既然条件变量上有锁,那么像是signalNotEmpty这样的情况就必须考虑其调用的位置,在LinkedBlockingQueue里,signalNotEmpty虽然用在用在put中但是从不在putLock的同步区内,而signalNotFull也是一样这样就保证同一时刻只持有一个锁这样就不会出现死锁问题。
- signalNotEmpty的使用
入队和出队操作
当然这是不考虑同步的问题的操作就简单很多了
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
这里就一个就是需要考虑GC的问题,在dequeue中新建了一个h变量来指向之前的head。然后让h.next =h 这里的标注是helpGC。如果对虚拟机的GC原理有大概的了解的话就知道java的GC策略是维护一个访问链,如果无法访问到就需要进行删除。其实这里的h本身就是访问不到的了。但是h.next还是可以访问到的,所以让h.next指向h就可以保证整个h对象都无法访问。从而可以让GC快速回收。这个方法其实之前就有很多地方都用到。比如在collection中很多用到链表的地方都用到了。
fulllock
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
主要就是锁获取的顺序必须是反向的。
构造器
构造器主要是两个:
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
一个空的构造其head和last都是null的空节点。要主要BlockingQueue本身是不允许空元素的,这里仅仅是head和last是空节点。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
没什么特别要说的。
put类操作
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
基本的思路是很显而易见的,等待直到count小于capacity。然后进行入队操作,count增加。
在离开同步区域之前最后一步是唤醒notfull。
在离开同步区域之后对c进行判断,如果c为0则说明之前队列为空,也就是说有可能有线程阻塞在NotEmpty条件变量上,所以唤醒signalNotEmpty()。
但是这里有两点需要说明的问题:
- 首先一个是使用AtomicInteger的必要性
这里解释了为什么count必须用AtomicInteger。在while(count.get()==capacity)这条语句的时间点,当前线程用于put锁,所以所有其他入队操作都被阻塞,当前count只会因为其他出队操作而下降而不会因为入队操作上升。当然就这里的情况来看使用volatile变量也是可以的,但是在入队之后需要对count进行自增操作,这样volatile就不行了,必须要使用AtomicInteger当然你也可以选用同步组件+Int这样的方式,不过这样又需要考虑很多问题,不如AtomicInteger简单。
- 对于离开前唤醒notFull的必要性
这一点是比较难理解的,我目前也不知道理解的对不对。权且说下我自己的理解。
if (c + 1 < capacity)
notFull.signal();
这句话是否有必要?是有必要的。
java里面的条件变量实际是有些不同的在条件变量调用await的时候是会释放锁的。所以实际上有可能有两个以上线程阻塞在notFull.await这里。其中一个被阻塞的线程被唤醒了,那必然是因为有了出队操作或者删除操作,如果在阻塞的节点同时删除了两个以上的元素会发生什么情况?首先一个被阻塞的线程会被唤醒,因为出队或者删除操作会调用notFull.signal。但是singal并不是signalAll。(signalAll是否可以还没有想明白。)只会唤醒一个线程。另一个线程就无法被唤醒了。所以这里还需要在离开时多做一次判断是否满足条件需要再次判断。
当然这里困惑大概可以这样去理解,但是问题在于如果我自己去写这样程序很难想得到会出现这样的情况。所以如果需要我们自己去构建同步容器该怎么办?那我这里就尝试一种蛮力的方式是否可行。就是对于当前锁的所有条件变量在离开条件区域的时候都需要进行判断,是否需要对满足条件的情况予以唤醒。这样的一来就可以竟可能减小出现同步错误的情况。
offer
offer方法和put相同区别只是两点
没有对条件变量notFull的while循环判断,而是以一次性的if条件来判断。并在返回时通过c>=0的条件来提供返回值。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
offer(E e, long timeout, TimeUnit unit)
基本上也是类似offer区别是,这里使用了对条件变量notFull带时间的阻塞。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
take类操作
put和take基本是相同的操作。条件变量不同而且count是减小之类的细微的区别。这里不再赘述
peek类操作
peek类操作其实比较简单。因为有一个head节点去维护当前的队首元素。只有判断先first(head的后继)是否为空就好。
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
remove(Object)及unlink操作
因为是单项链表所以删除操作稍微麻烦一些,
首先一个要fulllock因为有可能同时涉及到头尾结点的访问问题。
其次还需要一个维护n和n+1节点的坐标。同时移动这两个来进行节点操作。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
其他方法
其他方法内容都比较简单这里不做详细描述了
迭代器
迭代器的域:
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
current是当前的指针位置,currentElemt是取当前坐标时元素的内容。lastRet是刚刚跳过的上一个节点
迭代器的其他内容没什么要说明的但是next方法还是很有意思的:
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
next方法主要由两部分构成一个是next(),这个方法首先判断当前current是否为null如果为null则不满足hasNext条件抛出异常。
然后取出currentElemtn,让current节点传递给lastRet。
然后让current等于后继节点。
如果current为null则让currentElement为null否则为当前current节点的元素。
next的逻辑属于比较正常的逻辑。比较有意思的地方是nextNode方法。
LinkedBlockingQueue是使用两个节点来定位首尾,然后用单项链表来维护队列。那么他需要考虑两种特殊的情况一种是出队的节点,一种是被删除的节点;
- 出队队列考虑上面所说的take类的方法中间有一个操作是让出队节点node的node.next指向其自身。所以可以使用s==p来判断。
- 被删除的操作可以考虑上面的remove,当然迭代器自身也是可以删除的,但是操作流程和remove(Object)方法是一样的。remove的方法中会将node.item置为null,在这种情况下实际上会运行p=s方法。也就是让p=p.next;那么在nextNode的下一次循环中会返回当前head.next的节点。
在正常的情况下p的next为s节点既不为空其item也不为空,则正常返回。而当上述两种特殊情况发生的时候实际上返回的是当前队首head.next的方法。其实从这可以看出这里的迭代是非常不稳定的。因为删除是从队首开始删除的。很有可能刚刚迭代过的元素被删除了。或者是当前元素总是不断的指向队首,然后迭代重复的元素,或者迭代到被删除的元素等等诸多情况。所以尽可能不要用迭代器。虽然对于LinkedBlockingQueue的有些操作使用了迭代器,但是大部分方法都是使用fulllock操作。所以基本上安全的。但是如果显式的声明迭代器,其性能是不可预测的。