The java.util.concurrent Synchronizer Framework Base Class—AbstractQueuedSynchronizer

1. AQS简介

AQS是Java并发类库的基础,其提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作:

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
  • java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
  • java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子类推荐被定义为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。

同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。

2. CLH算法

锁的实现是以CLH算法为基础。下面简单介绍一下CLH算法:

CLH算法构建了隐式的链表,是一种非阻塞算法的实现。CLH队列中的结点QNode中含有一个locked字段,该字段若为true表示该线程需要获取锁,且不释放锁,为false表示线程释放了锁。结点之间是通过隐形的链表相连,之所以叫隐形的链表是因为这些结点之间没有明显的next指针,而是通过myPred所指向的结点的变化情况来影响myNode的行为。CLHLock上还有一个尾指针,始终指向队列的最后一个结点。CLHLock的类图如下所示:

当一个线程需要获取锁时,会创建一个新的QNode,将其中的locked设置为true表示需要获取锁,然后线程对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前趋的引用myPred,然后该线程就在前趋结点的locked字段上旋转,直到前趋结点释放锁。当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前趋结点。如下图所示,线程A需要获取锁,其myNode域为true,些时tail指向线程A的结点,然后线程B也加入到线程A后面,tail指向线程B的结点。然后线程A和B都在它的myPred域上旋转,一量它的myPred结点的locked字段变为false,它就可以获取锁扫行。明显线程A的myPred
locked域为false,此时线程A获取到了锁。

整个CLH的代码如下,其中用到了ThreadLocal类,将QNode绑定到每一个线程上,同时用到了AtomicReference,对尾指针的修改正是调用它的getAndSet()操作来实现的,它能够保证以原子方式更新对象引用。

CLH算法的示意代码如下:

[java] view
plain
copy

  1. import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
  2. public class CLHLock {
  3. public static class CLHNode {
  4. private boolean isLocked = true; // 默认是在等待锁
  5. }
  6. @SuppressWarnings("unused" )
  7. private volatile CLHNode tail ;
  8. private static final AtomicReferenceFieldUpdater<CLHLock, CLHNode> UPDATER = AtomicReferenceFieldUpdater
  9. . newUpdater(CLHLock.class, CLHNode .class , "tail" );
  10. public void lock(CLHNode currentThread) {
  11. CLHNode preNode = UPDATER.getAndSet( this, currentThread);
  12. if(preNode != null) {//已有线程占用了锁,进入自旋
  13. while(preNode.isLocked ) {
  14. }
  15. }
  16. }
  17. public void unlock(CLHNode currentThread) {
  18. // 如果队列里只有当前线程,则释放对当前线程的引用(for GC)。
  19. if (!UPDATER .compareAndSet(this, currentThread, null)) {
  20. // 还有后续线程
  21. currentThread. isLocked = false ;// 改变状态,让后续线程结束自旋
  22. }
  23. }
  24. }

至于AQS的实现,和CLH略有不同,同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:

[java] view
plain
copy

  1. Node {
  2. int waitStatus;
  3. Node prev;
  4. Node next;
  5. Node nextWaiter;
  6. Thread thread;
  7. }

成员变量主要负责保存该节点的线程引用,同步等待队列(以下简称sync队列)的前驱和后继节点,同时也包括了同步状态。节点成为sync队列和condition队列构建的基础,在同步器中就包含了sync队列。同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。

3.AQS实现分析

3.1 概述

同步器的设计包含获取和释放两个操作:

获取操作过程如下:

if(尝试获取成功){
    return;
 }else{
     加入等待队列;park自己
}

释放操作:

if(尝试释放成功){
    unpark等待队列中第一个节点
}else{
    return false
}

要满足以上两个操作,需要以下3点来支持:

1、原子操作同步状态;

2、阻塞或者唤醒一个线程;

3、内部应该维护一个队列。

AQS的实现采用了模板设计模式,在AbstractQueuedSynchronizer类中,定义了

[java] view
plain
copy

  1. protected boolean tryAcquire(int arg);
  2. protected int tryAcquireShared(int arg);
  3. protected boolean tryRelease(int arg);
  4. protected boolean tryReleaseShared(int arg);

等未具体实现的方法,子类需要实现这些方法,来完成不同的同步器实现。

3.2 获取、释放锁操作

3.2.1 获取操作

获取锁操作的代码如下:

[java] view
plain
copy

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt(); //如果获取锁的过程中有中断,则在获取操作完成后,响应中断。
  5. }

上述逻辑主要包括:

1. 尝试获取(调用tryAcquire更改状态,需要保证原子性);

在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队(通过调用addWaiter方法)

addWaiter方法如下:

[java] view
plain
copy

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);
  3. // Try the fast path of enq; backup to full enq on failure 首先在尾部快速添加,失败后再调用enq方法
  4. Node pred = tail;
  5. if (pred != null) {
  6. node.prev = pred;
  7. if (compareAndSetTail(pred, node)) {  //通过CAS操作,来进行入队操作
  8. pred.next = node;
  9. return node;
  10. }
  11. }
  12. enq(node);
  13. return node;
  14. }

2. 如果获取不到,将当前线程构造成节点Node并加入sync队列;

进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。

3. 再次尝试获取(调用acquireQueued方法),如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

acquireQueued代码如下:

[java] view
plain
copy

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) { //如果为头结点,且获取锁成功,则退出, Note:head其实保存的是已经获取锁的节点,是哑节点
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt()) //<span style="font-family: Arial;">parkAndCheckInterrupt的实现会调用park方法,使当前线程进入等待状态</span>
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

上述逻辑主要包括:

1. 获取当前节点的前驱节点;

需要获取当前节点的前驱节点,而头结点所对应的含义是当前站有锁且正在运行。

2. 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;

如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。

3. 否则进入等待状态。

如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。

需要注意的是,acquire在执行过程中,并不能及时的对外界中断进行相应,必须等待执行完毕之后,如果由外部中断,则进行中断响应。与acquire方法类似,acquireInterruptibly方法提供了获取状态能力,当然在无法获取状态的情况下会进入sync队列进行排队,这类似acquire,但是和acquire不同的地方在于它能够在外界对当前线程进行中断的时候提前结束获取状态的操作,换句话说,就是在类似synchronized获取锁时,外界能够对当前线程进行中断,并且获取锁的这个操作能够响应中断并提前返回。一个线程处于synchronized块中或者进行同步I/O操作时,对该线程进行中断操作,这时该线程的中断标识位被设置为true,但是线程依旧继续运行。

3.2.2 释放操作

释放操作代码如下:

[java] view
plain
copy

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

上述逻辑主要包括:

1. 尝试释放状态;

tryRelease能够保证原子化的将状态设置回去,当然需要使用compareAndSet来保证。如果释放状态成功过之后,将会进入后继节点的唤醒过程。

2. 唤醒当前节点的后继节点所包含的线程。

通过LockSupport的unpark方法将休眠中的线程唤醒,让其继续acquire状态。

The java.util.concurrent Synchronizer Framework Base Class—AbstractQueuedSynchronizer

时间: 2024-10-09 08:00:11

The java.util.concurrent Synchronizer Framework Base Class—AbstractQueuedSynchronizer的相关文章

java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Catalina].StandardHost[localhost].StandardContext[/xiaozao_web]]

二月 20, 2017 11:30:28 上午 org.apache.tomcat.util.digester.SetPropertiesRule begin警告: [SetPropertiesRule]{Server/Service/Engine/Host/Context} Setting property 'source' to 'org.eclipse.jst.jee.server:xiaozao_web' did not find a matching property.二月 20, 2

聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

这篇说说java.util.concurrent.atomic包里的类,总共12个.网上有非常多文章解析这几个类.这里挑些重点说说. 这12个类能够分为三组: 1. 普通类型的原子变量 2. 数组类型的原子变量 3. 域更新器 普通类型的原子变量的6个, 1. 当中AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference分别相应boolean, int,  long, object完毕主要的原子操作 2. AtomicMarkableRe

java多线程学习--java.util.concurrent

CountDownLatch,api 文档:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. 假设我们要打印1-100,最

java.util.concurrent介绍

(本文由 blog博主Caoer(草儿)原创,此处为转载. ) java.util.concurrent 包含许多线程安全.测试良好.高性能的并发构建块.不客气地说,创建 java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作.通过提供一组可靠的.高性能并发构建块,开发人员可以提高并发类的线程安全.可伸缩性.性能.可读性和可靠性. 如果一些类名看起来相似,可能是因为 java.util.concurrent 中的许多概念源自 Doug L

Java Concurrency - java.util.concurrent API Class Diagram

摘自: www.uml-diagrams.org Here we provide several UML class diagrams for the Java™ 7 java.util.concurrent package. Several java.util.concurrent.* packages introduced with version 5.0 of the Java platform added high-level concurrency features to the Ja

Java线程池与java.util.concurrent

Java(Android)线程池 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 那你就out太多了,new Thre

java.util.concurrent介绍【转】

java.util.concurrent介绍 java.util.concurrent 包含许多线程安全.测试良好.高性能的并发构建块.不客气地说,创建 java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作.通过提供一组可靠的.高性能并发构建块,开发人员可以提高并发类的线程安全.可伸缩性.性能.可读性和可靠性. 如果一些类名看起来相似,可能是因为 java.util.concurrent 中的许多概念源自 Doug Lea 的 util

java.util.concurrent.Future Basics

Hereby I am starting a series of articles about future concept in programming languages (also known as promises or delays) with a working title: Back to the Future. Futures are very important abstraction, even more these day than ever due to growing

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition