Java Concurrent之 AbstractQueuedSynchronizer

ReentrantLock/CountDownLatch/Semaphore/FutureTask/ThreadPoolExecutor的源码中都会包含一个静态的内部类Sync,它继承了AbstractQueuedSynchronizer这个抽象类。

AbstractQueuedSynchronizer是java.util.concurrent包中的核心组件之一,为并发包中的其他synchronizers提供了一组公共的基础设施。

AQS会对进行acquire而被阻塞的线程进行管理,其管理方式是在AQS内部维护了一个FIFO的双向链表队列,队列的头部是一个空的结点,除此之外,每个结点持有着一个线程,结点中包含两个重要的属性waiteStatus和nextWaiter。结点的数据结构如下: Node中的属性waitStatus、prev、next、thread都使用了volatile修饰,这样直接的读写操作就具有内存可见性。 waitStatus表示了当前结点Node的状态

Java代码  

  1. static final class Node {
  2. /** waitStatus的值,表示此结点中的线程被取消 */
  3. static final int CANCELLED =  1;
  4. /** waitStatus value 表明后续结点中的线程需要unparking 唤醒 */
  5. static final int SIGNAL    = -1;
  6. /** waitStatus value 表明当前结点中的线程需要等待一个条件*/
  7. static final int CONDITION = -2;
  8. /** 表明结点是以共享模式进行等待(shared mode)的标记*/
  9. static final Node SHARED = new Node();
  10. /** 表明结点是以独占模式进行等待(exclusive mode)的标记*/
  11. static final Node EXCLUSIVE = null;
  12. /**
  13. * Status field, taking on only the values:
  14. *   SIGNAL: 后继结点现在(或即将)被阻塞(通过park) 那么当前结点在释放或者被取消的时候必须unpark它的后继结点
  15. *           为了避免竞态条件,acquire方法必须首先声明它需要一个signal,然后尝试原子的acquire
  16. *            如果失败了 就阻塞
  17. *   CANCELLED:当前结点由于超时或者中断而被取消  结点不会脱离这个状态
  18. *              尤其是,取消状态的结点中的线程永远不会被再次阻塞
  19. *   CONDITION: 当前结点在一个条件队列中。它将不会进入sync队列中直到它被transferred
  20. *              (这个值在这里的使用只是为了简化结构 跟其他字段的使用没有任何关系)
  21. *   0:          None of the above 非以上任何值
  22. *
  23. * 这些值通过数字来分类达到简化使用的效果
  24. * 非负的数字意味着结点不需要信号signal 这样大部分的代码不需要检查特定的值 just 检查符号就ok了
  25. *
  26. * 这个字段对于普通的sync结点初始化为0 对于条件结点初始化为CONDITION(-2) 本字段的值通过CAS操作进行修改
  27. */
  28. volatile int waitStatus;
  29. /**
  30. * 连接到当前结点或线程依赖的用于检查waitStatus等待状态的前驱结点。
  31. * 进入队列时赋值,出队列时置空(为GC考虑)。
  32. * 根据前驱结点的取消(CANCELLED),我们查找一个非取消结点的while循环短路,将总是会退出 ;
  33. * 因为头结点永远不会被取消:一个结点成为头结点只能通过一次成功过的acquire操作的结果
  34. * 一个取消的线程永远不会获取操作成功(acquire操作成功)
  35. * 一个线程只能取消它自己  不能是其他结点
  36. */
  37. volatile Node prev;
  38. /**
  39. * 连接到当前结点或线程释放时解除阻塞(unpark)的后继结点
  40. * 入队列时赋值,出队列时置空(为GC考虑)
  41. * 入队列时不会给前驱结点的next字段赋值,需要确认compareAndSetTail(pred, node)操作是否成功 (详见Node addWaiter(Node mode)方法)
  42. * 所以当我们发现结点的next为空时不一定就是tail尾结点 如果next为空,可以通过尾结点向前遍历即addWaiter中调用的enq(node)方法(个人觉
  43. * 这是对第一次处理失败的亡羊补牢之举)官方说法double-check 双层检查
  44. *
  45. * 被取消的结点next指向的是自己而不是空(详见cancelAcquire(Node node)中最后的node.next = node; )这让isOnSyncQueue变得简单
  46. * Upon cancellation, we cannot adjust this field, but can notice
  47. * status and bypass the node if cancelled.
  48. */
  49. volatile Node next;
  50. /**
  51. * 入队列结点中的线程,构造时初始化,使用完 就置空
  52. */
  53. volatile Thread thread;
  54. /**
  55. * 连接到下一个在条件上等待的结点 或者waitStatus为特殊值SHARED 共享模式
  56. * 因为条件队列只有在独占模式(exclusive)下持有时访问,当结点等待在条件上,我们只需要一个简单的链表队列来持有这些结点
  57. * 然后他们会转移到队列去进行re-acquire操作。
  58. * 由于条件只能是独占的,我们可以使用一个特殊的值来声明共享模式(shared mode)来节省一个字段
  59. */
  60. Node nextWaiter;
  61. /**
  62. * 如果结点以共享模式等待  就返回true
  63. */
  64. final boolean isShared() {
  65. return nextWaiter == SHARED;
  66. }
  67. /**
  68. * 返回当前结点的前驱结点如果为null就抛出NullPointException
  69. * @return the predecessor of this node
  70. */
  71. final Node predecessor() throws NullPointerException {
  72. Node p = prev;
  73. if (p == null)
  74. throw new NullPointerException();
  75. else
  76. return p;
  77. }
  78. //用于建立初始化头 或 共享标识
  79. Node() {
  80. }
  81. //入队列时使用
  82. Node(Thread thread, Node mode) {     // Used by addWaiter
  83. this.nextWaiter = mode;
  84. this.thread = thread;
  85. }
  86. //用于条件结点
  87. Node(Thread thread, int waitStatus) { // Used by Condition
  88. this.waitStatus = waitStatus;
  89. this.thread = thread;
  90. }
  91. }

