java 并发(五)---AbstractQueuedSynchronizer

文章部分图片和代码来自参考文章。

LockSupport 和 CLH 和 ConditionObject

阅读源码首先看一下注解 ,知道了大概的意思后,再进行分析。注释一开始就进行了概括。AQS的实现是基于FIFO等待队列的。

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

可以从上图看到,AQS里面先是定义了这两个类,AQS的实现也都会围绕着两个类进行。为了了解这两个类我们先了解一下以下知识。

CLH

AQS中有个Node,可以看到它的注释 :

The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks.

Node类是一个变形的 CLH . 那么CLH是什么呢?它是自旋锁的一种。下面的文章讲解了什么是CLH :   克雷格.兰丁&hagersten (CLH Lock) , 简单来说就是当前节点要运行就必须等待当前节点的前一个节点释放锁。运行以下代码 :

示例代码来自参考资料

  1 public class ClhSpinLock implements Lock{
  2     private final ThreadLocal<Node> prev;
  3     private final ThreadLocal<Node> node;
  4     private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
  5
  6     public ClhSpinLock() {
  7         this.node = new ThreadLocal<Node>() {
  8             protected Node initialValue() {
  9                 return new Node();
 10             }
 11         };
 12
 13         this.prev = new ThreadLocal<Node>() {
 14             protected Node initialValue() {
 15                 return null;
 16             }
 17         };
 18     }
 19
 20     /**
 21      * 1.初始状态 tail指向一个node(head)节点
 22      * +------+
 23      * | head | <---- tail
 24      * +------+
 25      *
 26      * 2.lock-thread加入等待队列: tail指向新的Node,同时Prev指向tail之前指向的节点
 27      * +----------+
 28      * | Thread-A |
 29      * | := Node  | <---- tail
 30      * | := Prev  | -----> +------+
 31      * +----------+        | head |
 32      *                     +------+
 33      *
 34      *             +----------+            +----------+
 35      *             | Thread-B |            | Thread-A |
 36      * tail ---->  | := Node  |     -->    | := Node  |
 37      *             | := Prev  | ----|      | := Prev  | ----->  +------+
 38      *             +----------+            +----------+         | head |
 39      *                                                          +------+
 40      * 3.寻找当前node的prev-node然后开始自旋
 41      *
 42      */
 43     public void lock() {
 44         final Node node = this.node.get();
 45         node.locked = true;
 46         Node pred = this.tail.getAndSet(node);
 47         this.prev.set(pred);
 48         // 自旋
 49         while (pred.locked);
 50     }
 51
 52     public void unlock() {
 53         final Node node = this.node.get();
 54         node.locked = false;
 55         this.node.set(this.prev.get());
 56     }
 57
 58     @Override
 59     public void lockInterruptibly() throws InterruptedException {
 60         // TODO Auto-generated method stub
 61
 62     }
 63
 64     @Override
 65     public boolean tryLock() {
 66         // TODO Auto-generated method stub
 67         return false;
 68     }
 69
 70     @Override
 71     public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
 72         // TODO Auto-generated method stub
 73         return false;
 74     }
 75
 76     @Override
 77     public Condition newCondition() {
 78         // TODO Auto-generated method stub
 79         return null;
 80     }
 81
 82     private static class Node {
 83         private volatile boolean locked;
 84     }
 85 }
  1 public class LockTest {
  2     static int count = 0;
  3
  4     public static void testLock(Lock lock) {
  5         try {
  6             lock.lock();
  7             for (int i = 0; i < 10000000; i++) ++count;
  8         } finally {
  9             lock.unlock();
 10         }
 11     }
 12
 13     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
 14         final ClhSpinLock clh = new ClhSpinLock();
 15         final CyclicBarrier cb = new CyclicBarrier(10, new Runnable() {
 16             @Override
 17             public void run() {
 18                 System.out.println(count);
 19             }
 20         });
 21
 22         for (int i = 0; i < 10; i++) {
 23             new Thread(new Runnable() {
 24                 @Override
 25                 public void run() {
 26                     testLock(clh);
 27                     // 这段代码是非lock比较使用
 28 //                    for (int i = 0; i < 10000000; i++)
 29 //                        count++;
 30                     try {
 31                         cb.await();
 32                     } catch (InterruptedException | BrokenBarrierException e) {
 33                         e.printStackTrace();
 34                     }
 35                 }
 36             }).start();
 37         }
 38     }
 39 }

