Java同步数据结构之LinkedBlockingQueue

前言

比起ArrayBlockingQueue,LinkedBlockingQueue应该是最被大家常用的阻塞队列,LinkedBlockingQueue是基于链表的一种可选容量的阻塞队列,也就是说,在构造LinkedBlockingQueue实例的时候,你可以像ArrayBlockingQueue那样指定队列大小,也可以不指定大小(这时候默认就是Integer.MAX_VALUE),指定队列的大小是为了防止队列过度的扩张,导致内存被过度占用或溢出。链表的节点是在每一次插入时动态的创建的,除非这会导致队列超出容量限制。LinkedBlockingQueue的容量在实例被构造完成之后也是不允许被更改的。

与ArrayBlockingQueue一样LinkedBlockingQueue不允许插入null值,也是先进先出FIFO队列,队列的头部是队列中存在时间最长的元素,新元素被插入到队尾,队列出队从头部开始。与ArrayBlockingQueue相比,LinkedBlockingQueue通常具有更高的吞吐量,但在大多数并发应用程序中性能的可预测性较差。

LinkedBlockingQueue采用了“双锁队列” 算法,元素的入队和出队分别由putLock、takeLock两个独立的可重入锁来实现。所以比起ArrayBlockingQueue明显提高了吞吐量。

源码分析

先看看其成员变量:

 1 static class Node<E> {
 2     E item;
 3
 4     /**
 5      * One of:
 6      * - the real successor Node
 7      * - this Node, meaning the successor is head.next
 8      * - null, meaning there is no successor (this is the last node)
 9      */
10     Node<E> next;
11
12     Node(E x) { item = x; }
13 }
14
15 /** The capacity bound, or Integer.MAX_VALUE if none */
16 private final int capacity;
17
18 /** Current number of elements */
19 private final AtomicInteger count = new AtomicInteger();
20
21 /**
22  * Head of linked list.
23  * Invariant: head.item == null
24  */
25 transient Node<E> head;
26
27 /**
28  * Tail of linked list.
29  * Invariant: last.next == null
30  */
31 private transient Node<E> last;
32
33 /** Lock held by take, poll, etc */
34 private final ReentrantLock takeLock = new ReentrantLock();
35
36 /** Wait queue for waiting takes */
37 private final Condition notEmpty = takeLock.newCondition();
38
39 /** Lock held by put, offer, etc */
40 private final ReentrantLock putLock = new ReentrantLock();
41
42 /** Wait queue for waiting puts */
43 private final Condition notFull = putLock.newCondition();

上面的Node节点内部类显然就是用于实现链表的节点实体,item就是当前节点携带的真正对象,next指向下一个节点。head、last分别表示链表的首尾节点,值得注意的是,在LinkedBlockingQueue内部实现的时候,head节点不会参与到链表的实体绑定,也就是说,真正的有效节点挂载都在head节点之后,所以head.item 永远都为null。takeLock和putLock两把锁以及各自的Condition实例分别用于队列元素的出队和入队,可以看到表示队列当前元素个数的count是由一个原子变量来保存的,这是为了避免在维护该变量的时候需要同时获取takeLock、putLock两个锁。当然LinkedBlockingQueue内部还是有一些方法需要同时获取两个锁才能执行,后面会介绍。

LinkedBlockingQueue实例在构造的时候可以指定容量也可以不指定,另外和ArrayBlockingQueue一样也可以在初始化的时候用一个指定的集合初始化队列:

 1 public LinkedBlockingQueue(int capacity) {
 2     if (capacity <= 0) throw new IllegalArgumentException();
 3     this.capacity = capacity;
 4     last = head = new Node<E>(null); //初始化首尾节点
 5 }
 6
 7 public LinkedBlockingQueue(Collection<? extends E> c) {
 8     this(Integer.MAX_VALUE);
 9     final ReentrantLock putLock = this.putLock;
10     putLock.lock(); // Never contended, but necessary for visibility
11     try {
12         int n = 0;
13         for (E e : c) {
14             if (e == null)
15                 throw new NullPointerException();
16             if (n == capacity)
17                 throw new IllegalStateException("Queue full");
18             enqueue(new Node<E>(e));
19             ++n;
20         }
21         count.set(n);
22     } finally {
23         putLock.unlock();
24     }
25 }