acquire操作

获取同步器

Java代码  

  1. if(尝试获取成功){
  2. return ;
  3. }else{
  4. 加入队列;park自己
  5. }

释放同步器

Java代码  

  1. if(尝试释放成功){
  2. unpark等待队列中的第一个结点
  3. }else{
  4. return false;
  5. }

Java代码  

  1. /**
  2. * 以独占模式(exclusive mode)排他地进行的acquire操作 ,对中断不敏感 完成synchronized语义
  3. * 通过调用至少一次的tryAcquire实现 成功时返回
  4. * 否则在成功之前,一直调用tryAcquire(int)将线程加入队列,线程可能反复的阻塞和解除阻塞(park/unpark)。
  5. * 这个方法可以用于实现Lock.lock()方法
  6. * acquire是通过tryAcquire(int)来实现的,直至成功返回时结束,故我们无需自定义这个方法就可用它来实现lock。
  7. * tryLock()是通过Sync.tryAquire(1)来实现的
  8. * @param arg the acquire argument. 这个值将会被传递给tryAcquire方法
  9. * 但他是不间断的 可以表示任何你喜欢的内容
  10. */
  11. public final void acquire(int arg) {
  12. if (!tryAcquire(arg) &&
  13. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  14. selfInterrupt();
  15. }
  16. /**
  17. * 尝试以独占模式进行acquire操作 这个方法应该查询这个对象状态是否允许以独占模式进行acquire操作,如果允许就获取它
  18. *
  19. *
  20. * <p>This method is always invoked by the thread performing
  21. * acquire.  If this method reports failure, the acquire method
  22. * may queue the thread, if it is not already queued, until it is
  23. * signalled by a release from some other thread. This can be used
  24. * to implement method {@link Lock#tryLock()}.
  25. *
  26. * 默认实现抛出UnsupportedOperationException异常
  27. *
  28. * @param arg the acquire argument. This value is always the one
  29. *        passed to an acquire method, or is the value saved on entry
  30. *        to a condition wait.  The value is otherwise uninterpreted
  31. *        and can represent anything you like.
  32. * @return {@code true} if successful. Upon success, this object has
  33. *         been acquired.
  34. * @throws IllegalMonitorStateException if acquiring would place this
  35. *         synchronizer in an illegal state. This exception must be
  36. *         thrown in a consistent fashion for synchronization to work
  37. *         correctly.
  38. * @throws UnsupportedOperationException if exclusive mode is not supported
  39. */
  40. protected boolean tryAcquire(int arg) {
  41. throw new UnsupportedOperationException();
  42. }
  43. /**
  44. * 以独占不可中断模式
  45. * Acquires in exclusive uninterruptible mode for thread already in
  46. * queue. Used by condition wait methods as well as acquire.
  47. *
  48. * @param node the node
  49. * @param arg the acquire argument
  50. * @return {@code true} if interrupted while waiting
  51. */
  52. final boolean acquireQueued(final Node node, int arg) {
  53. try {
  54. boolean interrupted = false;//记录线程是否曾经被中断过
  55. for (;;) {//死循环 用于acquire获取失败重试
  56. final Node p = node.predecessor();//获取结点的前驱结点
  57. if (p == head && tryAcquire(arg)) {//若前驱为头结点  继续尝试获取
  58. setHead(node);
  59. p.next = null; // help GC
  60. return interrupted;
  61. }
  62. ////检查是否需要等待(检查前驱结点的waitStatus的值>0/<0/=0) 如果需要就park当前线程  只有前驱在等待时才进入等待 否则继续重试
  63. if (shouldParkAfterFailedAcquire(p, node) &&
  64. parkAndCheckInterrupt())//线程进入等待需要,需要其他线程唤醒这个线程以继续执行
  65. interrupted = true;//只要线程在等待过程中被中断过一次就会被记录下来
  66. }
  67. } catch (RuntimeException ex) {
  68. //acquire失败  取消acquire
  69. cancelAcquire(node);
  70. throw ex;
  71. }
  72. }
  73. /**
  74. * 检查并更新acquire获取失败的结点的状态
  75. * 信号控制的核心
  76. * Checks and updates status for a node that failed to acquire.
  77. * Returns true if thread should block. This is the main signal
  78. * control in all acquire loops.  Requires that pred == node.prev
  79. *
  80. * @param pred node‘s predecessor holding status
  81. * @param node the node
  82. * @return {@code true} if thread should block
  83. */
  84. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  85. int s = pred.waitStatus;
  86. if (s < 0)
  87. /*
  88. * 这个结点已经设置状态要求对他释放一个信号 所以他是安全的等待
  89. * This node has already set status asking a release
  90. * to signal it, so it can safely park
  91. */
  92. return true;
  93. if (s > 0) {
  94. /*
  95. * 前驱结点被取消 跳过前驱结点 并尝试重试 知道找到一个未取消的前驱结点
  96. * Predecessor was cancelled. Skip over predecessors and
  97. * indicate retry.
  98. */
  99. do {
  100. node.prev = pred = pred.prev;
  101. } while (pred.waitStatus > 0);
  102. pred.next = node;
  103. }
  104. else
  105. /*
  106. * 前驱结点的状态为0时表示为新建的 需要设置成SIGNAL(-1)
  107. * 声明我们需要一个信号但是暂时还不park 调用者将需要重试保证它在parking之前不被acquire
  108. * Indicate that we need a signal, but don‘t park yet. Caller
  109. * will need to retry to make sure it cannot acquire before
  110. * parking.
  111. */
  112. compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
  113. return false;
  114. }
  115. /**
  116. * park当前线程方便的方法 并且然后会检查当前线程是否中断
  117. *
  118. * @return {@code true} if interrupted
  119. */
  120. private final boolean parkAndCheckInterrupt() {
  121. LockSupport.park(this);
  122. return Thread.interrupted();
  123. }

