Java并发编程原理与实战十九:AQS 剖析

一、引言
在JDK1.5之前,一般是靠synchronized关键字来实现线程对共享变量的互斥访问。synchronized是在字节码上加指令,依赖于底层操作系统的Mutex Lock实现。
而从JDK1.5以后java界的一位大神—— Doug Lea 开发了AbstractQueuedSynchronizer(AQS)组件,使用原生java代码实现了synchronized语义。换句话说,Doug Lea没有使用更“高级”的机器指令,也不依靠JDK编译时的特殊处理,仅用一个普普通通的类就完成了代码块的并发访问控制,比那些费力不讨好的实现不知高到哪里去了。
java.util.concurrent包有多重要无需多言,一言以蔽之,是Doug Lea大爷对天下所有Java程序员的怜悯。
AQS定义了一套多线程访问共享资源的同步器框架,是整个java.util.concurrent包的基石,Lock、ReadWriteLock、CountDowndLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor等都是在AQS的基础上实现的。

二、原理

2.1实现原理

并发控制的核心是锁的获取与释放,锁的实现方式有很多种,AQS采用的是一种改进的CLH锁。

2.2 CLH锁

CLH(Craig, Landin, andHagersten locks)是一钟自旋锁,能确保无饥饿性,提供先来先服务的公平性。

何谓自旋锁?它是为实现保护共享资源而提出一种锁机制。其实,自旋锁与互斥锁比较类似,它们都是为了解决对某项资源的互斥使用。无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个保持者,也就是说,在任何时刻最多只能有一个执行单元获得锁。但是两者在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,“自旋”一词就是因此而得名

CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

CLH队列中的结点QNode中含有一个locked字段,该字段若为true表示该线程需要获取锁,且不释放锁,为false表示线程释放了锁。结点之间是通过隐形的链表相连,之所以叫隐形的链表是因为这些结点之间没有明显的next指针,而是通过myPred所指向的结点的变化情况来影响myNode的行为。CLHLock上还有一个尾指针,始终指向队列的最后一个结点。

当一个线程需要获取锁时,会创建一个新的QNode,将其中的locked设置为true表示需要获取锁,然后使自己成为队列的尾部,同时获取一个指向其前趋的引用myPred,然后该线程就在前趋结点的locked字段上旋转,直到前趋结点释放锁。当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前趋结点。如上图所示,线程A需要获取锁,其myNode域为true,些时tail指向线程A的结点,然后线程B也加入到线程A后面,tail指向线程B的结点。然后线程A和B都在它的myPred域上旋转,一旦它的myPred结点的locked字段变为false,它就可以获取锁。

2.3 AQS数据模型

AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

AQS的内部队列是CLH同步锁的一种变形。其主要从两方面进行了改造,节点的结构与节点等待机制:
l 在结构上引入了头结点和尾节点,分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,

l 为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段

l 在每个node里面使用一个状态字段来控制阻塞/唤醒,而不是自旋

l head结点使用的是傀儡结点

FIFO队列中的节点有AQS的静态内部类Node定义:

static final class Node {

    // 共享模式
    static final Node SHARED = new Node();

    // 独占模式
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /**
     * CANCELLED,值为1,表示当前的线程被取消
     * SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
     * CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
     * PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
     * 值为0,表示当前节点在sync队列中,等待着获取锁。
     */
    volatile int waitStatus;

    // 前驱结点
    volatile Node prev;

    // 后继结点
    volatile Node next;

    // 与该结点绑定的线程
    volatile Thread thread;

    // 存储condition队列中的后继节点
    Node nextWaiter;

    // 是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取前驱结点
    final Node predecessor() throwsNullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial heador SHARED marker
    }

    Node(Thread thread, Node mode) { // Used byaddWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { //Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node类中有两个常量SHARE和EXCLUSIVE,顾名思义这两个常量用于表示这个结点支持共享模式还是独占模式,共享模式指的是允许多个线程获取同一个锁而且可能获取成功,独占模式指的是一个锁如果被一个线程持有,其他线程必须等待。多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。

2.4 CAS操作

AQS有三个重要的变量:

 // 队头结点
    private transient volatile Node head;

    // 队尾结点
    private transient volatile Node tail;

    // 代表共享资源
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState){
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this,stateOffset, expect, update);
    }

