JUC源码分析8-locks-AQS-condition

AQS的conditionObject实现类似object的wait/notify/notify的功能,功能大概是:

1.object维护一个监视器和一个等待队列,condition对于一个lock可以有多个condition,对于每个condition维护一个条件队列;

2.提供wait/signal/signalall功能。

来个入门demo:

public class ConditionTest {

    private static ReentrantLock lock = new ReentrantLock();

    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();
                    System.out.println(Thread.currentThread()+ "等待条件完成");
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread()+ "终于等到条件完成了,gogogo");
                    lock.unlock();
                }
            }
        }).start();

        Thread b = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    lock.lock();
                    condition.signalAll();
                    System.out.println(Thread.currentThread()+ "条件完成了,释放吧");
                } finally {
                    lock.unlock();
                }
            }
        });
        b.start();
    }
}

ConditionObject实现Condition接口,Condition提供的方法定义:

有没有很熟悉的感觉。

ConditionObject每次new都会维护一个条件队列,通过node的nextWaiter串起来

/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;

/**
 * 空的构造,看下ReentrantLock.newCondition()每次都会new ConditionObject()可以维护多个条件队列
 */
public ConditionObject() { }

看下响应中断的await()流程

/**
响应中断的await
能调用await的方法的线程肯定获得锁
*/
public final void await() throws InterruptedException {
		//线程中断直接异常
    if (Thread.interrupted())
        throw new InterruptedException();
		//将当前线程封装加入condition的条件队列
    Node node = addConditionWaiter();
    //释放AQS同步等待队列中的节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //看节点是否还在AQS的同步等待队列,因为signal/signalall调用的话会把节点加入到AQS的等待队列,如果没在那就说明需要park
    while (!isOnSyncQueue(node)) {
    		//不在的话那就应该在条件队列了,那么park吧
        LockSupport.park(this);
        //被signal/signalall唤醒后,检查中断状态,如果被中断,break,没有的话while
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //这里说明已经加入到AQS的队列,重新acquire,注意的是acquireQueued返回值为是否中断,返回true肯定是中断,返回false
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)//中断时,直接throw还是设置中断状态
        reportInterruptAfterWait(interruptMode);
}
/**
先判断lastWaiter的状态,如果不是condition就过一遍条件队列,将所有状态不为condition的都去掉
然后将节点加入到lastWaiter(类似AQS中的tail)的nextWaiter,如果last为null,就将first和last的nextWaiter都指向新节点
最后将lastWaiter指向新加入节点
*/
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
/**
从firstWaiter开始,过滤掉所有状态不为condition的节点
基本上按trail-t-next逐个节点向后移动,t从firstWaiter开始
当时看的时候,拿纸画了一遍才清楚
*/
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
/**
这里是释放掉AQS同步等待队列中的节点
返回释放前的state值
有异常的话就将节点的状态改为cancelled
*/
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
/**
看节点是否还在AQS的队列中
*/
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     之前分析过AQS加入节点的顺序enq(),pre-tail-next,pre加入了,但是并不能说明这个节点就真正在AQS的等待队列,
     所以需要从tail往前过滤一遍看是否存在
     */
    return findNodeFromTail(node);
}
/**
从tail往前判断节点是否在队列中,找到返回true
*/
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

/**
2个状态,表示await被唤醒后,如果检查线程是中断的,就需要判断是在什么时候被中断,然后判断怎么返回这个中断,是直接异常还是设置中断状态
 */
private static final int REINTERRUPT =  1;
private static final int THROW_IE    = -1;