通过以上的构造方法可见一开始首尾节点其实是同一个节点,使用一个集合构造实例的时候,容量是无限的即Integer.MAX_VALUE,在入队操作之前先获取putLock,再循环遍历每一个元素一个一个的入队,一旦队列满了就会抛出IllegalStateException异常。

可阻塞入队操作:

 1 public void put(E e) throws InterruptedException {
 2     if (e == null) throw new NullPointerException();
 3     // Note: convention in all put/take/etc is to preset local var
 4     // holding count negative to indicate failure unless set.
 5     int c = -1;
 6     Node<E> node = new Node<E>(e);
 7     final ReentrantLock putLock = this.putLock;
 8     final AtomicInteger count = this.count;
 9     putLock.lockInterruptibly();
10     try {
11         /*
12          * Note that count is used in wait guard even though it is
13          * not protected by lock. This works because count can
14          * only decrease at this point (all other puts are shut
15          * out by lock), and we (or some other waiting put) are
16          * signalled if it ever changes from capacity. Similarly
17          * for all other uses of count in other wait guards.
18          */
19         while (count.get() == capacity) {
20             notFull.await();
21         }
22         enqueue(node);
23         c = count.getAndIncrement();
24         if (c + 1 < capacity)
25             notFull.signal();
26     } finally {
27         putLock.unlock();
28     }
29     if (c == 0)
30         signalNotEmpty();
31 }

通过入队的元素构造一个Node实例,入队先获取putLock,如果队列满了就等待,入队完成之后如果队列还没有满就唤醒其它可能被阻塞的入队操作,然后释放putLock。注意在最后如果队列从空变成非空还要唤醒消费线程即阻塞在takeLock锁的线程(即signalNotEmpty方法)。另一个入队方法offer的原理大同小异就不介绍了,实现都差不多,但最终也都是会调用enqueue做入队操作:

 1 /**
 2  * Links node at end of queue.
 3  *
 4  * @param node the node
 5  */
 6 private void enqueue(Node<E> node) {
 7     // assert putLock.isHeldByCurrentThread();
 8     // assert last.next == null;
 9     last = last.next = node;
10 }

入队操作其实就是将原来的尾节点的next指向新加入的节点,并且把这个新加入的节点设置成尾节点,由于初始化的时候head==last,所以第一个节点其实就是挂载到head.next的,最终的数据结构如下:head->n1->n2->n3......

可阻塞出队操作:

 1 public E take() throws InterruptedException {
 2     E x;
 3     int c = -1;
 4     final AtomicInteger count = this.count;
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lockInterruptibly();
 7     try {
 8         while (count.get() == 0) {
 9             notEmpty.await();//队列为空等待
10         }
11         x = dequeue(); //获取头元素
12         c = count.getAndDecrement();
13         if (c > 1) //队列中至少还有一个元素,唤醒其它可能被阻塞的出队操作
14             notEmpty.signal();
15     } finally {
16         takeLock.unlock();
17     }
18     if (c == capacity) //队列从满变成不满状态,唤醒其它可能被阻塞的入队操作
19         signalNotFull();
20     return x;
21 }

先获取takeLock锁,队列为空则阻塞等待,队列不为空时获取一个头元素,如果队列中至少还有一个元素的话就唤醒其它可能被阻塞的出队操作,注意这里的c-1才是当前真正的队列中元素的个数,所以c == capacity表示的就是队列从满变成了不满状态,所以需要唤醒其它可能被阻塞的入队操作。另外的出队方法poll原理差不多,但最终也都是调用dequeue做出队操作:

 1 private E dequeue() {
 2     // assert takeLock.isHeldByCurrentThread();
 3     // assert head.item == null;
 4     Node<E> h = head;
 5     Node<E> first = h.next; //头节点的next才是真正的第一个节点
 6     h.next = h; // help GC
 7     head = first;
 8     E x = first.item;
 9     first.item = null;
10     return x;
11 }

出队的逻辑很巧妙,由于head.next才是真正的第一个节点,所以拿到第一个节点的数据item之后,会让原来的head.next指向自己,把本该被移除的第一个节点的item清空,然后把它变成新的head节点。这样保证head.next才是真正的第一个有效节点,而将原来的head.next指向自己是为了让GC能够快速的回收(为什么不将head.next直接置为null 是为了在迭代器迭代到当前节点时区分到底是被take了还是队列已经结束,如果为null 就被被判断为队列结束)。可以说移除的其实是head节点,而原来的head.next成了新的head。

这里这种移除节点的方式很关键,为什么出入队操作使用两个独立的锁不会出问题呢?因为take 操作拿到的虽然确实是head.next即第一个有效节点的数据,但是真正移除的节点确是head节点,原来那个本应该被移除的节点还存在于队列中只是它变成了head ,试想现在的队列只有一个元素即队列长这样:head -> n1 ,现在两个线程同时分别进行take 和put 操作,put会将新节点n2挂到n1.next, take 拿走n1.item,移除head,最终变成这样: n1 (head) -> n2. 因此并不会存在同步问题。

内部节点出队操作:

 1 public boolean remove(Object o) {
 2     if (o == null) return false;
 3     fullyLock();
 4     try {
 5         for (Node<E> trail = head, p = trail.next;
 6              p != null;
 7              trail = p, p = p.next) {
 8             if (o.equals(p.item)) {
 9                 unlink(p, trail);
10                 return true;
11             }
12         }
13         return false;
14     } finally {
15         fullyUnlock();
16     }
17 }

 1 void unlink(Node<E> p, Node<E> trail) {
 2     // assert isFullyLocked();
 3     // p.next is not changed, to allow iterators that are
 4     // traversing p to maintain their weak-consistency guarantee.
 5     p.item = null; //仅仅是将被移除的p节点的数据移除,并没有设置其next为空
 6     trail.next = p.next;
 7     if (last == p)
 8         last = trail;
 9     if (count.getAndDecrement() == capacity)
10         notFull.signal();
11 }

remove(object)可以移除指定的元素,该元素可能位于链表的任何位置。值得注意的是,这里获取了fullyLock,不允许所有的其它出入队操作,因为在移除节点的时候会破坏链表的结构,这时候如果有出入队操作很可能会导致元素被挂载到被删除的节点后面,或者将head指向被删除的节点,从而导致链表节点丢失。unlink方法就是用于将要移除的节点从链表中断开,并让它的上一个节点指向它的下一个节点。如果队列从满变成不满状态,那么唤醒可能阻塞的入队操作。这里并没有改变被移除节点的next指向,这是为了保证刚好迭代到p节点的迭代器能够继续往下迭代操作,而不会因为节点的移除而导致迭代器中途停止,即所谓的弱一致。值得注意的是,不论是从头节点出出队还是内部节点移除,都没有将它们的next指向重置为null其实都是为了方便迭代器实现.

除了remove,还有clear、contains、toArray、toString方法都会在操作的时候回去fullLock,这些方法会阻塞所有的其它方法,所以执行这些方法无疑会降低吞吐量的,在并发量高的场景尽量少使用。

而方法 drainTo() 即从队列中移除全部的(或指定量的)元素到指定的集合中,仅仅是获取了takeLock,因为该方法其实也仅仅是出队操作,只是它是通过从head节点遍历的方式来转移节点。

迭代器