上面的代码出现了ThreadLocal ,可以先看一下这篇文章,了解一下 ThreadLocal  : Java并发编程:深入剖析ThreadLocal

LockSupport

LockSupport,构建同步组件的基础工具,帮AQS完成相应线程的阻塞或者唤醒的工作。先阅读 : java并发编程之LockSupport  和 Java并发包源码学习之AQS框架(三)LockSupport和interrupt  可以知道LockSupport 重要的方法有 :unpark 和 park 。LockSupport所有的方法都是调用native的park和unpark实现的。具体源码查看上面的链接。

为什么在java6要在入参引入blocker呢?blocker的作用到底是什么?

有blocker的可以传递给开发人员更多的现场信息,可以查看到当前线程的阻塞对象,方便定位问题。所以java6新增加带blocker入参的系列park方法,替代原有的park方法。

ConditionObject

ConditonObject 继承自 Condition 接口。我们看一下这个接口是干嘛的?

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

最后一句话可以知道,Condition 这个接口是代替了 Object 这个类的有关monitor的方法。

再看一下ConditionObject 在 AQS中到底是什么作用。

Condition implementation for a AbstractQueuedSynchronizer serving as the basis of a Lock implementation.

ConditionObject 作为 Condition的一个实现,在AQS中实现了Lock的基本功能。

下面有几点需要注意 :

LockSupport 与 对象的 wait/notify的不同在于:

  1. LockSupport不需要获取对象的监视器。
  2. 实现机制不同,lockSupport 调用 native的park ,unpark方法。

AQS 详解

   以下部分总结来自 Java并发(五):aqs框架 

AQS 初步了解

AQS 锁概念

先说说几个概念

  • 独占锁&共享锁

    • 独占锁
      资源最多同时只能被一个线程占用
    • 共享共享锁
      资源同时可以被多个线程占用
  • 公平锁&非公平锁
    • 公平锁
      线程按照提交的顺序依次去获取资源,按照一定的优先级来,FIFO
    • 非公平锁
      线程获取资源的顺序是无序的

AQS的关键点有三个

  • 提供变量statue来维护状态 (int state这个局部变量)
  • 维护线程阻塞队列 (CLH 就是我们上面讲的呀!)
  • 阻塞和唤醒线程(ConditionObject也是上面讲的呀!)

AQS中的关键字段

  • private volatile int state;
    同步状态,通过设置state来达到同步的效果.该字段在子类代表的含义不同。
  • private transient volatile Node tail;
    线程等待队列的尾部,只有enq方法在向队列添加新线程时,才会修改该值 。
  • private transient volatile Node head;
    线程等待队列的头部。

下面是offset相关的变量,主要是为了提供CAS操作而设立的变量:

  • private static final long stateOffset;
    同步状态的offset
  • private static final long headOffset;
    等待队列头的offset
  • private static final long tailOffset;
    等待队列尾的offset
  • private static final long waitStatusOffset;
    当前节点的等待状态的offset
  • private static final long nextOffset;
    当前节点的下一个节点offset

AQS中的关键方法

已经实现的方法如下

  • public final void acquire(int arg)
    获取锁,一般会调用acquire(1)来获取。
  • public final boolean release(int arg)
    释放锁,一般会调用release(1)来释放。

需要被子类实现的方法

  • protected boolean tryAcquire(int arg)
    尝试获取独占锁,在获取前会检查同步状态是否允许获取独占锁。获取成功则返回true,返回false则把线程加入到等待队列。
  • protected boolean tryRelease(int arg)
    尝试通过设置同步状态,来释放独占锁。
  • protected int tryAcquireShared(int arg)
    尝试获取共享锁,在获取前会检查同步状态是否允许获取共享锁。获取成功则返回true,返回false则把线程加入到等待队列。
  • protected boolean tryReleaseShared(int arg)
    尝试通过设置同步状态,来释放共享锁。

独占获取锁

       下面内容部分来自 一行一行源码分析清楚AbstractQueuedSynchronizer 