/**
	检查中断状态,0:未中断,
	THROW_IE:
	REINTERRUPT:
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
/**
检查中断是在什么时候发生的,是在signal前还是signal后
*/
final boolean transferAfterCancelledWait(Node node) {
	//如果调用signal的话,先把节点的状态设置成0,再把节点从条件队列转移(enq)到AQS的等待队列
	//所以下面这个cas成功,那么这个中断肯定是发生在signal前
  if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  		//把节点放入AQS队列,保证后面acquireQueued执行
      enq(node);
      return true;
  }
  /*
   * If we lost out to a signal(), then we can't proceed
   * until it finishes its enq().  Cancelling during an
   * incomplete transfer is both rare and transient, so just
   * spin.
   */
	/**
	到这里的话,肯定是已经发生了signal,但是signal的enq没有完成,所以自旋,让signal的enq完成,返回false
	*/
  while (!isOnSyncQueue(node))
      Thread.yield();
  return false;
}
/**
这是根据之前的标识判断怎么处理中断
signal前就抛出,signal后就设置中断状态
*/
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

整个await的流程为:

1.判断线程中断,中断直接抛出异常

2.将节点加入condition条件队列

3.释放AQS队列中的锁

4.while判断是否在AQS等待队列

5.如果不在AQS队列中,就park

6.唤醒后检查是被signal唤醒还是中断唤醒

7.中断唤醒要判断signal前还是signal后,设置怎么处理中断,signal前的话还需要将节点enq到AQS的等待队列,转到4

8.如果在就acquireQueued,重新获取,这里判断acquire返回,为true则为中断,然后设置中断处理方式

9.如果节点的nextWaiter不为null,就清理下condition的条件队列,清除所有状态不为condition的节点

10.最后看是否需要处理中断,如有,signal前的中断直接抛出,signal后设置中断状态。

awaitNanos/awaitUntil/await(long time, TimeUnit unit)基本流程跟响应中断的await差不多,只不过多了超时时间处理,跟前面讲过的响应超时没什么区别,都是底层unsafe的那些。

看下signal/signalAll:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
/**子类实现判断是否是自己拥有*/
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
/**
这里从first开始释放一个condition状态的节点
*/
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //设置节点状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //将节点加入AQS的等待队列,返回的是加入节点的pre
    Node p = enq(node);
    int ws = p.waitStatus;
    //设置节点状态为SIGNAL,如果失败直接unpark新加入的节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal只释放first的开始第一个状态为condition的节点,然后将节点加入到AQS的同步等待队列,设置新加入节点的pre的状态为SIGNAL。看下signalAll的释放:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

看到signalAll最终是处理所有condition节点。

其实不管是await还是signal/signalAll都是模拟object.wait跟notify/notifyAll,可以对比来看。

AQS大概就这么多了,还有个AbstractQueuedLongSynchronizer这个类,跟AQS差不多,只是state状态采用的是long类型:

    private volatile long state;

AQS采用的是:

    private volatile int state;

注意:

await会有虚假唤醒的情况,即使没有signal,await的线程也可能被唤醒。参考:多线程编程中条件变量和虚假唤醒(spurious wakeup)的讨论 http://siwind.iteye.com/blog/1469216,最后的建议就是使用
while判断条件而不是使用if判断:

while(条件不满足){

condition_wait(cond, mutex);

}

而不是:

If( 条件不满足 ){

Condition_wait(cond,mutex);

}

说实话最后我也没看懂什么原因导致虚假唤醒,后来去stackoverflow查询了下,这是解释,自己研究吧

http://stackoverflow.com/questions/1050592/do-spurious-wakeups-actually-happen,还有这篇http://blog.sina.com.cn/s/blog_e59371cc0102v29b.html

参考:

http://blog.csdn.net/yuenkin/article/details/50867530

http://brokendreams.iteye.com/blog/2250372

http://ifeve.com/understand-condition/comment-page-1/#comment-26901

时间: 2024-08-02 11:15:23

JUC源码分析8-locks-AQS-condition的相关文章

JUC源码分析-集合篇(三)ConcurrentLinkedQueue

JUC源码分析-集合篇(三)ConcurrentLinkedQueue 在并发编程中,有时候需要使用线程安全的队列.如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法.使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现.非阻塞的实现方 式则可以使用循环 CAS 的方式来实现.本节让我们一起来研究一下 Doug Lea 是如何使用非阻塞的方式来实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理

