AbstractQueuedSynchronizer源码解析

1、简介

AbstractQueuedSynchronizer队列同步器,用来实现锁或者其他同步组件的基础框架

AbstractQueuedSynchronizer使用int类型的volatile变量维护同步状态

一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态,同时,AQS提供以下线程安全的方法来对状态进行操作

protected final int getState() {
        return state;
    }
protected final void setState(int newState) {
        state = newState;
    }
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用

注:AQS主要是怎么使用的呢?

在java的同步组件中,AQS的子类一般是同步组件的静态内部类。

AQS是实现同步组件的关键,它俩的关系可以这样描述:同步组件是面向使用者的,它定义了使用者与组件交互的接口,隐藏了具体的实现细节;而AQS面向的是同步组件的实现者,它简化了具体的实现方式,屏蔽了线程切换相关底层操作,它们俩一起很好的对使用者和实现者所关注的领域做了一个隔离。

AQS的实现分析

从实现的角度具体分析AQS是如何实现线程同步的

同步队列分析

AQS的实现依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,并将其加入同步队列,同时阻塞当前线程。当同步状态释放时,唤醒队列的首节点。

Node

static final class Node {
     volatile int waitStatus;
     volatile Node prev;//前驱节点
     volatile Node next; //后继节点
     volatile Thread thread; //进入队列的当前线程
     Node nextWaiter;//存储condition队列中的后继节点
}    

waitStatus的几种状态:

   /** waitStatus value to indicate thread has cancelled */   static final int CANCELLED =  1; //当前线程被取消
    /** waitStatus value to indicate successor‘s thread needs unparking */
    static final int SIGNAL    = -1;//当前节点的后继节点需要运行
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2; //当前节点在等待condition
    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    static final int PROPAGATE = -3; //当前场景下后续的acquireShared可以执行
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;  //当前场景下后续的acquireShared可以执行      

Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:

    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

对于锁的获取,请求形成节点将其挂在队列尾部,至于资源的转移,是从头到尾进行,队列的基本结构就出来了:

同步队列插入/删除节点

1、节点插入

AQS提供基于CAS的设置尾节点的方法:

   /**
     * CAS tail field. Used only by enq
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

需要传递当前线程认为的尾节点和当前节点,设置成功后,当前节点与尾节点建立关联

2、节点删除

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态之后将会唤醒后继节点,后继节点将会在获取同步状态成功的时候将自己设置为首节点。

注:设置首节点是由获取同步状态成功的线程来完成,因为每次只会有一个线程能够成功的获取到同步状态,所以,设置首节点并不需要CAS来保证。

AQS源码解析

       //独占式获取同步状态,该方法的实现需要先查询当前的同步状态是否可以获取,如果可以获取再进行获取;
   protected boolean tryAcquire(int arg) {        throw new UnsupportedOperationException();    }    //释放状态;   protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }   //共享式获取同步状态;
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }    //共享式释放状态;
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    //独占模式下,判断同步状态是否已经被占用;
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

AQS提供两种方式来操作同步状态,独占式与共享式,下面就针对性做一下源码分析

独占式同步状态获取 - acquire实现

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

具体执行流程如下:

  1. 调用tryAcquire方法尝试获取同步状态;
  2. 如果获取不到同步状态,将当前线程构造成节点Node并加入同步队列;
  3. 再次尝试获取,如果还是没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

addWaiter的实现

  private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
  1. 使用当前thread构造Node;
  2. 尝试在队尾插入节点,如果尾节点已经存在,就做以下操作:
     - 分配引用T指向尾节点;
     - 将待插入节点的prev指针指向尾节点;
     - 如果尾节点还为T,将当前尾节点设置为带待插入节点;
     - T的next指针指向待插入节点。
  3. 快速在队尾插入节点,失败则进入enq(Node node)方法
  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq的逻辑可以确保Node可以有顺序的添加到同步队列中,具体的加入队列的逻辑如下:

  1. 初始化同步队列:如果尾节点为空,分配一个头结点,并将尾节点指向头结点;
  2. 节点入队,通过CAS将节点设置为尾节点,以此在队尾做节点插入。

可以看出,整个enq方法通过“死循环”来保证节点的正确插入。

进入同步队列之后接下来就是同步状态的获取了,或者说是访问控制acquireQueued。对于同步队列中的线程,在同一时刻只能由队列首节点获取同步状态,其他的线程进入等待,直到符合条件才能继续进行。

AcquireQueued的实现

  final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }
  1. 获取当前节点的前驱节点;
  2. 如果当前节点的前驱节点是头节点,并且可以获取同步状态,设置当前节点为头结点,该节点占有锁;
  3. 不满足条件的线程进入等待状态

在整个方法中,当前线程一直都在“死循环”中尝试获取同步状态:

从代码的逻辑也可以看出,其实在节点与节点之间在循环检查的过程中是不会相互通信的,仅仅只是判断自己当前的前驱是不是头结点,这样设计使得节点的释放符合FIFO,同时也避免了过早通知

注:过早通知是指前驱节点不是头结点的线程由于中断被唤醒

acquire实现总结

  • 同步状态维护:

    对同步状态的操作是原子、非阻塞的,通过AQS提供的对状态访问的方法来对同步状态进行操作,并且利用CAS来确保原子操作;

  • 状态获取:

    一旦线程成功的修改了同步状态,那么该线程会被设置为同步队列的头节点;

  • 同步队列维护:

    不符合获取同步状态的线程会进入等待状态,直到符合条件被唤醒再开始执行。

当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,让后续节点可以获取到同步状态,调用方法release(int arg)方法可以释放同步状态

独占式同步状态释放 - release实现

  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  1. 尝试释放状态,tryRelease保证将状态重置回去,同样采用CAS来保证操作的原子性;
  2. 释放成功后,调用unparkSuccessor唤醒当前节点的后继节点线程。

unparkSuccessor实现

  private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

取出当前节点的next节点,将该节点线程唤醒,被唤醒的线程获取同步状态。这里主要通过LockSupportunpark方法唤醒线程。

共享式同步状态获取

共享式获取与独占式获取最主要的区别就是在同一时刻能否有多个线程可以同时获取到同步状态。这两种不同的方式在获取资源区别如下图所示:

  1. 共享式访问资源时,其他共享式访问都是被允许的;
  2. 独占式访问资源时,在同一时刻只能有一个访问,其他的访问都被阻塞

AQS提供acquireShared方法来支持共享式获取同步状态

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}
  1. 调用tryAcquireShared(int arg)方法尝试获取同步状态:
    tryAcquireShared方法返回值 > 0时,表示能够获取到同步状态;
  2. 获取失败调用doAcquireShared(int arg)方法进入同步队列

doAcquireShared实现

  private void doReleaseShared() {
         for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }
  1. 获取当前节点的前驱节点;
  2. 如果当前节点的前驱节点是头结点,并且获取到的共享同步状态 > 0,设置当前节点的为头结点,获取同步状态成功;
  3. 不满足条件的线程自旋等待

与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)方法可以释放同步状态

共享式同步状态释放 - releaseShared实现

  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  1. 调用tryReleaseShared方法释放状态;
  2. 调用doReleaseShared方法唤醒后继节点;

独占式超时获取 - doAcquireNanos

该方法提供了超时获取同步状态调用,假如在指定的时间段内可以获取到同步状态返回true,否则返回false。它是acquireInterruptibly(int arg)的增强版

1、acquireInterruptibly实现

该方法提供了获取同步状态的能力,同样,在无法获取同步状态时会进入同步队列,这类似于acquire的功能,但是它和acquire还是区别的:acquireInterruptibly可以在外界对当前线程进行中断的时候可以提前获取到同步状态的操作,换个通俗易懂的解释吧:类似于synchronized获取锁时,这时候外界对当前线程中断了,线程获取锁的这个操作能够及时响应中断并且提前返回。

  public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
  1. 判断当前线程是否被中断,如果已经被中断,抛出InterruptedException异常并将中断标志位置为false;
  2. 获取同步状态,获取成功并返回,获取不成功调用doAcquireInterruptibly(int arg)排队等待

doAcquireInterruptibly实现

  private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }
  1. 构造节点Node,加入同步队列;
  2. 假如当前节点是首节点并且可以获取到同步状态,将当前节点设置为头结点,其他节点自旋等待;
  3. 节点每次被唤醒的时候,需要进行中断检测,假如当前线程被中断,抛出异常InterruptedException,退出循环。

doAcquireNanos实现

该方法在支持中断响应的基础上,增加了超时获取的特性。针对超时获取,主要在于计算出需要睡眠的时间间隔nanosTimeout,如果nanosTimeout > 0表示当前线程还需要睡眠,反之返回false。

  private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }
  1. nanosTimeout <= 0,表明当前线程不需要睡眠,返回false,不能获取到同步状态;
  2. 不满足条件的线程加入同步队列;
  3. 假如当前节点是首节点,并且可以获取到同步状态,将当前节点设置为头结点并退出,返回true,表明在指定的时间内可以获取到同步状态;
  4. 不满足条件3的线程,计算出当前休眠时间,nanosTimeout = 原有nanosTimeout + deadline(睡眠之前记录的时间)- now(System.nanoTime():当前时间):
  • 如果nanosTimeout <= 0,返回超时未获取到同步状态;
  • 如果nanosTimeout > 0 && nanosTimeout <= 1000L,线程快速自旋

注:为什么不直接进入超时等待呢?原因在于非常短的超时等待是无法做到十分精确的,如果这时候再进入超时等待会让nanosTimeout的超时从整体上表现的不精确,所以,在超时非常短的情况下,AQS都会无条件进入快速自旋;

- 如果nanosTimeout > 1000L,线程通过LockSupport.parkNanos进入超时等待。

整个流程可以总结如下图所示:



原文地址:https://www.cnblogs.com/cherish010/p/8797022.html

时间: 2024-08-30 17:06:50

AbstractQueuedSynchronizer源码解析的相关文章

AbstractQueuedSynchronizer(AQS)源码解析(一)

在JDK1.5版本,新增了并发包,其中包含了显示锁.并发容器.在这些锁和容器里,都有同步器(AQS)的身影.为了更好地理解JDK的并发包,我会用三个主题来详细描述AbstractQueuedSynchronizer的实现. 在AQS中,涉及到同步队列以及Condition对象,这也是我为什么要用三个主题来讲述的原因.本节将主要讲述同步队列,后面两节会分别讲述Condition对象以及AQS的主要功能实现. AQS同步队列的主要功能是将无法获得资源的线程放入同步队列中,进行等待,它是通过链表来

第六章 ReentrantLock源码解析2--释放锁unlock()

最常用的方式: int a = 12; //注意:通常情况下,这个会设置成一个类变量,比如说Segement中的段锁与copyOnWriteArrayList中的全局锁 final ReentrantLock lock = new ReentrantLock(); lock.lock();//获取锁 try { a++;//业务逻辑 } catch (Exception e) { }finally{ lock.unlock();//释放锁 } 注:关于lock()方法的源码解析,请参照"第五章

FutureTask 源码解析

FutureTask 源码解析 版权声明:本文为本作者原创文章,转载请注明出处.感谢 码梦为生| 刘锟洋 的投稿 站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:1. 线程执行结果带有返回值2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回. 那,怎么实现future这种超时控制呢?来看看代码: FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,

CountDownLatch源码解析

一.CountDownLatch介绍 CountDownLatch是在jdk1.5被引入的,它主要是通过一个计数器来实现的,当在初始化该类的构造函数时,会事先传入一个状态值,之后在执行await方法后, 在这个状态值为0之前,当前线程(指的是调用await的线程)会一直等待.它内部使用了AQS来实现的,且是共享锁,具体怎么实现,待会看看它的实现原理. 它的应用场景: 一般在于在执行当前线程之前,要完成n个线程的任务,才能执行当前线程.这种场景适合用countdownLatch. 二.源码解析 先

死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量. 特性 读写锁具有以下特性: 是否互斥 读 写 读 否 是 写 是 是 可以看到,读写锁除了读读

死磕 java同步系列之Semaphore源码解析

问题 (1)Semaphore是什么? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什么场景中? (4)Semaphore的许可次数是否可以动态增减? (5)Semaphore如何实现限流? 简介 Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可. 特性 Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流. 下面我们一起来学习Java

死磕 java同步系列之CountDownLatch源码解析

??欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作. Cou

AQS源码解析

文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多.还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油! AQS 结构 属性 private transient volatile Node head; private tra

ChrisRenke/DrawerArrowDrawable源码解析

转载请注明出处http://blog.csdn.net/crazy__chen/article/details/46334843 源码下载地址http://download.csdn.net/detail/kangaroo835127729/8765757 这次解析的控件DrawerArrowDrawable是一款侧拉抽屉效果的控件,在很多应用上我们都可以看到(例如知乎),控件的github地址为https://github.com/ChrisRenke/DrawerArrowDrawable