compareAndSetState方法是以乐观锁的方式更新共享资源。

独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,即Compare And Swap。

CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。简单介绍一下这个指令的操作过程:

首先,CPU 会将内存中将要被更改的数据与期望的值做比较。然后,当这两个值相等时,CPU 才会将内存中的数值替换为新的值。否则便不做操作。最后,CPU 会将旧的数值返回。

这一系列的操作是原子的。它们虽然看似复杂,但却是 Java 5 并发机制优于原有锁机制的根本。简单来说,CAS 的含义是“我认为原有的值应该是什么,如果是,则将原有的值更新为新值,否则不做修改,并告诉我原来的值是多少”。

CAS通过调用JNI(Java Native Interface)调用实现的。JNI允许java调用其他语言,而CAS就是借助C语言来调用CPU底层指令实现的。Unsafe是CAS的核心类,它提供了硬件级别的原子操作

Doug Lea大神在java同步器中大量使用了CAS技术,鬼斧神工的实现了多线程执行的安全性。CAS不仅在AQS的实现中随处可见,也是整个java.util.concurrent包的基石。

可以发现,head、tail、state三个变量都是volatile的。

volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。如果一个字段被声明成volatile,Java线程内存模型确保所有线程看到这个变量的值是一致的。

volatile变量也存在一些局限:不能用于构建原子的复合操作,因此当一个变量依赖旧值时就不能使用volatile变量。而CAS呢,恰恰可以提供对共享变量的原子的读写操作。

volatile保证共享变量的可见性,CAS保证更新操作的原子性,简直是绝配!把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:

1.       首先,声明共享变量为volatile;

2.       然后,使用CAS的原子条件更新来实现线程之间的同步;

3.       同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent包的实现示意图如下:

三、源码解读 (java特种兵)

AQS的全称为(AbstractQueuedSynchronizer),这个类也是在java.util.concurrent.locks下面。这个类似乎很不容易看懂,因为它仅仅是提供了一系列公共的方法,让子类来调用。那么要理解意思,就得从子类下手,反过来看才容易看懂。如下图所示:

这么多类,我们看那一个?刚刚提到过锁(Lock),我们就从锁开始吧。这里就先以ReentrantLock排它锁为例开始展开讲解如何利用AQS的,然后再简单介绍读写锁的要点(读写锁本身的实现十分复杂,要完全说清楚需要大量的篇幅来说明)。
首先来看看ReentrantLock的构造方法,它的构造方法有两个,如下图所示:

很显然,对象中有一个属性叫sync,有两种不同的实现类,默认是“NonfairSync”来实现,而另一个“FairSync”它们都是排它锁的内部类,不论用那一个都能实现排它锁,只是内部可能有点原理上的区别。先以“NonfairSync”类为例,它的lock()方法是如何实现的呢?

lock()方法先通过CAS尝试将状态从0修改为1。若直接修改成功,前提条件自然是锁的状态为0,则直接将线程的OWNER修改为当前线程,这是一种理想情况,如果并发粒度设置适当也是一种乐观情况。
若上一个动作未成功,则会间接调用了acquire(1)来继续操作,这个acquire(int)方法就是在AbstractQueuedSynchronizer当中了。这个方法表面上看起来简单,但真实情况比较难以看懂,因为第一次看这段代码可能不知道它要做什么!不急,一步一步来分解。
首先看tryAcquire(arg)这里的调用(当然传入的参数是1),在默认的“NonfairSync”实现类中,会这样来实现:

○ 首先获取这个锁的状态,如果状态为0,则尝试设置状态为传入的参数(这里就是1),若设置成功就代表自己获取到了锁,返回true了。状态为0设置1的动作在外部就有做过一次,内部再一次做只是提升概率,而且这样的操作相对锁来讲不占开销。
○ 如果状态不是0,则判定当前线程是否为排它锁的Owner,如果是Owner则尝试将状态增加acquires(也就是增加1),如果这个状态值越界,则会抛出异常提示,若没有越界,将状态设置进去后返回true(实现了类似于偏向的功能,可重入,但是无需进一步征用)。
○ 如果状态不是0,且自身不是owner,则返回false。

对tryAcquire()的调用判定中是通过if(!tryAcquire())作为第1个条件的,如果返回true,则判定就不会成立了,自然后面的acquireQueued动作就不会再执行了,如果发生这样的情况是最理想的。
无论多么乐观,征用是必然存在的,如果征用存在则owner自然不会是自己,tryAcquire()方法会返回false,接着就会再调用方法:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)做相关的操作。
这个方法的调用的代码更不好懂,需要从里往外看,这里的Node.EXCLUSIVE是节点的类型,看名称应该清楚是排它类型的意思。接着调用addWaiter()来增加一个排它锁类型的节点,这个addWaiter()的代码是这样写的:

这里创建了一个Node的对象,将当前线程和传入的Node.EXCLUSIVE传入,也就是说Node节点理论上包含了这两项信息。代码中的tail是AQS的一个属性,刚开始的时候肯定是为null,也就是不会进入第一层if判定的区域,而直接会进入enq(node)的代码,那么直接来看看enq(node)的代码。

看到了tail就应该猜到了AQS是链表吧,没错,而且它还应该有一个head引用来指向链表的头节点,AQS在初始化的时候head、tail都是null,在运行时来回移动。此时,我们最少至少知道AQS是一个基于状态(state)的链表管理方式。

首先这个是一个死循环,而且本身没有锁,因此可以有多个线程进来,假如某个线程进入方法,此时head、tail都是null,自然会进入if(t == null)所在的代码区域,这部分代码会创建一个Node出来名字叫h,这个Node没有像开始那样给予类型和线程,很明显是一个空的Node对象,而传入的Node对象首先被它的next引用所指向,此时传入的node和某一个线程创建的h对象如下图所示。

刚才我们很理想的认为只有一个线程会出现这种情况,如果有多个线程并发进入这个if判定区域,可能就会同时存在多个这样的数据结构,在各自形成数据结构后,多个线程都会去做compareAndSetHead(h)的动作,也就是尝试将这个临时h节点设置为head,显然并发时只有一个线程会成功,因此成功的那个线程会执行tail = node的操作,整个AQS的链表就成为:

有一个线程会成功修改head和tail的值,其它的线程会继续循环,再次循环就不会进入if (t == null)的逻辑了,而会进入else语句的逻辑中。
在else语句所在的逻辑中,第一步是node.prev = t,这个t就是tail的临时值,也就是首先让尝试写入的node节点的prev指针指向原来的结束节点,然后尝试通过CAS替换掉AQS中的tail的内容为当前线程的Node,无论有多少个线程并发到这里,依然只会有一个能成功,成功者执行t.next = node,也就是让原先的tail节点的next引用指向现在的node,现在的node已经成为了最新的结束节点,不成功者则会继续循环。
简单使用图解的方式来说明,3个步骤如下所示,如下图所示:

插入多个节点的时候,就以此类推了哦,总之节点都是在链表尾部写入的,而且是线程安全的。
知道了AQS大致的写入是一种双向链表的插入操作,但插入链表节点对锁有何用途呢,我们还得退回到前面的代码中addWaiter方法最终返回了要写入的node节点, 再回退到图5-17中所在的代码中需要将这个返回的node节点作为acquireQueued方法入口参数,并传入另一个参数(依然是1),看看它里面到底做了些什么?请看下图:

这里也是一个死循环,除非进入if(p == head && tryAcquire(arg))这个判定条件,而p为node.predcessor()得到,这个方法返回node节点的前一个节点,也就是说只有当前一个节点是head的时候,进一步尝试通过tryAcquire(arg)来征用才有机会成功。tryAcquire(arg)这个方法我们前面介绍过,成立的条件为:锁的状态为0,且通过CAS尝试设置状态成功或线程的持有者本身是当前线程才会返回true,我们现在来详细拆分这部分代码。
○ 如果这个条件成功后,发生的几个动作包含:
(1) 首先调用setHead(Node)的操作,这个操作内部会将传入的node节点作为AQS的head所指向的节点。线程属性设置为空(因为现在已经获取到锁,不再需要记录下这个节点所对应的线程了),再将这个节点的perv引用赋值为null。
(2) 进一步将的前一个节点的next引用赋值为null。
在进行了这样的修改后,队列的结构就变成了以下这种情况了,通过这样的方式,就可以让执行完的节点释放掉内存区域,而不是无限制增长队列,也就真正形成FIFO了:

○ 如果这个判定条件失败
会首先判定:“shouldParkAfterFailedAcquire(p , node)”,这个方法内部会判定前一个节点的状态是否为:“Node.SIGNAL”,若是则返回true,若不是都会返回false,不过会再做一些操作:判定节点的状态是否大于0,若大于0则认为被“CANCELLED”掉了(我们没有说明几个状态的值,不过大于0的只可能被CANCELLED的状态),因此会从前一个节点开始逐步循环找到一个没有被“CANCELLED”节点,然后与这个节点的next、prev的引用相互指向;如果前一个节点的状态不是大于0的,则通过CAS尝试将状态修改为“Node.SIGNAL”,自然的如果下一轮循环的时候会返回值应该会返回true。
如果这个方法返回了true,则会执行:“parkAndCheckInterrupt()”方法,它是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。
相应的,可以自己看看FairSync实现类的lock方法,其实区别不大,有些细节上的区别可能会决定某些特定场景的需求,你也可以自己按照这样的思路去实现一个自定义的锁。
接下来简单看看unlock()解除锁的方式,如果获取到了锁不释放,那自然就成了死锁,所以必须要释放,来看看它内部是如何释放的。同样从排它锁(ReentrantLock)中的unlock()方法开始,请先看下面的代码截图:

通过tryRelease(int)方法进行了某种判定,若它成立则会将head传入到unparkSuccessor(Node)方法中并返回true,否则返回false。首先来看看tryRelease(int)方法,如下图所示:

这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。
这一点大家写代码要注意了哦,如果是在循环体中lock()或故意使用两次以上的lock(),而最终只有一次unlock(),最终可能无法释放锁。在本书的src/chapter05/locks/目录下有相应的代码,大家可以自行测试的哦。
在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是已经执行完的节点,在后面阐述这个方法的body的时候都叫head节点),内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入图 5-24中的循环进一步尝试tryAcquire()方法来获取锁,但是也未必能完全获取到哦,因为此时也可能有一些外部的请求正好与之征用,而且还奇迹般的成功了,那这个线程的运气就有点悲剧了,不过通常乐观认为不会每一次都那么悲剧。
再看看共享锁,从前面的排它锁可以看得出来是用一个状态来标志锁的,而共享锁也不例外,但是Java不希望去定义两个状态,所以它与排它锁的第一个区别就是在锁的状态上,它用int来标志锁的状态,int有4个字节,它用高16位标志读锁(共享锁),低16位标志写锁(排它锁),高16位每次增加1相当于增加65536(通过1 << 16得到),自然的在这种读写锁中,读锁和写锁的个数都不能超过65535个(条件是每次增加1的,如果递增是跳跃的将会更少)。在计算读锁数量的时候将状态左移16位,而计算排它锁会与65535“按位求与”操作,如下图所示。

