队列同步器——AQS

一、AQS概念

队列同步器AQS(AbstractQueuedSynchronizer)是用来构建锁或者其它同步组件的基础框架,使用int成员变量state表示同步状态,通过内部的FIFO双向等待队列来完成线程的排队工作。同步器中的等待队列可以简单的理解为"等待锁的线程队列"。

  • 子类通过继承AQS并重写其指定方法来管理同步状态;
  • 子类被推荐为自定义同步组件的静态内部类,同步器本身并没有实现任何的同步接口,仅定义了若干同步状态获取和释放的方法供自定义同步组件使用;
  • 同步器支持独占式(EXCLUSIVE)的获取同步状态,也支持共享式(SHARED)的获取同步状态。

队列同步器AQS可以简单的理解为"同步状态的管理者",为了保证对同步状态的原子操作,使用CAS;当线程获取同步状态时,需要将获取同步状态失败的线程以及等待状态等信息构成一个节点(Node)放入等待队列,同时阻塞该线程,因此需要同步队列。

二、AQS的接口和实例

2.1 AQS的设计实现原理

同步器采用了模板方法的设计模式,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法。在重写同步器指定的方法时,需要对同步状态进行更改时,使用同步器提供的如下3个方法:getState()、setState(int newState)、compareAndSetState(int expect, int update)来进行操作,因为CAS操作保证同步状态的改变是线程安全的。下面是AQS源码中state的声明和三个方法的定义:

 1     /** 同步状态  */
 2     private volatile int state;
 3
 4     /**
 5      * 返回当前的同步状态。此操作的内存语义为 {@code volatile} read.
 6      */
 7     protected final int getState() {
 8         return state;
 9     }
10
11     /**
12      * 设置新的同步状态。此操作的内存语义为 {@code volatile} write.
13      */
14     protected final void setState(int newState) {
15         state = newState;
16     }
17
18     /**
19      * 如果当前的同步状态与期望值相同,通过原子操作更新状态值。
20      * 此操作的内存语义为{@code volatile} read and write.
21      * 如果当前的同步状态与期望值不同,返回false.
22      */
23     protected final boolean compareAndSetState(int expect, int update) {
24         // See below for intrinsics setup to support this
25         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
26     }

2.2 AQS提供的可被重写的方法

子类需要重写部分或全部一下方法。如果是独占锁就重写 tryAcquire() 和 tryRelease()方法,在独占模式下,每次只能有一个线程持有锁,RenentrantLock就是以独占方式实现的互斥锁;如果是共享锁就重写 tryAcquireShared() 和 tryReleaseShared() 方法,在共享模式下,允许多个线程同时获取锁,并发访问,共享资源,如:ReentrantReadWriteLock里的读锁,它的读锁是可以被共享的,但是它的写锁是独占的。AQS的内部类Node定义了两个常量SHARED和EXCLUSIVE,用来表示AQS队列中等待线程的锁获取模式。

 1     /**
 2      * 以独占模式获取同步状态,实现该方法需要查询当前状态并判断状态是否符合预期,然
 3      * 后再进行CAS设置同步状态。
 4      */
 5     protected boolean tryAcquire(int arg) {
 6         throw new UnsupportedOperationException();
 7     }
 8
 9     /**
10      * 以独占模式释放同步锁,等待获取同步状态的线程有机会获取同步状态。
11      */
12     protected boolean tryRelease(int arg) {
13         throw new UnsupportedOperationException();
14     }
15
16     /** 以共享模式获取同步状态。*/
17     protected int tryAcquireShared(int arg) {
18         throw new UnsupportedOperationException();
19     }
20
21     /** 以共享模式释放同步状态,该方法总是由执行释放的线程调用。*/
22     protected boolean tryReleaseShared(int arg) {
23         throw new UnsupportedOperationException();
24     }
25
26     /** 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占。*/
27     protected boolean isHeldExclusively() {
28         throw new UnsupportedOperationException();
29     }

2.3 AQS提供的模板方法

Java并发包根据是否允许多个线程同时获取锁,将加锁模式分为共享锁和独占锁。对于synchronized来说,如果一个线程在等待锁,那么结果只有两种,获得这把锁继续执行,或者线程保持等待;而AQS提供了另一种可能,这就是线程可以被中断,并且优先响应中断,也就是在等待锁的过程中,程序可以根据需要取消对锁的需求,可以避免死锁。除了等待外部通知(中断操作interrupt)之外,限时等待也可以避免死锁,给定一个时间,如果线程没有在给定的时间内获取到锁,让线程自动放弃。

