聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)

近期总体过了下AQS的结构。也在网上看了一些讲AQS的文章,大部分的文章都是泛泛而谈。又一次看了下AQS的代码,把一些新的要点拿出来说一说。

AQS是一个管程。提供了一个主要的同步器的能力,包括了一个状态,改动状态的原子操作。以及同步线程的一系列操作。它是CLHLock的变种,CLHLock是一个基于队列锁的自旋锁算法。

AQS也採用了队列来作为同步线程的结构。它维护了两个队列。一个是作为线程同步的同步队列,还有一个是基于Unsafe来进行堵塞/唤醒操作的条件队列。

所以理解队列操作是理解AQS的关键。

1. 理解 head, tail引用

2. 理解 next, prev引用

3. 理解队列节点何时入队,何时出队

关于head引用,须要记住的是

1. head引用始终指向获得了锁的节点,它不会被取消

acquire操作成功就表示获得了锁,acquire过程中假设中断,那么acquire就失败了,这时候head就会指向下一个节点。

* because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.

而获得了锁的之后,假设线程中断了,那么就需要release来释放head节点。

假设线程中断了不释放锁,就有可能造成问题。所以使用显式锁时。必需要在finally里面释放锁

Lock lock = new ReentrantLock();
		lock.lock();
		try{
			// 假设中断,能够处理获得抛出,要保证在finally里面释放锁
		}finally{
			lock.unlock();
		}

再来看看获得锁时对head引用的处理,仅仅有节点的前驱节点是head时,它才有可能获得锁,而获得锁之后,要把自己设置为head节点,同一时候把老的head的next设置为null。

这里有几层含义:

1. 始终从head节点開始获得锁

2. 新的线程获得锁之后,之前获得锁的节点从队列中出队

3. 一旦获得了锁,acquire方法肯定返回,这个过程中不会被中断

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

关于tail引用。它负责无锁地实现一个链式结构。採用CAS + 轮询的方式。

节点的入队操作都是在tail节点

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

next引用在队列中扮演了非常关键的数据。它出现的频率非常高。关于next引用。它有几种值的情况

1. next = null

2. next指向非null的下一个节点

3. next = 节点自己

next = null的情况有三种

1. 队尾节点,队尾节点的next没有显式地设置。所以为null

2. 队尾节点入队列时的上一个队尾节点next节点有可能为null,由于enq不是原子操作,CAS之后是复合操作

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    // 这个期间next可能为null
                    t.next = node;
                    return t;
                }
            }
        }
    }

3. 获取锁时,之前获取锁的节点的next设置为null

if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

next指向非null的下一个节点,这样的情况就是正常的在同步队列中等待的节点,入队操作时设置了前一个节点的next值,这样能够在释放锁时,通知下一个节点来获取锁

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

next指向自己,这个是取消操作时,会把节点的前一个节点指向它的后一个节点,最后把next域设置为自己

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

prev引用比較简单,它主要是维护链表结构。CLHLock是在前一个节点的状态自旋,AQS里面的节点不是在前一个状态等待,而是释放的时候由前一个节点通知队列来查找下一个要被唤醒的节点。

最后说说节点进入队列和出队列的情况。

节点入队列仅仅有一种情况。那就是它的tryAcquire操作失败,没有获得锁,就进入同步队列等待,假设tryAcquire成功了,就不须要进入同步队列等待了。AQS提供了充分的灵活性。它提供了tryAcquire和tryRelase方法给子类扩展。基类负责维护队列操作,子类能够自己决定是否要进入队列。

所以实际子类扩展的时候有两种类型,一种是公平的同步器,一种是非公平的同步器。这里须要注意的是,所谓的非公平,不是说不使用队列来维护堵塞操作,而是说在获取竞争时,不考虑先来的线程,后来的线程能够直接竞争资源。非公平和公平的同步器竞争失败后,都须要进入AQS的同步队列进行等待,而同步队列是先来先服务的公平的队列。

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

出队列有两种情况。

1. 后一个线程获得锁是。head引用指向当前获得锁的线程。前一个获得锁的节点自己主动出队列

2. 取消操作时。节点出队列,取消仅仅有两种情况,一种是线程被中断,另一种是等待超时

时间: 2024-08-01 10:31:59

聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)的相关文章

聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

这篇说说java.util.concurrent.atomic包里的类,总共12个.网上有非常多文章解析这几个类.这里挑些重点说说. 这12个类能够分为三组: 1. 普通类型的原子变量 2. 数组类型的原子变量 3. 域更新器 普通类型的原子变量的6个, 1. 当中AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference分别相应boolean, int,  long, object完毕主要的原子操作 2. AtomicMarkableRe

聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互斥锁,基于AQS的互斥模式实现,也就是说同时只有一个线程进入临界区,唤醒下一个线程时也只能释放一个等待线程 2. 可重入,通过设置了一个字段exclusiveOwnerThread来标示当前获得锁的线程.获取锁操作是,如果当前线程是已经获得锁的线程,那么获取操作成功.把当前状态作为获得锁次数的计数器

聊聊高并发(三十)解析java.util.concurrent各个组件(十二) 理解CyclicBarrier栅栏

这篇讲讲CyclicBarrier栅栏,从它的名字可以看出,它是可循环使用的.它的功能和CountDownLatch类似,也是让一组线程等待,然后一起开始往下执行.但是两者还是有几个区别 1. 等待的对象不同.CountDownLatch的一组线程等待的是一个事件,或者说是一个计数器归0的事件.而CyclicBarrier等待的对象是线程,只有线程都到齐了才往下执行 2. 使用方式不同,这个也是由等待的对象不同引起的,CountDownLatch需要调用await()来让线程等待,调用count

谈论高并发(三十)解析java.util.concurrent各种组件(十二) 认识CyclicBarrier栅栏

这次谈话CyclicBarrier栅栏,如可以从它的名字可以看出,它是可重复使用. 它的功能和CountDownLatch类别似,也让一组线程等待,然后开始往下跑起来.但也有在两者之间有一些差别 1. 不同的对象等.CountDownLatch组线程等待的是一个事件.或者说是一个计数器归0的事件.而CyclicBarrier等待的对象是线程,仅仅有线程都到齐了才往下运行 2. 使用方式不同,这个也是由等待的对象不同引起的,CountDownLatch须要调用await()来让线程等待.调用cou

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析

ThreadPoolExecutor是Executor运行框架最重要的一个实现类.提供了线程池管理和任务管理是两个最主要的能力.这篇通过分析ThreadPoolExecutor的源代码来看看怎样设计和实现一个基于生产者消费者模型的运行器. 生产者消费者模型 生产者消费者模型包括三个角色:生产者,工作队列,消费者.对于ThreadPoolExecutor来说, 1. 生产者是任务的提交者,是外部调用ThreadPoolExecutor的线程 2. 工作队列是一个堵塞队列的接口,详细的实现类能够有非

聊聊高并发(四十一)解析java.util.concurrent各个组件(十七) 任务的异步执行和状态控制

聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计这篇说了ExecutorService接口扩展了Executor接口,在执行任务的基础上,提供了执行框架生命周期的管理,任务的异步执行,批量任务的执行的能力.AbstractExecutorService抽象类实现了ExecutorService接口,提供了任务异步执行和批量执行的默认实现.这篇说说任务的异步执行和状态控制 说明一点,使用Executor框架执行任务的方式

聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

前几篇分析了一下AQS的原理和实现.这篇拿Semaphore信号量做样例看看AQS实际是怎样使用的. Semaphore表示了一种能够同一时候有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据,仅仅有拿到了票据的线程尽能够进入临界区,否则就等待.直到获得释放出的票据. Semaphore经常使用在资源池中来管理资源.当状态仅仅有1个0两个值时,它退化成了一个相互排斥的同步器.类似锁. 以下来看看Semaphore的代码. 它维护了一个内部类Sync来继承AQS,定制tryXXX方法来使

聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何实现的锁降级.但是下面几个问题没说清楚,这篇补充一下 1. 释放锁时的优先级问题,是让写锁先获得还是先让读锁先获得 2. 是否允许读线程插队 3. 是否允许写线程插队,因为读写锁一般用在大量读,少量写的情况,如果写线程没有优先级,那么可能造成写线程的饥饿 关于释放锁后是让写锁先获得还是让读锁先获得,

聊聊高并发(二十六)解析java.util.concurrent各个组件(八) 理解CountDownLatch闭锁

CountDownLatch闭锁也是基于AQS实现的一种同步器,它表示了"所有线程都等待,直到锁打开才继续执行"的含义.它和Semaphore的语意不同, Semaphore的获取和释放操作都会修改状态,都可能让自己或者其他线程立刻拿到锁.而闭锁的获取操作只判断状态是否为0,不修改状态本身,闭锁的释放操作会修改状态,每次递减1,直到状态为0. 所以正常情况下,闭锁的获取操作只是等待,不会立刻让自己获得锁,直到释放操作把状态变为0. 闭锁可以用来实现很多场景,比如: 1. 某个服务依赖于