写锁的功能与“ReentrantLock”基本一致,区域在于它会在tryAcquire操作的时候,判定状态的时候会更加复杂一些(因此有些时候它的性能未必好)。
读锁也会写入队列,Node的类型被改为:“Node.SHARED”这种类型,lock()时候调用的是AQS的acquireShared(int)方法,进一步调用tryAcquireShared()操作里面只需要检测是否有排它锁,如果没有则可以尝试通过CAS修改锁的状态,如果没有修改成功,则会自旋这个动作(可能会有很多线程在这自旋开销CPU)。如果这个自旋的过程中检测到排它锁竞争成功,那么tryAcquireShared()会返回-1,从而会走如排它锁的Node类似的流程,可能也会被park住,等待排它锁相应的线程最终调用unpark()动作来唤醒。
这就是Java提供的这种读写锁,不过这并不是共享锁的诠释,在共享锁里面也有多种机制 ,或许这种读写锁只是其中一种而已。在这种锁下面,读和写的操作本身是互斥的,但是读可以多个一起发生。这样的锁理论上是非常适合应用在“读多写少”的环境下(当然我们所讲的读多写少是读的比例远远大于写,而不是多一点点),理论上讲这样锁征用的粒度会大大降低,同时系统的瓶颈会减少,效率得到总体提升。
在本节中我们除了学习到AQS的内在,还应看到Java通过一个AQS队列解决了许多问题,这个是Java层面的队列模型,其实我们也可以利用许多队列模型来解决自己的问题,甚至于可以改写模型模型来满足自己的需求.

关于Lock及AQS的一些补充:
1、 Lock的操作不仅仅局限于lock()/unlock(),因为这样线程可能进入WAITING状态,这个时候如果没有unpark()就没法唤醒它,可能会一直“睡”下去,可以尝试用tryLock()、tryLock(long , TimeUnit)来做一些尝试加锁或超时来满足某些特定场景的需要。例如有些时候发现尝试加锁无法加上,先释放已经成功对其它对象添加的锁,过一小会再来尝试,这样在某些场合下可以避免“死锁”哦。
2、 lockInterruptibly() 它允许抛出InterruptException异常,也就是当外部发起了中断操作,程序内部有可能会抛出这种异常,但是并不是绝对会抛出异常的,大家仔细看看代码便清楚了。
3、 newCondition()操作,是返回一个Condition的对象,Condition只是一个接口,它要求实现await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll()方法,AbstractQueuedSynchronizer中有一个内部类叫做ConditionObject实现了这个接口,它也是一个类似于队列的实现,具体可以参考源码。大多数情况下可以直接使用,当然觉得自己比较牛逼的话也可以参考源码自己来实现。
4、 在AQS的Node中有每个Node自己的状态(waitStatus),我们这里归纳一下,分别包含:
SIGNAL 从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己,值为:-1
CANCELLED 当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED的意思。
CONDITION 线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来激活,值为-2。
PROPAGATE 读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个doReleaseShared()动作,内部也是一个循环,当判定后续的节点状态为0时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。
状态0 初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态。

代码上的理解注释,可以参考:http://ifeve.com/juc-aqs-reentrantlock/

四 、AQS和CAS 对比

CAS(Compare And Swap)

什么是CAS

CAS(Compare And Swap),即比较并交换。是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。

在JAVA中,sun.misc.Unsafe 类提供了硬件级别的原子操作来实现这个CAS。 java.util.concurrent 包下的大量类都使用了这个 Unsafe.java 类的CAS操作。至于 Unsafe.java 的具体实现这里就不讨论了。

CAS典型应用

java.util.concurrent.atomic 包下的类大多是使用CAS操作来实现的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong)。下面以 AtomicInteger.java 的部分实现来大致讲解下这些原子类的实现。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private volatile int value;// 初始int大小
    // 省略了部分代码...

    // 带参数构造函数,可设置初始int大小
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    // 不带参数构造函数,初始int大小为0
    public AtomicInteger() {
    }

    // 获取当前值
    public final int get() {
        return value;
    }

    // 设置值为 newValue
    public final void set(int newValue) {
        value = newValue;
    }

    //返回旧值,并设置新值为 newValue
    public final int getAndSet(int newValue) {
        /**
        * 这里使用for循环不断通过CAS操作来设置新值
        * CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系
        * */
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    // 原子的设置新值为update, expect为期望的当前的值
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    // 获取当前值current,并设置新值为current+1
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // 此处省略部分代码,余下的代码大致实现原理都是类似的
}

一般来说在竞争不是特别激烈的时候,使用该包下的原子操作性能比使用 synchronized 关键字的方式高效的多(查看getAndSet(),可知如果资源竞争十分激烈的话,这个for循环可能换持续很久都不能成功跳出。不过这种情况可能需要考虑降低资源竞争才是)。 
在较多的场景我们都可能会使用到这些原子类操作。一个典型应用就是计数了,在多线程的情况下需要考虑线程安全问题。通常第一映像可能就是:

public class Counter {
    private int count;
    public Counter(){}
    public int getCount(){
        return count;
    }
    public void increase(){
        count++;
    }
}

上面这个类在多线程环境下会有线程安全问题,要解决这个问题最简单的方式可能就是通过加锁的方式,调整如下:

public class Counter {
    private int count;
    public Counter(){}
    public synchronized int getCount(){
        return count;
    }
    public synchronized void increase(){
        count++;
    }
}

这类似于悲观锁的实现,我需要获取这个资源,那么我就给他加锁,别的线程都无法访问该资源,直到我操作完后释放对该资源的锁。我们知道,悲观锁的效率是不如乐观锁的,上面说了Atomic下的原子类的实现是类似乐观锁的,效率会比使用 synchronized 关系字高,推荐使用这种方式,实现如下:

public class Counter {
    private AtomicInteger count = new AtomicInteger();
    public Counter(){}
    public int getCount(){
        return count.get();
    }
    public void increase(){
        count.getAndIncrement();
    }
}

AQS(AbstractQueuedSynchronizer)

什么是AQS

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果你有看过类似 CountDownLatch 类的源码实现,会发现其内部有一个继承了 AbstractQueuedSynchronizer 的内部类 Sync。可见 CountDownLatch 是基于AQS框架来实现的一个同步器.类似的同步器在JUC下还有不少。(eg. Semaphore)

AQS用法

如上所述,AQS管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)

 To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>

如JDK的文档中所说,使用AQS来实现一个同步器需要覆盖实现如下几个方法,并且使用getState,setState,compareAndSetState这几个方法来设置获取状态 
1. boolean tryAcquire(int arg) 
2. boolean tryRelease(int arg) 
3. int tryAcquireShared(int arg) 
4. boolean tryReleaseShared(int arg) 
5. boolean isHeldExclusively()

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquire、 tryReleaseisHeldExclusively而支持共享获取的同步器应该实现tryAcquireSharedtryReleaseSharedisHeldExclusively。下面以 CountDownLatch 举例说明基于AQS实现同步器, CountDownLatch 用同步状态持有当前计数,countDown方法调用 release从而导致计数器递减;当计数器为0时,解除所有线程的等待;await调用acquire,如果计数器为0,acquire 会立即返回,否则阻塞。通常用于某任务需要等待其他任务都完成后才能继续执行的情景。源码如下:

public class CountDownLatch {
    /**
     * 基于AQS的内部Sync
     * 使用AQS的state来表示计数count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // 使用AQS的getState()方法设置状态
            setState(count);
        }

        int getCount() {
            // 使用AQS的getState()方法获取状态
            return getState();
        }

        // 覆盖在共享模式下尝试获取锁
        protected int tryAcquireShared(int acquires) {
            // 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1
            return (getState() == 0) ? 1 : -1;
        }

        // 覆盖在共享模式下尝试释放锁
        protected boolean tryReleaseShared(int releases) {
            // 在for循环中Decrement count直至成功;
            // 当状态值即count为0的时候,返回false表示 signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    // 使用给定计数值构造CountDownLatch
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 让当前线程阻塞直到计数count变为0,或者线程被中断
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // count递减
    public void countDown() {
        sync.releaseShared(1);
    }

    // 获取当前count值
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

参考资料:

http://ifeve.com/java-special-troops-aqs/

原文地址:https://www.cnblogs.com/pony1223/p/9460936.html

时间: 2024-09-28 03:10:26

Java并发编程原理与实战十九:AQS 剖析的相关文章

Java并发编程原理与实战视频教程

14套java精品高级架构课,缓存架构,深入Jvm虚拟机,全文检索Elasticsearch,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件,Mysql分布式集群,服务架构,运 维架构视频教程 14套精品课程介绍: 1.14套精 品是最新整理的课程,都是当下最火的技术,最火的课程,也是全网课程的精品: 2.14套资 源包含:全套完整高清视频.完整源码.配套文档: 3.知识也 是需要投资的,有投入才会有产出(保证投入产出比是

Java并发编程原理与实战

Java并发编程原理与实战网盘地址:https://pan.baidu.com/s/1c3mpC7A 密码: pe62备用地址(腾讯微云):https://share.weiyun.com/11ea938c7ad43783a934ed1d492eed8d 密码:ogHukS 原文地址:http://blog.51cto.com/13406637/2071116

Java并发编程原理与实战八:产生线程安全性问题原因(javap字节码分析)

前面我们说到多线程带来的风险,其中一个很重要的就是安全性,因为其重要性因此,放到本章来进行讲解,那么线程安全性问题产生的原因,我们这节将从底层字节码来进行分析. 一.问题引出 先看一段代码 package com.roocon.thread.t3; public class Sequence { private int value; public int getNext(){ return value++; } public static void main(String[] args) { S

Java并发编程原理与实战二十九:Exchanger

一.简介 前面三篇博客分别介绍了CyclicBarrier.CountDownLatch.Semaphore,现在介绍并发工具类中的最后一个Exchange.Exchange是最简单的也是最复杂的,简单在于API非常简单,就一个构造方法和两个exchange()方法,最复杂在于它的实现是最复杂的.在API是这么介绍的:可以在对中对元素进行配对和交换的线程的同步点.每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象.Exchanger 可能被

Java并发编程原理与实战二十:线程安全性问题简单总结

一.出现线程安全性问题的条件 •在多线程的环境下 •必须有共享资源 •对共享资源进行非原子性操作 二.解决线程安全性问题的途径 •synchronized (偏向锁,轻量级锁,重量级锁) •volatile •JDK提供的原子类 •使用Lock(共享锁,排它锁) 三.认识的“*锁” •偏向锁 Java偏向锁(Biased Locking)是Java6引入的一项多线程优化. 偏向锁,顾名思义,它会偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不

Java并发编程原理与实战四十五:问题定位总结

背景   “线下没问题的”. “代码不可能有问题 是系统原因”.“能在线上远程debug么”    线上问题不同于开发期间的bug,与运行时环境.压力.并发情况.具体的业务相关.对于线上的问题利用线上环境可用的工具,收集必要信息 对定位问题十分重要.    对于导致问题的bug.资源瓶颈很难直观取得数据,需要根据资源使用数据.日志等信息推测问题根源.并且疑难问题的定位通常需要使用不同的方法追根溯源.    这篇wiki我对自己使用过的工具做了整理,并分享一些案例. 1.  常见问题1.1 可用性

Java并发编程原理与实战二十六:闭锁 CountDownLatch

关于闭锁 CountDownLatch 之前在网上看到过一篇举例非常形象的例子,但不记得是出自哪里了,所以这里就当自己再重新写一篇吧: 例子如下: 我们每天起早贪黑的上班,父母每天也要上班,有一天定了一个饭店,一家人一起吃个饭,通知大家下班去饭店集合. 假设:3个人在不同的地方上班,必须等到3个人到场才能吃饭,用程序如何实现呢? 方式一: public class Test1 { /** * 模拟爸爸去饭店 */ public static void fatherToRes() { System

Java并发编程原理与实战四十四:final域的内存语义

一.final域的重排序规则 对于final域,编译器和处理器要遵循两个重拍序规则: 1.在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序. 2.初次读一个包含final域的对象的应用,与随后初次读这个final域,这两个操作之间不能重排序 下面通过一个示例来分别说明这两个规则: public class FinalTest { int i;//普通变量 final int j; static FinalTest obj; publi

Java并发编程原理与实战六:主线程等待子线程解决方案

本文将研究的是主线程等待所有子线程执行完成之后再继续往下执行的解决方案 public class TestThread extends Thread { public void run() { System.out.println(this.getName() + "子线程开始"); try { // 子线程休眠五秒 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.