添加结点到等待队列

首先构建一个准备入队列的结点,如果当前队列不为空,则将mode的前驱指向tail(只是指定当前结点的前驱结点,这样下面的操作一即使失败了 也不会影响整个队列的现有连接关系),compareAndSetTail成功将mode设置为tail结点,则将原先的tail结点的后继节点指向mode。如果队列为空亦或者compareAndSetTail操作失败,没关系我们还有enq(node)为我们把关。

Java代码  

  1. /**
  2. *通过给定的线程和模式 创建结点和结点入队列操作
  3. *
  4. * @param current the thread 当前线程
  5. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 独占和共享模式
  6. * @return the new node
  7. */
  8. private Node addWaiter(Node mode) {
  9. Node node = new Node(Thread.currentThread(), mode);
  10. // Try the fast path of enq; backup to full enq on failure
  11. Node pred = tail;
  12. if (pred != null) {
  13. node.prev = pred;//只是指定当前结点的前驱结点,这样下面的操作一即使失败了   也不会影响整个队列的现有连接关系
  14. if (compareAndSetTail(pred, node)) {//原子地设置node为tail结点 CAS操作 操作一
  15. pred.next = node;
  16. return node;
  17. }
  18. }
  19. enq(node);//操作一失败时  这里会重复检查亡羊补牢一下  官方说法 double-check
  20. return node;
  21. }
  22. /**
  23. * 将结点插入队列 必要时进行初始化操作
  24. * @param node 带插入结点
  25. * @return node‘s predecessor 返回当前结点的前驱结点
  26. */
  27. private Node enq(final Node node) {
  28. for (;;) {
  29. Node t = tail;
  30. if (t == null) { // Must initialize 当前队列为空 进行初始化操作
  31. Node h = new Node(); // Dummy header 傀儡头结点
  32. h.next = node;
  33. node.prev = h;
  34. if (compareAndSetHead(h)) {//原子地设置头结点
  35. tail = node;//头尾同一结点
  36. return h;
  37. }
  38. }
  39. else {
  40. node.prev = t;
  41. if (compareAndSetTail(t, node)) {//原子地设置tail结点 上面操作一的增强操作
  42. t.next = node;
  43. return t;
  44. }
  45. }
  46. }
  47. }

