JUC源码分析6-locks-AQS-独占模式

AbstractQueuedSynchronizer(下面简称AQS),javadoc说明: Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on  first-in-first-out (FIFO) wait queues。

1.提供一个FIFO等待队列,使用方法伪代码表示就是:

Acquire:

if(!获取到锁){

加入队列

}

Release:

if(释放锁){

unlock等待队列头结点的thread

}

2.内部使用volatile int state来表示一个同步状态,这个字段既可以表示lock的状态,也可以用来表示lock的次数,例如Semaphore使用该字段表示许可次数,ReentrantLock用来表示可重入次数,我们也可以自行定义成状态值来表示线程运行状态。子类继承AQS的时候必须实现Serializable;

3.提供独占和共享2套api,一般使用就是维护一个内部类继承AQS,实现其中一套api,判断是否获取到锁。ReentrantLock使用的是独占api,CountDownLatch使用的共享api。子类实现的protected方法为:

独占api,判断是否获取到锁:

tryAcquire

tryRelease

共享api,判断是否获取到锁:

tryAcquireShared

tryReleaseShared

isHeldExclusively(这个暂时不管)

4.AQS提供了condition用来实现wait/notify功能,入ReentrantLock.newCondition();

5.1.7版本JUC中使用到AQS的有:ReentrantLock/ReentrantReadWriteLock/Semaphore。

AQS继承了AbstractOwnableSynchronizer这个类:

//独占模式下持有锁的线程
private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}

protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

AQS的队列定义:

private transient volatile Node head;
private transient volatile Node tail;

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

通过unsafe设置队列的head/tail/state/waitStatus和节点的next值,我们可以看出队列的大致结构为:

看下队列节点的具体定义:

static final class Node {
	//标记节点类型是共享还是独占
	static final Node SHARED = new Node();
	static final Node EXCLUSIVE = null;

	//下面4个是节点状态值
	static final int CANCELLED =  1;
	static final int SIGNAL    = -1;
	static final int CONDITION = -2;
	static final int PROPAGATE = -3;

	/**
	节点状态,对应上面几个状态值:
	0:normal status
	1:节点被取消,cancelled状态的节点运行过程会被清理掉
	-1:需要唤醒当前节点的下一个节点
	-2:用在newCondition的情况下,condition时还为维护另一个条件队列
	-3:共享模式下,表示需要将release传递到队列的其他节点
	*/
	volatile int waitStatus;

    volatile Node prev;
    //next为null,并不代表改节点是tail节点,因为在加入队列时,是先pre再next的
    volatile Node next;

    volatile Thread thread;

    //独占模式时,指向条件队列的下一个节点,或者共享模式下值为SHARED
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

一.独占模式下acquire和release

Acquire:

不响应中断的acquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt(); //挂起后唤醒返回的中断状态是true的话,这里会中断当前线程
    }

由子类实现tryAcquire,AQS不提供

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

如果没有获取到,则addWaiter加入等待队列,并挂起线程:

private Node addWaiter(Node mode) {
//初始化一个node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
//先尝试直接加入到尾节点后面,
//从这里也可以看出,先将node的pre指向尾节点,然后cas设置tail,再将原tail的next指向节点,
//所以可能next为空的情况存在,但是已经加入的节点的pre肯定是存在
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//失败的话,for循环loop加入
    enq(node);
    return node;
}

看下enq操作:

private Node enq(final Node node) {
//loop操作,tail不存在的情况会初始化一个空节点,并将head和tail都指向空节点,
//然后cas加入node,确保节点一定会加入
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在将节点加入等待队列之后,尝试挂起线程:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        //新加入node的pre节点
            final Node p = node.predecessor();
            //如果pre节点是头结点,再次重试acquire,如果成功则设置node为头结点
            //需要注意的是,头结点代表的是持有锁的节点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果pre不是头结点或acquire失败,则尝试挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
    //如果上面的操作发生异常,需要将node
        if (failed)
            cancelAcquire(node);
    }
}
/**
设置头结点
*/
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
/**
检查是否需要挂起
这个方法就是设置新加入节点的pre节点的waitStatus为SIGNAL(肯定成功),
这样在pre节点release的时候判断是不是需要唤醒下个节点
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * 设置过程中会过滤Cancelled状态的节点,把cancelled状态的节点去掉
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
调用Locksupport.park阻塞线程
*/
private final boolean parkAndCheckInterrupt() {
//挂起线程
    LockSupport.park(this);
//当pre节点release的时候检查状态为SIGNAL为会唤醒当前节点,这里会返回线程的中断状态
    return Thread.interrupted();
}
/**
acquire和挂起过程中异常,需要取消acquire
*/
private void cancelAcquire(Node node) {
    //为null直接返回
    if (node == null)
        return;

    node.thread = null;

    // 下面会跳过pre为cancelled的节点,将pre指向队列node前面第一个非取消状态节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext是队列node前面第一个非取消状态节点的下一个节点
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // 下面检查node节点的位置,如果是tail节点,直接将pred设置为尾节点,
    //然后设置之前的pred的next为null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 不是tail节点
        int ws;
        //这里判断经过上面处理的node的pre是不是head节点
        //不是head节点就要cas保证其状态为SIGNAL
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            //node的next不为null且状态不是取消状态就node节点的next关联到pred节点的next节点
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
        //如果node的pre是头结点,需要唤醒node的next节点
            unparkSuccessor(node);
        }
				//将next指向自己
        node.next = node; // help GC
    }
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 之前说过addWaiter的时候是先pre->tail->next,所以存在tail已经改变但是next还没有变化的情况
     * 这里就会从tail往前查找不会null,且状态不是取消的节点
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //找到就unpark,但是unpark后也不一定acquire成功,acquire那边的for就会一直loop
    if (s != null)
        LockSupport.unpark(s.thread);
}

接下来看下响应中断的acquireInterruptibly方法,这里会先判断先线程是否中断,中断的会直接抛出异常,没有中断再尝试请求

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

doAcquireInterruptibly方法与之前的区别就是线程中断后直接抛出异常,不是像之前的那样return 中断状态到上一层

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //区别
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

支持中断和超时时间的

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    //取一次时间
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            //超时时间小于0就直接返回false
            if (nanosTimeout <= 0)
                return false;
            //这里spinForTimeoutThreshold为static final long spinForTimeoutThreshold = 1000L;
            //如果超时时间大于spinForTimeoutThreshold,park才有意思,否则直接自旋
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //底层调用unsafe.park(false,nanosTimeout)
                LockSupport.parkNanos(this, nanosTimeout);
						//唤醒后重新计算一下时间
            long now = System.nanoTime();
            nanosTimeout -= now - lastTime;
            lastTime = now;
            //如果线程中断,直接抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

响应中断和响应时间的acquire的其他跟acquire差不多。

Release

<span style="font-size:18px;">public final boolean release(int arg) {
//tryRelease是否可以释放由子类实现判断
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}</span>

unparkSuccessor上面已经讲过,unpark队列的第一个未取消状态的节点。

大致流程为:

参考:

http://brokendreams.iteye.com/blog/2250372

http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer#anch132323

时间: 2024-11-13 07:35:28

JUC源码分析6-locks-AQS-独占模式的相关文章

JUC源码分析-集合篇(七)PriorityBlockingQueue

JUC源码分析-集合篇(七)PriorityBlockingQueue PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现. PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock. 1. 基本属性 // 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是因为有的 VM 实现在数组头有些内容 private stati

JUC源码分析-集合篇(三)ConcurrentLinkedQueue