AQS里面有一个Node类,它的结构如下:

  1 static final class Node {
  2     /** Marker to indicate a node is waiting in shared mode */
  3     // 标识节点当前在共享模式下
  4     static final Node SHARED = new Node();
  5     /** Marker to indicate a node is waiting in exclusive mode */
  6     // 标识节点当前在独占模式下
  7     static final Node EXCLUSIVE = null;
  8
  9     // ======== 下面的几个int常量是给waitStatus用的 ===========
 10     /** waitStatus value to indicate thread has cancelled */
 11     // 代码此线程取消了争抢这个锁
 12     static final int CANCELLED =  1;
 13     /** waitStatus value to indicate successor‘s thread needs unparking */
 14     // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
 15     static final int SIGNAL    = -1;
 16     /** waitStatus value to indicate thread is waiting on condition */
 17     // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
 18     static final int CONDITION = -2;
 19     /**
 20      * waitStatus value to indicate the next acquireShared should
 21      * unconditionally propagate
 22      */
 23     // 同样的不分析,略过吧
 24     static final int PROPAGATE = -3;
 25     // =====================================================
 26
 27     // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
 28     // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
 29     // 也许就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
 30     volatile int waitStatus;
 31     // 前驱节点的引用
 32     volatile Node prev;
 33     // 后继节点的引用
 34     volatile Node next;
 35     // 这个就是线程本尊
 36     volatile Thread thread;
 37
 38 }

下面重点说一下这块代码,这是独占模式下的加锁获取

  1  // 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
  2     // 否则,acquireQueued方法会将线程压到队列中
  3     public final void acquire(int arg) { // 此时 arg == 1
  4         // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
  5         // 因为有可能直接就成功了呢,也就不需要进队列排队了,
  6         // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
  7         if (!tryAcquire(arg) &&
  8             // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
  9             acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
 10               selfInterrupt();
 11         }
 12     }
 13
 14
 15 final boolean acquireQueued(final Node node, int arg) {
 16     boolean failed = true;
 17     try {
 18         boolean interrupted = false;
 19         for (;;) {
 20             final Node p = node.predecessor();
 21             if (p == head && tryAcquire(arg)) {
 22                 setHead(node);
 23                 p.next = null; // help GC
 24                 failed = false;
 25                 return interrupted;
 26             }
 27             if (shouldParkAfterFailedAcquire(p, node) &&
 28                 parkAndCheckInterrupt())
 
                // 说明当前线程是被中断唤醒的。
                // 注意:线程被中断之后会继续走到if处去判断,也就是会忽视中断。
                // 除非碰巧线程中断后acquire成功了,那么根据Java的最佳实践,
                // 需要重新设置线程的中断状态(acquire.selfInterrupt)。        
 29                 interrupted = true;
 30         }
 31     } finally {
 32         if (failed)
 33             cancelAcquire(node);
 34     }
 35 }
 36 
  1 /**
  2      * Creates and enqueues node for current thread and given mode.
  3      *
  4      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  5      * @return the new node
  6      */
  7     // 此方法的作用是把线程包装成node,同时进入到队列中
  8     // 参数mode此时是Node.EXCLUSIVE,代表独占模式
  9     private Node addWaiter(Node mode) {
 10         Node node = new Node(Thread.currentThread(), mode);
 11         // Try the fast path of enq; backup to full enq on failure
 12         // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
 13         Node pred = tail;
 14
 15         // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
 16         if (pred != null) {
 17             // 设置自己的前驱 为当前的队尾节点
 18             node.prev = pred;
 19             // 用CAS把自己设置为队尾, 如果成功后,tail == node了
 20             if (compareAndSetTail(pred, node)) {
 21                 // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
 22                 // 上面已经有 node.prev = pred
 23                 // 加上下面这句,也就实现了和之前的尾节点双向连接了
 24                 pred.next = node;
 25                 // 线程入队了,可以返回了
 26                 return node;
 27             }
 28         }
 29         // 仔细看看上面的代码,如果会到这里,
 30         // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
 31         // 读者一定要跟上思路,如果没有跟上,建议先不要往下读了,往回仔细看,否则会浪费时间的
 32         enq(node);
 33         return node;
 34     }
  1 /**
  2      * Inserts node into queue, initializing if necessary. See picture above.
  3      * @param node the node to insert
  4      * @return node‘s predecessor
  5      */
  6     // 采用自旋的方式入队
  7     // 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
  8     // 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
  9     private Node enq(final Node node) {
 10         for (;;) {
 11             Node t = tail;
 12             // 之前说过,队列为空也会进来这里
 13             if (t == null) { // Must initialize
 14                 // 初始化head节点
 15                 // 细心的读者会知道原来head和tail初始化的时候都是null,反正我不细心
 16                 // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
 17                 if (compareAndSetHead(new Node()))
 18                     // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
 19
 20                     // 这个时候有了head,但是tail还是null,设置一下,
 21                     // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
 22                     // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
 23                     // 所以,设置完了以后,继续for循环,下次就到下面的else分支了
 24                     tail = head;
 25             } else {
 26                 // 下面几行,和上一个方法 addWaiter 是一样的,
 27                 // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
 28                 node.prev = t;
 29                 if (compareAndSetTail(t, node)) {
 30                     t.next = node;
 31                     return t;
 32                 }
 33             }
 34         }
 35     }
 36 

走完了end方法,那么该节点肯定是插入到了等待队列。继续前进

  1  /**
  2      * Checks and updates status for a node that failed to acquire.
  3      * Returns true if thread should block. This is the main signal
  4      * control in all acquire loops.  Requires that pred == node.prev
  5      *
  6      * @param pred node‘s predecessor holding status
  7      * @param node the node
  8      * @return {@code true} if thread should block
  9      */
 10     // 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
 11     // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
 12     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 13         int ws = pred.waitStatus;
 14         // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
 15         if (ws == Node.SIGNAL)
 16             /*
 17              * This node has already set status asking a release
 18              * to signal it, so it can safely park.
 19              */
 20             return true;
 21
 22         // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。这里需要知道这点:
 23         // 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
 24         // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
 25         // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
 26         // 找前驱节点的前驱节点做爹,往前循环总能找到一个好爹的
 27         if (ws > 0) {
 28             /*
 29              * Predecessor was cancelled. Skip over predecessors and
 30              * indicate retry.
 31              */
 32             do {
 33                 node.prev = pred = pred.prev;
 34             } while (pred.waitStatus > 0);
 35             pred.next = node;
 36         } else {
 37             /*
 38              * waitStatus must be 0 or PROPAGATE.  Indicate that we
 39              * need a signal, but don‘t park yet.  Caller will need to
 40              * retry to make sure it cannot acquire before parking.
 41              */
 42             // 仔细想想,如果进入到这个分支意味着什么
 43             // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
 44             // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
 45             // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
 46             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 47         }
 48         return false;
 49     }
 50
 51     // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
 52     // 这个方法结束根据返回值我们简单分析下:
 53     // 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
 54     //        我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
 55     // 如果返回false, 说明当前不需要被挂起,为什么呢?往后看
 56
 57     // 跳回到前面是这个方法
 58     // if (shouldParkAfterFailedAcquire(p, node) &&
 59     //                parkAndCheckInterrupt())
 60     //                interrupted = true;
 61
 62     // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
 63     // 那么需要执行parkAndCheckInterrupt():
 64
 65     // 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
 66     // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
 67     private final boolean parkAndCheckInterrupt() {
 68         LockSupport.park(this);
 69         return Thread.interrupted();
 70     }
 71
 72     // 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况
 73
 74    // 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。
 75
 76     // 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:
 77     // => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。
 78 }

我们结合上面的代码知道 acquireQueued 方法有个for(;;)循环,shouldParkAfterFailedAcquire 方法的作用是判断当前节点是否应该阻塞等待唤醒。内部过滤掉了Node节点waitStatus >0 的节点(即现在这段时间可能会取消掉抢锁),同时把状态都变为了 SIGNAL。

shouldParkAfterFailedAcquire方法的作用是:

  • 确定后继是否需要park;
  • 跳过被取消的结点;
  • 设置前继的waitStatus为SIGNAL.

parkAndCheckInterrupt() 这个方法一旦进入就会进入阻塞。注意哦,parkAndCheckInterrupt 这个方法一直等待阻塞,等待唤醒,但是唤醒之并且没有退出循环哦,所以会继续执行哦,等到什么时候才退出呢?等待第一个节点获取到锁了才退出。这样就形成了后一个节点等待上一个节点释放锁等待唤醒的等待队列。 唤醒之后,acquireQueued 退出之后,就会进入 selfInterrupt 方法 。

一旦tryAcquire成功则立即返回,否则线程会加入队列 线程可能会反复的被阻塞和唤醒直到tryAcquire成功,这是因为线程可能被中断, 而acquireQueued方法中会保证忽视中断(for循环),只有tryAcquire成功了才返回。中断版本的独占获取是acquireInterruptibly这个方法, doAcquireInterruptibly这个方法中如果线程被中断则acquireInterruptibly会抛出InterruptedException异常。

线程被唤醒只可能是:被unpark,被中断或伪唤醒。被中断会设置interrupted,acquire方法返回前会 selfInterrupt重置下线程的中断状态,如果是伪唤醒的话会for循环re-check。这里涉及到 LockSupport 可以看这篇文章: Java并发包源码学习之AQS框架(三)LockSupport和interrupt

独占模式释放

比较简单只要直接唤醒后续结点就可以了,后续结点会从parkAndCheckInterrupt方法中返回。

  1 public final boolean release(int arg) {
  2     // tryReease由子类实现,通过设置state值来达到同步的效果。
  3     if (tryRelease(arg)) {
  4         Node h = head;
  5         // waitStatus为0说明是初始化的空队列
  6         if (h != null && h.waitStatus != 0)
  7             // 唤醒后续的结点
  8             unparkSuccessor(h);
  9         return true;
 10     }
 11     return false;
 12 }

共享模式获取

acquireShared方法是用来共享模式获取。

  1 public final void acquireShared(int arg) {
  2     //如果没有许可了则入队等待
  3     if (tryAcquireShared(arg) < 0)
  4         doAcquireShared(arg);
  5 }
  6
  7 private void doAcquireShared(int arg) {
  8     // 添加队列
  9     final Node node = addWaiter(Node.SHARED);
 10     boolean failed = true;
 11     try {
 12         boolean interrupted = false;
 13         // 等待前继释放并传递
 14         for (;;) {
 15             final Node p = node.predecessor();
 16             if (p == head) {
 17                 int r = tryAcquireShared(arg);// 尝试获取
 18                 if (r >= 0) {
 19                     // 获取成功则前继出队,跟独占不同的是
 20                     // 会往后面结点传播唤醒的操作,保证剩下等待的线程能够尽快 获取到剩下的许可。
 21                     setHeadAndPropagate(node, r);
 22                     p.next = null; // help GC
 23                     if (interrupted)
 24                         selfInterrupt();
 25                     failed = false;
 26                     return;
 27                 }
 28             }
 29
 30             // p != head || r < 0
 31             if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
 32                 interrupted = true;
 33         }
 34     }
 35     finally {
 36         if (failed)
 37             cancelAcquire(node);
 38     }
 39 }
 

核心是这个doAcquireShared方法,跟独占模式的acquireQueued很像,主要区别在setHeadAndPropagate方法中, 这个方法会将node设置为head。如果当前结点acquire到了之后发现还有许可可以被获取,则继续释放自己的后继, 后继会将这个操作传递下去。这就是PROPAGATE状态的含义。

  1 private void setHeadAndPropagate(Node node, int propagate) {
  2     Node h = head; // Record old head for check below
  3     setHead(node);
  4     /*
  5      * 尝试唤醒后继的结点:<br />
  6      * propagate > 0说明许可还有能够继续被线程acquire;<br />
  7      * 或者 之前的head被设置为PROPAGATE(PROPAGATE可以被转换为SIGNAL)说明需要往后传递;<br />
  8      * 或者为null,我们还不确定什么情况。 <br />
  9      * 并且 后继结点是共享模式或者为如上为null。
 10      * <p>
 11      * 上面的检查有点保守,在有多个线程竞争获取/释放的时候可能会导致不必要的唤醒。<br />
 12      *
 13      */
 14     if (propagate > 0 || h == null || h.waitStatus < 0) {
 15         Node s = node.next;
 16         // 后继结是共享模式或者s == null(不知道什么情况)
 17         // 如果后继是独占模式,那么即使剩下的许可大于0也不会继续往后传递唤醒操作
 18         // 即使后面有结点是共享模式。
 19         if (s == null || s.isShared())
 20             // 唤醒后继结点
 21             doReleaseShared();
 22     }
 23 }
 24
 25 private void doReleaseShared() {
 26     for (;;) {
 27         Node h = head;
 28         // 队列不为空且有后继结点
 29         if (h != null && h != tail) {
 30             int ws = h.waitStatus;
 31             // 不管是共享还是独占只有结点状态为SIGNAL才尝试唤醒后继结点
 32             if (ws == Node.SIGNAL) {
 33                 // 将waitStatus设置为0
 34                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 35                     continue; // loop to recheck cases
 36                 unparkSuccessor(h);// 唤醒后继结点
 37                 // 如果状态为0则更新状态为PROPAGATE,更新失败则重试
 38             } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 39                 continue; // loop on failed CAS
 40         }
 41         // 如果过程中head被修改了则重试。
 42         if (h == head) // loop if head changed
 43             break;
 44     }
 45 }
 