JUC源码分析-集合篇(五)BlockingQueue 阻塞式队列实现原理 以 LinkedBlockingQueue 分析 BlockingQueue 阻塞式队列的实现原理. 1. 数据结构 LinkedBlockingQueue 和 ConcurrentLinkedQueue 一样都是由 head 节点和 last 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列.默认情况下

JUC源码分析-集合篇(七)PriorityBlockingQueue

JUC源码分析-集合篇(七)PriorityBlockingQueue PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现. PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock. 1. 基本属性 // 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是因为有的 VM 实现在数组头有些内容 private stati

JUC源码分析16-集合-ConcurrentSkipListMap、ConcurrentSkipListSet

NBA这赛季结束,勇士可惜啊,谁能想到没拿到冠军,库昊也没成为真正的老大,lbl一战封神,所有口水留言都变成羡慕嫉妒恨,哎,我库啊,还是还是看书吧. ConcurrentSkipListMap说实话,之前还真没注意过,还是看JUC才看到,利用skiplist跳表结构来实现一种有序的map,之前看到的map都是无序.在学习前还是要好好了解下什么是skiplist跳表,的确很不错,利用空间换时间,复杂度为logN,跳表的原理参考http://kenby.iteye.com/blog/1187303,

JUC源码分析6-locks-AQS-独占模式

AbstractQueuedSynchronizer(下面简称AQS),javadoc说明: Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on  first-in-first-out (FIFO) wait queues. 1.提供一个FIFO等待队列,使用方法伪代码表示就是: Acquire: if(!获取到锁

JUC源码分析13-locks-ReentrantReadWriteLock

ReentrantReadWriteLock基于AQS实现读写锁的同步: 1.利用共享模式实现读锁,独占模式实现写锁: 2.支持公平和非公平,非公平的情况下可能会出现读锁阻塞写锁的场景: 3.写锁阻塞写锁和读锁,读锁阻塞写锁: 4.写锁可以降级为读锁,读锁不能升级为写锁,只能先release再lock: 5.写锁支持condition条件: 6.读写锁都支持超时/中断lock: 7.适合读多写少的场景. 实现ReadWriteLock接口,用于返回读/写锁: <span style="fo

JUC源码分析7-locks-AQS-共享模式

AQS中一定要记住2点: 1.处理流程: if(!请求成功) 加入队列 2.请求是对state的判断,AQS不关心你state表示什么,你可以表示状态也可以表示数量,由子类实现对请求的判断.将规则的判断和规则的处理分离,有点像模板模式. 先想想什么是独占什么是共享,举个栗子:独占就像大家拿号去排队体检,你拿号了发现前面还有n个人,没办法,等吧,然后你前面的人体检完了,医生就说,你通知下一位吧,ok,你出来通知排你后面的人,这个人有可能是跟占座位似得就放在纸在哪,所以你跳过他,再通知后面真正有人的

JUC源码分析9-locks-ReentrantLock

ReentrantLock可重入锁,使用比synchronized方便灵活,可作为替代使用: 1.支持公平/不公平锁: 2.支持响应超时,响应中断: 3.支持condition: ReentrantLock实现了Lock接口,内部使用static类继承AQS实现独占式的api来实现这些功能,使用AQS的state来表示锁可重入次数: 之前学习AQS的时候说过请求和release的大的流程: acquire: if(!tryacquire()) 加入AQS的等待队列 release: if(try

JUC源码分析19-队列-PriorityBlockingQueue

PriorityBlockingQueue是一个基于数组实现的线程安全的无界队列,原理和内部结构跟PriorityQueue基本一样,只是多了个线程安全.javadoc里面提到一句,1:理论上是无界的,所以添加元素可能导致outofmemoryerror:2.不容许添加null:3.添加的元素使用构造时候传入Comparator排序,要不然就使用元素的自然排序. PriorityBlockingQueue是基于优先级,不是FIFO,这是个好东西,可以用来实现优先级的线程池,高优先级的先执行,低优