JUC源码分析-集合篇(三)ConcurrentLinkedQueue 在并发编程中,有时候需要使用线程安全的队列.如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法.使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现.非阻塞的实现方 式则可以使用循环 CAS 的方式来实现.本节让我们一起来研究一下 Doug Lea 是如何使用非阻塞的方式来实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理 以 LinkedBlockingQueue 分析 BlockingQueue 阻塞式队列的实现原理. 1. 数据结构 LinkedBlockingQueue 和 ConcurrentLinkedQueue 一样都是由 head 节点和 last 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列.默认情况下

JUC源码分析16-集合-ConcurrentSkipListMap、ConcurrentSkipListSet

NBA这赛季结束,勇士可惜啊,谁能想到没拿到冠军,库昊也没成为真正的老大,lbl一战封神,所有口水留言都变成羡慕嫉妒恨,哎,我库啊,还是还是看书吧. ConcurrentSkipListMap说实话,之前还真没注意过,还是看JUC才看到,利用skiplist跳表结构来实现一种有序的map,之前看到的map都是无序.在学习前还是要好好了解下什么是skiplist跳表,的确很不错,利用空间换时间,复杂度为logN,跳表的原理参考http://kenby.iteye.com/blog/1187303,

JUC源码分析10-locks-CountDownLatch

上一次学习了ReetrantLock,是对AQS独占模式的,这次学习CountDownLatch,是共享模式api的实现.人生不死,学无止境.先看个demo吧: import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { private static CountDownLatch count1 = new CountDownLatch(1); private static CountDownLatch

JUC源码分析13-locks-ReentrantReadWriteLock

ReentrantReadWriteLock基于AQS实现读写锁的同步: 1.利用共享模式实现读锁,独占模式实现写锁: 2.支持公平和非公平,非公平的情况下可能会出现读锁阻塞写锁的场景: 3.写锁阻塞写锁和读锁,读锁阻塞写锁: 4.写锁可以降级为读锁,读锁不能升级为写锁,只能先release再lock: 5.写锁支持condition条件: 6.读写锁都支持超时/中断lock: 7.适合读多写少的场景. 实现ReadWriteLock接口,用于返回读/写锁: <span style="fo

JUC源码分析7-locks-AQS-共享模式

AQS中一定要记住2点: 1.处理流程: if(!请求成功) 加入队列 2.请求是对state的判断,AQS不关心你state表示什么,你可以表示状态也可以表示数量,由子类实现对请求的判断.将规则的判断和规则的处理分离,有点像模板模式. 先想想什么是独占什么是共享,举个栗子:独占就像大家拿号去排队体检,你拿号了发现前面还有n个人,没办法,等吧,然后你前面的人体检完了,医生就说,你通知下一位吧,ok,你出来通知排你后面的人,这个人有可能是跟占座位似得就放在纸在哪,所以你跳过他,再通知后面真正有人的

JUC源码分析9-locks-ReentrantLock

ReentrantLock可重入锁,使用比synchronized方便灵活,可作为替代使用: 1.支持公平/不公平锁: 2.支持响应超时,响应中断: 3.支持condition: ReentrantLock实现了Lock接口,内部使用static类继承AQS实现独占式的api来实现这些功能,使用AQS的state来表示锁可重入次数: 之前学习AQS的时候说过请求和release的大的流程: acquire: if(!tryacquire()) 加入AQS的等待队列 release: if(try

Java源码分析:深入探讨Iterator模式

作者:兄弟连 java.util包中包含了一系列重要的集合类.本文将从分析源码入手,深入研究一个集合类的内部结构,以及遍历集合的迭代模式的源码实现内幕. 下面我们先简单讨论一个根接口Collection,然后分析一个抽象类AbstractList和它的对应Iterator接口,并仔细研究迭代子模式的实现原理. 本文讨论的源代码版本是JDK 1.4.2,因为JDK 1.5在java.util中使用了很多泛型代码,为了简化问题,所以我们还是讨论1.4版本的代码. 集合类的根接口Collection