不管是等待方式,中断方式 还是 定时方式,它们的主要功能获取锁、释放锁的逻辑方法是一样的,中断方式只是在获取锁之前增加了中断判断,定时方式只是增加了定时设计。因此,上面的重写方法是它们的共有逻辑,但实现方式可以不同,将它们放在子类中具体实现。

  1     /**
  2      * 以独占模式获取,忽略中断。 通过调用至少一次tryAcquire(int)实现,成功返回。
  3      * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)。
  4      */
  5     public final void acquire(int arg) {
  6         if (!tryAcquire(arg) &&
  7                 acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
  8             selfInterrupt();
  9     }
 10
 11     /**
 12      * 以独占模式获取,优先中断。首先检查中断状态,然后调用至少一次tryAcquire(int)实现,成功返回。
 13      * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)。
 14      */
 15     public final void acquireInterruptibly(int arg)
 16             throws InterruptedException {
 17         if (Thread.interrupted())
 18             throw new InterruptedException();
 19         if (!tryAcquire(arg))
 20             doAcquireInterruptibly(arg);
 21     }
 22
 23     /**
 24      * 以独占模式获取,优先中断,如果超时则失败。
 25      * 首先检查中断状态,然后调用至少一次tryAcquire(int)实现,成功返回。
 26      * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)。
 27      */
 28     public final boolean tryAcquireNanos(int arg, long nanosTimeout)
 29             throws InterruptedException {
 30         if (Thread.interrupted())
 31             throw new InterruptedException();
 32         return tryAcquire(arg) ||
 33                 doAcquireNanos(arg, nanosTimeout);
 34     }
 35
 36     /**
 37      * 以独占模式释放同步状态,调用tryRelease(int)实现。
 38      * 该方法会在释放同步状态成功后,将同步队列中的队首节点(这时已更新队首节点)包含的线程唤醒。
 39      */
 40     public final boolean release(int arg) {
 41         if (tryRelease(arg)) {
 42             AbstractQueuedSynchronizer.Node h = head;
 43             if (h != null && h.waitStatus != 0)
 44                 unparkSuccessor(h);
 45             return true;
 46         }
 47         return false;
 48     }
 49
 50     /**
 51      * 以共享模式获取,忽略中断。 通过调用至少一次tryAcquireShared(int)实现,成功返回。
 52      * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquireShared(int)。
 53      */
 54     public final void acquireShared(int arg) {
 55         if (tryAcquireShared(arg) < 0)
 56             doAcquireShared(arg);
 57     }
 58
 59     /**
 60      * 以共享模式获取,优先中断。 通过调用至少一次tryAcquireShared(int)实现,成功返回。
 61      * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquireShared(int)。
 62      */
 63     public final void acquireSharedInterruptibly(int arg)
 64             throws InterruptedException {
 65         if (Thread.interrupted())
 66             throw new InterruptedException();
 67         if (tryAcquireShared(arg) < 0)
 68             doAcquireSharedInterruptibly(arg);
 69     }
 70
 71     /**
 72      * 以共享模式获取,优先中断,如果超时则失败。
 73      * 通过调用至少一次tryAcquireShared(int)实现,成功返回。
 74      * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquireShared(int)。
 75      */
 76     public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
 77             throws InterruptedException {
 78         if (Thread.interrupted())
 79             throw new InterruptedException();
 80         return tryAcquireShared(arg) >= 0 ||
 81                 doAcquireSharedNanos(arg, nanosTimeout);
 82     }
 83
 84     /** 以共享模式释放同步状态。*/
 85     public final boolean releaseShared(int arg) {
 86         if (tryReleaseShared(arg)) {
 87             doReleaseShared();
 88             return true;
 89         }
 90         return false;
 91     }
 92
 93     /** 获取在等待队列上的线程集合。*/
 94     public final Collection<Thread> getQueuedThreads() {
 95         ArrayList<Thread> list = new ArrayList<Thread>();
 96         for (AbstractQueuedSynchronizer.Node p = tail; p != null; p = p.prev) {
 97             Thread t = p.thread;
 98             if (t != null)
 99                 list.add(t);
100         }
101         return list;
102     }

不管是共享锁还是独占锁,或者公平锁还是非公平锁,锁的逻辑都是一样的:线程获取锁 --> 成功继续执行,不成功阻塞等待 --> 释放锁 --> 后续线程获取锁。所以AQS采用模板方法的设计模式是非常适合的,让子类重写tryAcquire(),tryRelease(),tryAcquireShared(),tryReleaseShared()方法,不同的锁可以有不同的实现方式,比如ReentrantLock的内部类NonfairSync和FairSync对 tryAcquire()的实现方式就不同,一个是非公平方式,一个是公平方式;然后AQS中的模板方法 ,如果是获取锁的方法,就调用相应的子类中重写的获取锁的方法,如果是 释放锁的方法,就调用相应的子类中重写的释放锁的方法。这样子类重写的方法就被"隐藏"了,只需要调用AQS中的几个重要的模板方法就可以了。

三、AQS的同步队列

当线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时阻塞当前线程;当同步状态释放时,会把队首节点中的线程唤醒,使其再次获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,AQS的内部内Node的定义如下:

 1 static final class Node {
 2     /** 表示节点使用共享模式等待 */
 3     static final Node SHARED = new Node();
 4     /** 表示节点使用独占模式等待 */
 5     static final Node EXCLUSIVE = null;
 6
 7     /** 等待状态可能值,由于在同步队列中等待的线程等待超时或者被中断,该节点不会参与
 8      同步状态的竞争,需要从同步队列中取消等待,节点进入该状态后将不会再变化; */
 9     static final int CANCELLED =  1;
10     /** 等待状态可能值,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态
11      或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
12     static final int SIGNAL    = -1;
13     /** 等待状态可能值,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition
14      * 调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中;*/
15     static final int CONDITION = -2;
16     /**
17      * 等待状态可能值,表示下一次共享式同步状态获取将会无条件地传播下去;
18      */
19     static final int PROPAGATE = -3;
20
21     /** 等待状态值 */
22     volatile int waitStatus;
23
24     /**
25      * 前驱节点
26      */
27     volatile Node prev;
28
29     /**
30      * 后继节点
31      */
32     volatile Node next;
33
34     /**
35      * 获取同步状态的线程,使用后置null.
36      */
37     volatile Thread thread;
38
39     /**
40      * 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,
41      * 也就是说节点类型(独占和共享)和等待队列中的后继节点公用同一个字段
42      */
43      Node nextWaiter;
44
45     /**
46      * Returns true if node is waiting in shared mode.
47      */
48     final boolean isShared() {
49         return nextWaiter == SHARED;
50     }
51
52     /**
53      * 返回前驱节点
54      */
55     final Node predecessor() throws NullPointerException {
56         Node p = prev;
57         if (p == null)
58             throw new NullPointerException();
59         else
60             return p;
61     }
62
63     Node() {    // Used to establish initial head or SHARED marker
64     }
65
66     Node(Thread thread, Node mode) {     // Used by addWaiter
67         this.nextWaiter = mode;
68         this.thread = thread;
69     }
70
71     Node(Thread thread, int waitStatus) { // Used by Condition
72         this.waitStatus = waitStatus;
73         this.thread = thread;
74     }
75 }

节点是构成同步队列的基础,同步器用有首节点head和尾节点tail。同步队列的基本结构入下图所示:

3.1 入队

获取同步状态失败的线程要被构造成节点,并加入到同步队列尾部,加入队列的过程要保证线程安全,同步器提供了基于CAS的设置节点的方法:

1     /**
2      * 仅用于入队。
3      */
4     private final boolean compareAndSetTail(Node expect, Node update) {
5         return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
6     }

添加节点的操作通过addWaiter(Node)方法完成,源码如下:

 1     /**
 2      * 为当前线程创建一个节点并入列,然后返回这个节点
 3      * 使用CAS入队并设置尾节点
 4      */
 5     private Node addWaiter(Node mode) {
 6         Node node = new Node(Thread.currentThread(), mode);
 7         // Try the fast path of enq; backup to full enq on failure
 8         Node pred = tail;
 9         //如果队尾不为null,则尝试插入队列
10         if (pred != null) {
11             node.prev = pred;
12             if (compareAndSetTail(pred, node)) {
13                 pred.next = node;
14                 return node;
15             }
16         }
17         //如果队尾为null,则调用enq(Node)方法插入
18         enq(node);
19         return node;
20     }
当同步队列存在时,addWaiter(Node)方法使用快速入队,即将构造好的node的前驱指针指向当前尾节点,然后通过CAS操作将刚构造的node作为新的尾节点,再把原尾节点的后继指针指向新尾节点;否则,采用enq(Node)入队。enq(Node)源码如下:
 1     private Node enq(final Node node) {
 2         for (;;) {
 3             Node t = tail;
 4             if (t == null) { // Must initialize
 5                 // 队列必须初始化,若有多个线程并发执行此操作,
 6                 // 通过CAS能保证只有一个线程执行成功
 7                 if (compareAndSetHead(new Node()))
 8                     tail = head;
 9             } else {
10                 // 采用快速入队的方式入队
11                 node.prev = t;
12                 if (compareAndSetTail(t, node)) {
13                     t.next = node;
14                     return t;
15                 }
16             }
17         }
18     }

enq(Node)方法使用死循环和CAS的方式来保证节点正确添加到同步队列中。同步器将节点加入到同步队列的过程如下图:

3.2 出队

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头结点的方法不需要使用CAS操作来保证线程安全,它只需要将首节点设置成原首节点的后继节点并断开原首节点的next引用即可。设置首节点的操作通过setHead(Node)方法来完成,源码如下:

1     private void setHead(Node node) {
2         head = node;
3         node.thread = null; //处于 GC 考虑
4         node.prev = null;
5     }

设置首节点的过程如下图所示:

 

 

原文地址:https://www.cnblogs.com/yocapl/p/12397056.html

时间: 2024-11-09 06:21:18

队列同步器——AQS的相关文章

Java并发包下锁学习第二篇Java并发基础框架-队列同步器介绍

Java并发包下锁学习第二篇队列同步器 还记得在第一篇文章中,讲到的locks包下的类结果图吗?如下图: ? 从图中,我们可以看到AbstractQueuedSynchronizer这个类很重要(在本文中,凯哥就用AQS来代替这个类).我们先来了解这个类.对这个类了解之后,学习后面的会更容易了. 本篇是<凯哥(凯哥Java:kagejava)并发编程学习>系列之<Lock系列>教程的第一篇:<Java并发包下锁学习第二篇:队列同步器>. 本文主要内容:同步器介绍:同步器

秋招之路8:JAVA锁体系和AQS抽象队列同步器

整个的体系图 悲观锁,乐观锁 是一个广义概念:体现的是看待线程同步的不同角度. 悲观锁 认为在自己使用数据的时候一定有别的线程来修改数据,在获取数据的时候会先加锁,确保数据不被别的线程修改. 实现:关键字synchronized,接口Lock的实现类 适用场景:写操作多,先加锁可以保证写操作时的数据正确. 乐观锁 认为自己在使用数据的时候不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据. 实现:CAS算法,例如AtomicInteger类的原子自

[Java并发] AQS抽象队列同步器源码解析--锁获取过程

要深入了解java并发知识,AbstractQueuedSynchronizer(AQS)是必须要拿出来深入学习的,AQS可以说是贯穿了整个JUC并发包,例如ReentrantLock,CountDownLatch,CyclicBarrier等并发类都涉及到了AQS.接下来就对AQS的实现原理进行分析. 在开始分析之前,势必先将CLH同步队列了解一下 CLH同步队列 CLH自旋锁: CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁,能确保无饥饿性,提

队列同步器AbstractQueueSynchronizer

同步器是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作. 同步器主要使用的方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态.子类推荐被定义为自定义同步组件的静态内部类,同步器本身没有实现任何同步接口,仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用. 同步器的设计是基于模版方法模式的,即使用者需要继承同步器并重写指定的方法.同步器支持独占式获取同步状态和共享式获取同步状态 Java.co

java并发编程的艺术——第五章总结(Lock锁与队列同步器)

Lock锁 锁是用来控制多个线程访问共享资源的方式. 一般来说一个锁可以防止多个线程同时访问共享资源(但有些锁可以允许多个线程访问共享资源,如读写锁). 在Lock接口出现前,java使用synchronized关键字实现锁的功能,但是在javaSE5之后,并发包中提供了Lock接口(以及其实现类)用来实现锁的功能. Lock提供了与synchronized相似的功能,但必须显示的获取锁与释放锁,虽然不及隐式操作方便,但是拥有了锁获取与释放的可操作性.可中断的锁获取与超时获取锁等多重功能. 提供

队列同步器

队列同步器AbstractQueueSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础. 同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义.可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细

AQS(队列同步器)

目录导引: 一.简介 二.源码解析(JDK8) 三.运用示例 一.简介 AQS(AbstractQueuedSynchronizer)的核心思想是基于volatile int state变量,配合Unsafe工具对其原子性的操作来实现对当前state状态值进行修改. 同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作. 同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法: getState() 获取当前

同步器AQS解析

什么是同步器? 多线程并发执行,通过某种共享状态来同步,当共享状态满足某种条件,才能触发线程开始执行操作. AbstractQueuedSynchronizer(AQS) 这是一个抽象类,它提供多线程下不同共享模式的操作,实现它可以自定义同步器. 可以看出,ReentrantLock和ReentrantReadWriteLock的内部类都是继承于这个抽象类. public abstract class AbstractQueuedSynchronizer extends AbstractOwna

JUC回顾之-AQS同步器的实现原理

1.什么是AQS? AQS的核心思想是基于volatile int state这样的volatile变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改.同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作. 2.同步器的应用 同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法: getState() 获取当前的同步状态 setState(int newState) 设置当前同步状态 co