LinkedBlockingQueue的迭代器实现比起ArrayBlockingQueue简单的多,但迭代的实现也有相类似的地方,例如在创建迭代器的时候就已经拿到了第一个有效节点的元素,每一次执行next的时候又准备好下一次迭代的返回对象,同ArrayBlockingQueue一样,它也有一个lastRet变量用来暂时存储当前迭代的节点,用于在it.next调用完成之后,调用it.remove()时避免删除不应该删除的元素。

 1 private class Itr implements Iterator<E> {
 2     /*
 3      * Basic weakly-consistent iterator.  At all times hold the next
 4      * item to hand out so that if hasNext() reports true, we will
 5      * still have it to return even if lost race with a take etc.
 6      */
 7
 8     private Node<E> current;
 9     private Node<E> lastRet;
10     private E currentElement;
11
12     Itr() {
13         fullyLock();
14         try {
15             current = head.next;
16             if (current != null)
17                 currentElement = current.item;
18         } finally {
19             fullyUnlock();
20         }
21     }
22
23     public boolean hasNext() {
24         return current != null;
25     }

很简单,在创建迭代器实例时直接拿到head.next即第一个有效节点,以及其数据 currentElement,hasNext直接判断current不为空即可,这也是为了保证迭代器的弱一致性,如果hasNext为true,那么next一定会返回非空的对象。

 1 /**
 2      * Returns the next live successor of p, or null if no such.
 3      *
 4      * Unlike other traversal methods, iterators need to handle both:
 5      * - dequeued nodes (p.next == p)
 6      * - (possibly multiple) interior removed nodes (p.item == null)
 7      */
 8     private Node<E> nextNode(Node<E> p) {
 9         for (;;) {
10             Node<E> s = p.next;
11             if (s == p) //如果s节点作为首个有效节点已经出队
12                 return head.next; //直接返回新的第一个有效节点
13             if (s == null || s.item != null)
14                 return s; //s就是正常的下一个有效节点,为null表示结束
15             p = s; //s.item ==null 说明已经被remove方法移除了,继续找它的下一个节点
16         }
17     }
18
19     public E next() {
20         fullyLock();
21         try {
22             if (current == null)
23                 throw new NoSuchElementException();
24             E x = currentElement;
25             lastRet = current; //保留当前遍历的节点,为接下来调用it.remove时使用。
26             current = nextNode(current);
27             currentElement = (current == null) ? null : current.item;
28             return x;
29         } finally {
30             fullyUnlock();
31         }
32     }

next方法没有什么好说的,而nextNode方法很关键,它会处理两种情况,1、当前节点p的下一个节点已经作为首个有效节点出队了,即p.next == p,这时候下一个节点其实就是新的首节点即head.next。2、如果当前节点的下一个节点被内部删除了即通过remove (object)移除,那么s.next不为空,但是s.item为空(具体请看unlink方法),所以需要继续寻找s.next节点,这里使用了无条件的for自旋,就可以跳过这种中间的一个或多个被remove方法移除的节点。

迭代器it.remove方法就不贴源码了,很简单,获取fullLocl,根据事先保存的next的返回节点lastRet遍历整个队列,发现了就unlink,没有发现就什么也不做。

LinkedBlockingQueue的迭代器保证了其弱一致性,除了首个有效节点在创建迭代器实例的时候就已经被保留下来之外(所以在获取迭代器实例之后,就算移除了头节点it.next也会返回该节点),队列中其它节点的变更都能被迭代器同步更新。LinkedBlockimgQueue的迭代器少了ArrayBlockingQueue那样很多精密的实现例如对于GC的友好性,所以使用多个迭代器实例可能内存性能有不可预测性。

可拆分迭代器Spliterator

LinkedBlockingQueue实现了自己的可拆分迭代器LBQSpliterator,从spliterator方法就可以看到:

public Spliterator<E> spliterator() {
    return new LBQSpliterator<E>(this);
}

可拆分迭代器的 tryAdvance、forEachRemaining、trySplit方法都是需要获取fullLock的,所以注意对吞吐量的影响,tryAdvance获取第一个item不为空的节点数据做指定的操作,forEachRemaining循环遍历当前迭代器中所有没有被移除的节点(item不为空)做指定的操作源码都很简单,就不贴代码了,它的拆分方法trySplit相对来说有意思的多:

 1 public Spliterator<E> trySplit() {
 2     Node<E> h;
 3     final LinkedBlockingQueue<E> q = this.queue;
 4     int b = batch; //batch初始值为0     //n第一次为1,第二次为2,依次加1,直到MAX_BATCH就固定下来
 5     int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
 6     if (!exhausted && //还有节点
 7         ((h = current) != null || (h = q.head.next) != null) &&
 8         h.next != null) {
 9         Object[] a = new Object[n];
10         int i = 0;
11         Node<E> p = current; //上一次拆分的结尾
12         q.fullyLock();
13         try {
14             if (p != null || (p = q.head.next) != null) {
15                 do {
16                     if ((a[i] = p.item) != null) //如果没有被移除就放到数组中
17                         ++i;
18                 } while ((p = p.next) != null && i < n); //继续从上一次拆分的结尾往后循环
19             }
20         } finally {
21             q.fullyUnlock();
22         }
23         if ((current = p) == null) { //更新这一次的结尾到current
24             est = 0L;
25             exhausted = true;
26         }
27         else if ((est -= i) < 0L) //如果已经没有元素了,设置est为0.
28             est = 0L;
29         if (i > 0) { //这一次拆出了元素,生成新的迭代器
30             batch = i; //更新batch
31             return Spliterators.spliterator
32                 (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
33                  Spliterator.CONCURRENT);
34         }
35     }
36     return null;
37 }

LinkedBlockingQueue的迭代器拆分很特别,它不是像ArrayBlockingQueue那样每次分一半,而是第一次只拆一个元素,第二次拆2个,第三次拆三个,依次内推,拆分的次数越多,后面的迭代器分的得元素越多,直到一个很大的数MAX_BATCH(33554432) ,后面的迭代器每次都分到这么多的元素,拆分的实现逻辑很简单,每一次拆分结束都记录下拆分到哪个元素,下一次拆分从上次结束的位置继续往下拆分,直到没有元素可拆分了返回null。

总结

LinkedBlockingQueue使用了链表的方式实现队列,还有一个专门的head节点,所有的有效节点都移除挂载到head节点之后,采用两个独立的可重入锁分别对出入队进行加锁,而不像ArrayBlockingQueue那样所有操作都需要唯一的锁,所以吞吐量有了很大的提高,这也是LinkedBlockingQueue最被广泛使用的原因吧,但是它还是有很多方法(remove ,clear ,toArray, toString以及迭代器相关的方法)需要同时获取两个锁才能操作,这无疑会影响吞吐量,所以要合理使用。另外它在实现的时候创建了额外的Node节点实例来绑定真实数据,所以对内存的消耗稍微要多一些。

原文地址:https://www.cnblogs.com/txmfz/p/10316782.html

时间: 2024-08-01 05:16:45

Java同步数据结构之LinkedBlockingQueue的相关文章

Java同步数据结构之ConcurrentLinkedQueue

前言 前面介绍的Queue都是通过Lock锁实现的阻塞队列,今天介绍一种非阻塞队列ConcurrentLinkedQueue,所谓非阻塞,其实就是通过CAS代替加锁来实现的高效的非阻塞队列.当许多线程共享对公共集合的访问时,ConcurrentLinkedQueue是一个合适的选择.与大多数其他并发集合实现一样,该类不允许使用空元素. ConcurrentLinkedQueue是一个基于链表的无界线程安全的先进先出队列.虽然前面介绍的队列也有基于链表的实现,例如LinkedBlockingQue

java 基础数据结构

数据结构, 需要考虑两个方面: 1. 每个元素具体的存储方法 (java中是一个对象) 2. 元素之间的关系如何实现存储 (java中也是一个对象) 另外在java中, 已经可以把跟数据结构有关的一些方法写到一个类里了. 线性表 顺序表 c语言: 借助数组实现 #define INIT_SIZE 100; typedef struct { int elem[INIT_SIZE]; // 用来存储数组元素 int length; // 当前顺序表的长度 } SqList; // 元素之间的关系隐含

Java基本数据结构总结

一直没有很仔细的系统学习Java,之前一直用的是python和c/c++,但是既然要走上大数据的道路,那么一定逃脱不开java的.下面在网上找到一些资料并结合相关的书进行整理总结. java.util包,包含集合框架.遗留的 collection 类.事件模型.日期和时间设施.国际化和各种实用工具类(字符串标记生成器.随机数生成器和位数组.日期Date类.堆栈Stack类.向量Vector类等).集合类.时间处理模式.日期时间工具等各类常用工具包.下面要介绍的Java版的数据结构就是在这个包中.

【转】Java学习---Java核心数据结构(List,Map,Set)使用技巧与优化

[原文]https://www.toutiao.com/i6594587397101453827/ Java核心数据结构(List,Map,Set)使用技巧与优化 JDK提供了一组主要的数据结构实现,如List.Map.Set等常用数据结构.这些数据都继承自 java.util.Collection 接口,并位于 java.util 包内. 1.List接口 最重要的三种List接口实现:ArrayList.Vector.LinkedList.它们的类图如下: 可以看到,3种List均来自 Ab

Java同步与异步

一.关键字: thread(线程).thread-safe(线程安全).intercurrent(并发的) synchronized(同步的).asynchronized(异步的). volatile(易变的).atomic(原子的).share(共享) 二.总结背景: 一次读写共享文件编写,嚯,好家伙,竟然揪出这些零碎而又是一路的知识点.于是乎,Google和翻阅了<Java参考大全>.<Effective Java Second Edition>,特此总结一下供日后工作学习参考

java 同步锁(synchronized)

java 同步锁(synchronized) 在java中,Synchronized就是一把锁,他可以锁定一个方法,也可以锁定一个方法,我擦,其实这两个东西就是一样的.块不就是一个没有名字的方法么,方法就是一个有名字的块.本文就用块来测试.所谓锁,就是原子操作,把这个锁定的块作为一个整体,就像你上厕所,拉了就要擦屁屁,当然你也可以不擦,如果你不在意出现的问题的话.信号量Semaphore和这个Synchronized 其实实现的功能差不多,不过效率不同,使用的方式也不同.Synchronized

Java 同步示例

同步 1)同步方法 2)同步块 21) 实例变量 22) 类变量 锁定的内容 1)锁定类的某个特定实例 2)锁定类对象(类的所有实例) 一.同步类实例:同步方法 public class Demo { public synchronized void m1(){ //............... } public void m2(){ //............ synchronized(this){ //......... } //........ } } 这两种写法的效果是一样的,锁定的

java项目——数据结构实验报告

java项目——数据结构总结报告 20135315  宋宸宁 实验要求 1.用java语言实现数据结构中的线性表.哈希表.树.图.队列.堆栈.排序查找算法的类. 2.设计集合框架,使用泛型实现各类. 3.API的编写,并导出. 4.使用TDD模式,对程序进行测试,利用TestSuite将各测试类整合到一起. 5.与小组成员实现代码的整合. 实验设计过程 首先自学集合框架章节的内容,初步设计相关的类. 根据数据结构课本的章节分类,实验各数据结构类. 在类的编写过程中,经过老师的指导,我准备使用泛型

java实现数据结构-线性表-顺序表,实现插入,查找,删除,合并功能

package 顺序表; import java.util.ArrayList; import java.util.Scanner; public class OrderList { /** * @param args * @author 刘雁冰 * @2015-1-31 21:00 */ /* * (以下所谓"位置"不是从0开始的数组下标表示法,而是从1开始的表示法.) * (如12,13,14,15,16数据中,位置2上的数据即是13) * * 利用JAVA实现数据结构-线性表-顺