共享模式释放

主要逻辑也就是doReleaseShared

  1 public final boolean releaseShared(int arg) {
  2     if (tryReleaseShared(arg)) {
  3         doReleaseShared();
  4         return true;
  5     }
  6     return false;
  7 }

参考资料 :

原文地址:https://www.cnblogs.com/Benjious/p/10101189.html

时间: 2024-11-09 05:56:54

java 并发(五)---AbstractQueuedSynchronizer的相关文章

java 并发(五)---AbstractQueuedSynchronizer(4)

读写锁 ReentrantReadWriteLock 首先我们来了解一下 ReentrantReadWriteLock 的作用是什么?和 ReentranLock 有什么区别?Reentrancy 英文的意思是可重入性.ReentrantReadWriteLock下文简称(rrwl)         下面总结来自   Java并发编程-ReentrantReadWriteLock ,你也可以从JDK 中阅读到这段. ReentrantReadWriteLock是Lock的另一种实现方式,我们已经

java 并发(五)---AbstractQueuedSynchronizer(3)

       文章代码分析和部分图片来自参考文章 认识 CountDownLatch 分析这个类,首先了解一下它所可以实现的效果,然后再顺着这个源码的思路思考是不是和它实现的效果一样.下面的代码和图片可以说明 CountDownLatch (下文简称CDL)的工作过程. 1 public class CountDownLatchDemo { 2 3 public static void main(String[] args) { 4 5 CountDownLatch latch = new Co

Java并发(五):并发,迭代器和容器

在随后的博文中我会继续分析并发包源码,在这里,得分别谈谈容器类和迭代器及其源码,虽然很突兀,但我认为这对于学习Java并发很重要; ConcurrentModificationException: JavaAPI中的解释:当不允许这样的修改时,可以通过检测到对象的并发修改的方法来抛出此异常.一个线程通常不允许修改集合,而另一个线程正在遍历它. 一般来说,在这种情况下,迭代的结果是未定义的. 某些迭代器实现(包括由JRE提供的所有通用集合实现的实现)可能会选择在检测到此行为时抛出此异常. 这样做的

Java并发编程-AbstractQueuedSynchronizer源码分析

简介 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础.使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态.然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作: java.util.concurrent.locks.Abstra

Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)

AbstractQueuedSynchronizer是实现Java并发类库的一个基础框架,Java中的各种锁(RenentrantLock, ReentrantReadWriteLock)以及同步工具类(Semaphore, CountDownLatch)等很多都是基于AbstractQueuedSynchronizer实现的.AbstractQueuedSynchronizer 一般简称AQS,Abstract表示他是一个抽象类,Queued表示他是基于先进先出 FIFO 等待队列实现的,Sy

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

《Java并发编程实战》第五章 同步容器类 读书笔记

一.同步容器类 1. 同步容器类的问题 线程容器类都是线程安全的.可是当在其上进行符合操作则须要而外加锁保护其安全性. 常见符合操作包括: . 迭代 . 跳转(依据指定顺序找到当前元素的下一个元素) . 条件运算 迭代问题能够查看之前的文章 <Java ConcurrentModificationException 异常分析与解决方式> 二.并发容器 集合类型 非线程安全 线程安全 List ArrayList CopyOnWriteArrayList Set SortedSet Concur

Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析

学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了

Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式

在上一篇<Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析>中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态.理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解.在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作.AQS为在独占模