acquire 取消结点

取消结点操作:首先会判断结点是否为null,若不为空,while循环查找距离当前结点最近的非取消前驱结点PN(方便GC处理取消的结点),然后取出这个前驱的后继结点指向,利用它来感知其他的取消或信号操作(例如 compareAndSetNext(pred, predNext, null)) 然后将当前结点的状态Status设置为CANCELLED

  • 当前结点如果是尾结点,就删除当前结点,将找到的非取消前驱结点PN设置为tail,并原子地将其后继指向为null
  • 当前结点存在后继结点SN,如果前驱结点需要signal,则将PN的后继指向SN;否则将通过unparkSuccessor(node);唤醒后继结点

Java代码  

  1. /**
  2. * 取消一个将要尝试acquire的结点
  3. *
  4. * @param node the node
  5. */
  6. private void cancelAcquire(Node node) {
  7. // 如果结点不存在就直接返回
  8. if (node == null)
  9. return;
  10. node.thread = null;
  11. // 跳过取消的结点 while循环直到找到一个未取消的结点
  12. Node pred = node.prev;
  13. while (pred.waitStatus > 0)
  14. node.prev = pred = pred.prev;
  15. //前面的操作导致前驱结点发送变化 但是pred的后继结点还是没有变化
  16. Node predNext = pred.next;//通过predNext来感知其他的取消或信号操作 例如 compareAndSetNext(pred, predNext, null)
  17. //这里用无条件的写来代替CAS操作
  18. node.waitStatus = Node.CANCELLED;
  19. // 如果当前node是tail结点 就删除当前结点
  20. if (node == tail && compareAndSetTail(node, pred)) {
  21. compareAndSetNext(pred, predNext, null);//原子地将node结点之前的第一个非取消结点设置为tail结点 并将其后继指向null
  22. } else {
  23. // 如果前驱不是头结点 并且前驱的状态为SIGNAL(或前驱需要signal)
  24. if (pred != head
  25. && (pred.waitStatus == Node.SIGNAL
  26. || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
  27. && pred.thread != null) {
  28. //如果node存在后继结点 将node的前驱结点的后继指向node的后继
  29. Node next = node.next;
  30. if (next != null && next.waitStatus <= 0)
  31. compareAndSetNext(pred, predNext, next);//原子地将pred的后继指向node的后继
  32. } else {
  33. //node没有需要signal的前驱,通知后继结点
  34. unparkSuccessor(node);
  35. }
  36. node.next = node; // help GC
  37. }
  38. }}

唤醒后继结点 unparkSuccessor

唤醒后继结点操作:首先会尝试清除当前结点的预期信号,这里即使操作失败亦或是信号已经被其他等待线程改变 都不影响
然后查找当前线程最近的一个非取消结点 并唤醒它

Java代码  

  1. /**
  2. * 如果存在后继结点 就唤醒它
  3. *
  4. * @param node the node
  5. */
  6. private void unparkSuccessor(Node node) {
  7. /*
  8. * 尝试清除预期信号 如果操作失败或该状态被其他等待线程改变 也没关系
  9. */
  10. compareAndSetWaitStatus(node, Node.SIGNAL, 0);
  11. /*
  12. * 准备unpark的线程在后继结点里持有(通常就是下一个结点)
  13. * 但如果被取消或为空  那么就从tail向后开始遍历查找实际的非取消后继结点
  14. */
  15. Node s = node.next;
  16. if (s == null || s.waitStatus > 0) {
  17. s = null;
  18. for (Node t = tail; t != null && t != node; t = t.prev)
  19. if (t.waitStatus <= 0)
  20. s = t;//找到一个后并不跳出for循环 为了找到一个距离node最近的非取消后继结点
  21. }
  22. if (s != null)//结点不为空 唤醒后继的等待线程
  23. LockSupport.unpark(s.thread);
  24. }

 回过头来总结一下:

当我们调用acquire(int)时,会首先通过tryAcquire尝试获取锁,一般都是留给子类实现(例如ReetrantLock$FairSync中的实现)

Java代码  

  1. /**
  2. * tryAcquire的公平版本
  3. * Fair version of tryAcquire.  Don‘t grant access unless
  4. * recursive call or no waiters or is first.
  5. */
  6. protected final boolean tryAcquire(int acquires) {
  7. final Thread current = Thread.currentThread();
  8. int c = getState();
  9. if (c == 0) {
  10. if (isFirst(current) &&
  11. compareAndSetState(0, acquires)) {
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. else if (current == getExclusiveOwnerThread()) {
  17. int nextc = c + acquires;
  18. if (nextc < 0)
  19. throw new Error("Maximum lock count exceeded");
  20. setState(nextc);
  21. return true;
  22. }
  23. return false;
  24. }

如果tryAcquire(int)返回为false,则说明没有获得到锁。 则!tryAcquire(int)为true,接着会继续调用acquireQueued(final Node node ,int arg)方法,当然这调用这个方法之前,我们需要将当前包装成Node加入到队列中(即调用addWaiter(Node mode))。

在acquireQueued()方法体中,我们会发现一个死循环,唯一跳出死循环的途径是 直到找到一个(条件1)node的前驱是傀儡head结点并且子类的tryAcquire()返回true,那么就将当前结点设置为head结点并返回结点对于线程的中断状态。如果(条件1)不成立,则执行shouldParkAfterFailuredAcquire()

在shouldParkAfterFailuredAcquire(Node pred,Node node)方法体中,

首先会判断node结点的前驱结点pred的waitStatus的值:

* 如果waitStatus>0,表明pred处于取消状态(CANCELLED)则从队列中移除pred。

* 如果waitStatus<0,表明线程需要park住

* 如果waitStatus=0,表明这是一个新建结点,需要设置成SIGNAL(-1),在下一次循环中如果不能获得锁就需要park住线程,parkAndCheckInterrupt()就是执行了park()方法来park线程并返回线程中断状态。

Java代码  

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

如果中间抛出RuntimeException异常,则会调用cancelAcquire(Node)方法取消获取。取消其实也很简单,首先判断node是否为空,如果不为空,找到node最近的非取消前驱结点PN,并将node的status设置为CANCELLED;

* 倘若node为tail,将node移除并将PN结点设置为tail PN的后继指向null

* 倘若node存在后继结点SN,如果前驱结点PN需要signal,则将PN后继指向SN 否则调用unparkSuccessor(Node)唤醒后继SN

AcquireShared共享锁

Java代码  

  1. /**
  2. * 以共享模式获取Acquire 对中断不敏感
  3. * 通过多次调用tryAcquireShared方法来实现 成功时返回
  4. * 否则线程加入Sync队列 可能重复进行阻塞和释放阻塞 调用tryAcquireShared知道成功
  5. *
  6. * @param arg the acquire argument.  This value is conveyed to
  7. *        {@link #tryAcquireShared} but is otherwise uninterpreted
  8. *        and can represent anything you like.
  9. */
  10. public final void acquireShared(int arg) {
  11. if (tryAcquireShared(arg) < 0)
  12. doAcquireShared(arg);
  13. }
  14. /**
  15. * 以共享不可中断模式获取Acquire
  16. * Acquires in shared uninterruptible mode.
  17. * @param arg the acquire argument
  18. */
  19. private void doAcquireShared(int arg) {
  20. final Node node = addWaiter(Node.SHARED);
  21. try {
  22. boolean interrupted = false;
  23. for (;;) {
  24. final Node p = node.predecessor();
  25. if (p == head) {
  26. int r = tryAcquireShared(arg);
  27. if (r >= 0) {
  28. setHeadAndPropagate(node, r);
  29. p.next = null; // help GC
  30. if (interrupted)
  31. selfInterrupt();
  32. return;
  33. }
  34. }
  35. if (shouldParkAfterFailedAcquire(p, node) &&
  36. parkAndCheckInterrupt())
  37. interrupted = true;
  38. }
  39. } catch (RuntimeException ex) {
  40. cancelAcquire(node);
  41. throw ex;
  42. }
  43. }
  44. /**
  45. * Sets head of queue, and checks if successor may be waiting
  46. * in shared mode, if so propagating if propagate > 0.
  47. *
  48. * @param pred the node holding waitStatus for node
  49. * @param node the node
  50. * @param propagate the return value from a tryAcquireShared
  51. */
  52. private void setHeadAndPropagate(Node node, int propagate) {
  53. setHead(node);//队列向后移一位
  54. if (propagate > 0 && node.waitStatus != 0) {//propagate>0表明共享数值大于前面要求的数值
  55. /*
  56. * Don‘t bother fully figuring out successor.  If it
  57. * looks null, call unparkSuccessor anyway to be safe.
  58. */
  59. Node s = node.next;
  60. if (s == null || s.isShared())//如果剩下只有一个node或者node.next是共享的 需要park住该线程
  61. unparkSuccessor(node);
  62. }
  63. }

条件Condition

Condition是服务单个Lock,condition.await()等方法在Lock上形成一个condition等待队列

condition.signal()方法在Lock上面处理condition等待队列然后将队列中的node加入到AQS的阻塞队列中等待对应的线程被unpark

Java代码  

  1. /**
  2. * 实现可中断的条件等待
  3. * <ol>
  4. * <li> If current thread is interrupted, throw InterruptedException
  5. * <li> Save lock state returned by {@link #getState}
  6. * <li> Invoke {@link #release} with
  7. *      saved state as argument, throwing
  8. *      IllegalMonitorStateException  if it fails.
  9. * <li> Block until signalled or interrupted
  10. * <li> Reacquire by invoking specialized version of
  11. *      {@link #acquire} with saved state as argument.
  12. * <li> If interrupted while blocked in step 4, throw exception
  13. * </ol>
  14. */
  15. public final void await() throws InterruptedException {
  16. if (Thread.interrupted())
  17. throw new InterruptedException();
  18. Node node = addConditionWaiter();//加入到condition的对用lock的私有队列中,与AQS阻塞队列形成相似
  19. //释放这个condition对应的lock的锁 因为若这个await方法阻塞住而lock没有释放锁
  20. //那么对于其他线程的node来说肯定是阻塞住的
  21. //因为condition对应的lock获得了锁,肯定在AQS的header处,其他线程肯定是得不到锁阻塞在那里,这样两边都阻塞的话就死锁了
  22. //故这里需要释放对应的lock锁
  23. int savedState = fullyRelease(node);
  24. int interruptMode = 0;
  25. while (!isOnSyncQueue(node)) {//判断condition是否已经转化成AQS阻塞队列中的一个结点 如果没有park这个线程
  26. LockSupport.park(this);
  27. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  28. break;
  29. }
  30. //这一步需要signal()或signalAll()方法的执行 说明这个线程已经被unpark 然后运行直到acquireQueued尝试再次获得锁
  31. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  32. interruptMode = REINTERRUPT;
  33. if (node.nextWaiter != null)
  34. unlinkCancelledWaiters();
  35. if (interruptMode != 0)
  36. reportInterruptAfterWait(interruptMode);
  37. }

网上找到的一个帮助理解Condition的gif图

这个AQS存在两中链表

* 一种链表是AQS sync链表队列,可称为 横向链表

* 一种链表是Condition的wait Node链表,相对于AQS sync是结点的一个纵向链表

当纵向链表被signal通知后 会进入对应的Sync进行排队处理

Java代码  

  1. /**
  2. * Moves the longest-waiting thread, if one exists, from the
  3. * wait queue for this condition to the wait queue for the
  4. * owning lock.
  5. *
  6. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  7. *         returns {@code false}
  8. */
  9. public final void signal() {
  10. if (!isHeldExclusively())
  11. throw new IllegalMonitorStateException();
  12. Node first = firstWaiter;
  13. if (first != null)
  14. doSignal(first);
  15. }
  16. /**
  17. * Removes and transfers nodes until hit non-cancelled one or
  18. * null. Split out from signal in part to encourage compilers
  19. * to inline the case of no waiters.
  20. * @param first (non-null) the first node on condition queue
  21. */
  22. private void doSignal(Node first) {
  23. do {
  24. if ( (firstWaiter = first.nextWaiter) == null)//将旧的头结点移出 让下一个结点顶替上来
  25. lastWaiter = null;
  26. first.nextWaiter = null;
  27. } while (!transferForSignal(first) &&//将旧的头结点加入到AQS的等待队列中
  28. (first = firstWaiter) != null);
  29. }
  30. /**
  31. * Transfers a node from a condition queue onto sync queue.
  32. * Returns true if successful.
  33. * @param node the node
  34. * @return true if successfully transferred (else the node was
  35. * cancelled before signal).
  36. */
  37. final boolean transferForSignal(Node node) {
  38. /*
  39. * If cannot change waitStatus, the node has been cancelled.
  40. */
  41. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  42. return false;
  43. /*
  44. * Splice onto queue and try to set waitStatus of predecessor to
  45. * indicate that thread is (probably) waiting. If cancelled or
  46. * attempt to set waitStatus fails, wake up to resync (in which
  47. * case the waitStatus can be transiently and harmlessly wrong).
  48. */
  49. Node p = enq(node);//进入AQS的阻塞队列
  50. int c = p.waitStatus;
  51. //该结点点的状态CANCELLED或者修改状态失败 就直接唤醒该结点内的线程
  52. //PS 正常情况下 这里是不会为true的故不会在这里唤醒该线程
  53. //只有发送signal信号的线程 调用了reentrantLock.unlock方法后(该线程已经加入到了AQS等待队列)才会被唤醒。
  54. if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
  55. LockSupport.unpark(node.thread);
  56. return true;
  57. }

转眼之间,2014已经与我渐行渐远  2015要开启源码研究之旅、fighting

时间: 2024-08-25 10:31:37

Java Concurrent之 AbstractQueuedSynchronizer的相关文章

Java并发编程-AbstractQueuedSynchronizer源码分析

简介 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础.使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态.然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作: java.util.concurrent.locks.Abstra

[Java Concurrent] 多线程合作 producer-consumers / queue 的简单案例

在多线程环境下,通过 BlockingQueue,实现生产者-消费者场景. Toast 被生产和消费的对象. ToastQueue 继承了 LinkedblockingQueue ,用于中间存储 Toast . Producer 生产 Toast ,并将生产出来的 Toast 放进队列 initialToastQ 中. Processor 加工 Toast,从 initialToastQ 中获得生产出来的 Toast,将其加工并放进队列 finishedToast 中. Consumer 消费

Java Concurrent

Java Concurrent ExecutorService ExecutorService exec = Executors.newCachedThreadPool(); // create a cached pool ExecutorService exec = Executors.newFixedThreadPool(4); // fixed sized thread pool ExecutorService exec = Executors.newSingleThreadExecuto

Java Concurrent happens-before

happens-before relation on memory operations such as reads and writes of shared variables. The results of a write by one thread are guaranteed to be visible to a read by another thread only if the write operation happens-before the read operation. Th

java concurrent之ReentrantLock

在编码的过程中,有时候我们不得不借助锁同步来保证线程安全.synchronized关键字在上一篇博客中已经介绍:自从JDK5开始,添加了另一种锁机制:ReentrantLock. 二者的区别 1.lock是jdk5之后代码层面实现的,synchronized是JVM层面实现的. 2.synchronized在出现异常的时候能够自动释放锁,而lock必须在finally块中unlock()主动释放锁,否则会死锁. 3.在竞争不激烈的时候synchronized的性能是比lock好一点的,但是当竞争

java concurrent之前戏synchronized

对于多线程共享资源的情况需要进行同步,以避免一个线程的改动被另一个线程的改动所覆盖.最普遍的同步方式就是synchronized.把代码声明为synchronized,有两个重要后果,通常是指该代码具有 原子性(atomicity)和 可见性(visibility). 1.原子性强调的是执行,意味着个时刻,只有一个线程能够执行一段代码,这段代码通过一个monitor object保护.从而防止多个线程在更新共享状态时相互冲突. 2.可见性强调的是结果,它要对付内存缓存和编译器优化的各种反常行为.

How to Create a Java Concurrent Program

In this Document   Goal   Solution   Overview   Steps in writing Java Concurrent Program   Template Program:   Program Logic   Program Parameters   Database Operations   Setting request Completion Status   Register executable   Register Concurrent Pr

[Java Concurrent] 并发访问共享资源的简单案例

EvenGenerator 是一个偶数生成器,每调用一个 next() 就会加 2 并返回叠加后结果.在本案例中,充当被共享的资源. EvenChecker 实现了 Runnable 接口,可以启动新的线程执行 run() 任务,用于检测所指向的偶数生成器是否每次都返回偶数值. EvenCheckerThreadDemo 用于演示多线程下的执行情况. 非线性安全版本 EvenGenerator, 偶数生成器,每调用一个 next() 就会加 2 并返回叠加后结果. 这里的 next() 方法并非

Java concurrent AQS 源码详解

一.引言 AQS(同步阻塞队列)是concurrent包下锁机制实现的基础,相信大家在读完本篇博客后会对AQS框架有一个较为清晰的认识 这篇博客主要针对AbstractQueuedSynchronizer的源码进行分析,大致分为三个部分: 静态内部类Node的解析 重要常量以及字段的解析 重要方法的源码详解. 所有的分析仅基于个人的理解,若有不正之处,请谅解和批评指正,不胜感激!!! 二.Node解析 AQS在内部维护了一个同步阻塞队列,下面简称sync queue,该队列的元素即静态内部类No