【死磕Java并发】-----J.U.C之阻塞队列:SynchronousQueue

  【注】:SynchronousQueue实现算法看的晕乎乎的,写了好久才写完,如果当中有什么错误之处,忘各位指正
  
  作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:
  
  没有容量。与其他BlockingQueue不同,是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
  
  因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
  
  分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
  
  若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)。
  
  非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。
  
  与其他BlockingQueue一样,SynchronousQueue同样继承AbstractQueue和实现BlockingQueue接口:
  
  1
  
  2
  
  1
  
  2
  
  提供了两个构造函数:
  
  }
  
  // 通过 fair 值来决定公平性和非公平性
  
  // 公平性使用TransferQueue,非公平性采用
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  、TransferStack继承Transferer,Transferer为www.hjdyl.com SynchronousQueue的内部类,它提供了一个方法transfer(),该方法定义了转移数据的规范,如下:
  
  }
  
  1
  
  2
  
  3
  
  1
  
  2
  
  3
  
  方法主要用来完成转移数据的,如果e != null,相当于将一个数据交给消费者,如果e == null,则相当于从一个生产者接收一个消费者交出的数据。
  
  采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是通过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在中扮演着非常重要的作用,的put、take操作都是委托这两个类来实现的。
  
  是实现公平性策略的核心类,其节点为QNode,其定义如下:
  
  /** 头节点 */
  
  /** 尾节点 */
  
  // 指向一个取消的结点
  
  //当一个节点中最后一个插入时,它被取消了但是可能还没有离开队列
  
  /**
  
  * 省略很多代码
  
  */
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  在TransferQueue中除了头、尾节点外还存在一个cleanMe节点。该节点主要用于标记,当删除的节点是尾节点时则需要使用该节点。
  
  同时,对于TransferQueue需要注意的是,其队列永远都存在一个chuangshi88.cn/ dummy node,在构造时创建:
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  1
  
  2
  
  3
  
  4
  
  5
  
  在TransferQueue中定义了QNode类来表示队列中的节点,QNode节点定义如下:
  
  域
  
  数据项
  
  // 等待线程,用于
  
  //模式,表示当前是数据还是请求,只有当匹配的模式相匹配时才会交换
  
  }
  
  /**
  
  域,在TransferQueue中用于向next推进
  
  */
  
  }
  
  /**
  
  数据项
  
  */
  
  }
  
  /**
  
  * 取消本结点,将item域设置为自身
  
  */
  
  }
  
  /**
  
  * 是否被取消
  
  * 与tryCancel相照应只需要判断item释放等于自身即可
  
  */
  
  }
  
  }
  
  }
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  上面代码没啥好看的,需要注意的一点就是isData,该属性在进行数据交换起到关键性作用,两个线程进行数据交换的时候,必须要两者的模式保持一致。
  
  用于实现非公平性,定义如下:
  
  /**
  
  * 省略一堆代码
  
  */
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  中定义了三个状态:REQUEST表示消费数据的消费者,DATA表示生产数据的生产者,FULFILLING,表示匹配另一个生产者或消费者。任何线程对的操作都属于上述3种状态中的一种(对应着SNode节点的mode)。同时还包含一个head域,表示头结点。
  
  内部节点SNode定义如下:
  
  域
  
  // 相匹配的节点
  
  // 等待的线程
  
  域
  
  // 模型
  
  /**
  
  域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读
  
  */
  
  }
  
  }
  
  /**
  
  * 将s结点与本结点进行匹配,匹配成功,则unpark等待线程
  
  */
  
  }
  
  }
  
  }
  
  }
  
  }
  
  }
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  上面简单介绍了TransferQueue、TransferStack,由于SynchronousQueue的put、take操作都是调用Transfer的transfer()方法,只不过是传递的参数不同而已,put传递的是e参数,所以模式为数据(公平isData = true,非公平mode= DATA),而take操作传递的是null,所以模式为请求(公平isData = false,非公平mode = REQUEST),如下:
  
  操作
  
  }
  
  }
  
  操作
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  公平模式
  
  公平性调用TransferQueue的transfer方法:
  
  // 当前节点模式
  
  // 头、尾节点 为null,没有初始化
  
  // 头尾节点相等(队列为null) 或者当前节点和队列节点模式一样
  
  表示已有其他线程操作了,修改了tail,重新再来
  
  ,表示已经有其他线程添加了节点,tn 推进,重新处理
  
  // 当前线程帮忙推进尾节点,就是尝试将tn设置为尾节点
  
  }
  
  // 调用的方法的 wait 类型的, 并且 超时了, 直接返回
  
  在take操作阐述
  
  ,构建一个新节点Node
  
  // 将新建的节点加入到队列中,如果不成功,继续处理
  
  // 替换尾节点
  
  // 调用awaitFulfill, 若节点是 head.next, 则进行自旋
  
  // 若不是的话, 直接 block, 直到有其他线程 与之匹配, 或它自己进行线程的中断
  
  // 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点
  
  // 清理节点S
  
  }
  
  :用于判断节点是否已经从队列中离开了
  
  // 尝试将S节点设置为head,移出t
  
  // 释放线程 ref
  
  }
  
  // 返回
  
  }
  
  // 这里是从head.next开始,因为TransferQueue总是会存在一个dummy node节点
  
  // 节点
  
  // 不一致读,重新开始
  
  // 有其他线程更改了线程结构
  
  /**
  
  * 生产者producer和消费者consumer匹配操作
  
  */
  
  :判断isData与x的模式是否相同,相同表示已经匹配了
  
  :m节点被取消了
  
  :如果尝试将数据e设置到m上失败
  
  // 将m设置为头结点,h出列,然后重试
  
  }
  
  // 成功匹配了,m设置为头结点h出列,向前推进
  
  // 唤醒m上的等待线程
  
  }
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  69
  
  70
  
  71
  
  72
  
  73
  
  74
  
  75
  
  76
  
  77
  
  78
  
  79
  
  80
  
  81
  
  82
  
  83
  
  84
  
  85
  
  86
  
  87
  
  88
  
  89
  
  90
  
  91
  
  92
  
  93
  
  94
  
  95
  
  96
  
  97
  
  98
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  69
  
  70
  
  71
  
  72
  
  73
  
  74
  
  75
  
  76
  
  77
  
  78
  
  79
  
  80
  
  81
  
  82
  
  83
  
  84
  
  85
  
  86
  
  87
  
  88
  
  89
  
  90
  
  91
  
  92
  
  93
  
  94
  
  95
  
  96
  
  97
  
  98
  
  整个transfer的算法如下:
  
  1. 如果队列为null或者尾节点模式与当前节点模式一致,则尝试将节点加入到等待队列中(采用自旋的方式),直到被匹配或、超时或者取消。匹配成功的话要么返回null(producer返回的)要么返回真正传递的值(consumer返回的),如果返回的是node节点本身则表示当前线程超时或者取消了。
  
  2. 如果队列不为null,且队列的节点是当前节点匹配的节点,则进行数据的传递匹配并返回匹配节点的数据
  
  3. 在整个过程中都会检测并帮助其他线程推进
  
  当队列为空时,节点入列然后通过调用awaitFulfill()方法自旋,该方法主要用于自旋/阻塞节点,直到节点被匹配返回或者取消、中断。
  
  // 超时控制
  
  // 自旋次数
  
  // 如果节点Node恰好是head节点,则自旋一段时间,这里主要是为了效率问题,如果里面阻塞,会存在唤醒、线程上下文切换的问题
  
  // 如果生产者、消费者者里面到来的话,就避免了这个阻塞的过程
  
  // 自旋
  
  // 线程中断了,剔除当前节点
  
  // 如果线程进行了阻塞 -> 唤醒或者中断了,那么x != e 肯定成立,直接返回当前节点即可
  
  // 超时判断
  
  // 如果超时了,取消节点,continue,在if(x != e)肯定会成立,直接返回x
  
  }
  
  }
  
  // 自旋- 1
  
  // 等待线程
  
  // 进行没有超时的
  
  // 自旋次数过了, 直接 + timeout 方式
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  在自旋/阻塞过程中做了一点优化,就是判断当前节点是否为对头元素,如果是的则先自旋,如果自旋次数过了,则才阻塞,这样做的主要目的就在如果生产者、消费者立马来匹配了则不需要阻塞,因为阻塞、唤醒会消耗资源。在整个自旋的过程中会不断判断是否超时或者中断了,如果中断或者超时了则调用tryCancel()取消该节点。
  
  }
  
  1
  
  2
  
  3
  
  1
  
  2
  
  3
  
  取消过程就是将节点的item设置为自身(itemOffset是item的偏移量)。所以在调用awaitFulfill()方法时,如果当前线程被取消、中断、超时了那么返回的值肯定时S,否则返回的则是匹配的节点。如果返回值是节点S,那么if(x == s)必定成立,如下:
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  1
  
  2
  
  3
  
  4
  
  5
  
  如果返回的x == s成立,则调用clean()方法清理节点S:
  
  //
  
  节点被取消了,向前推进
  
  }
  
  // 队列为空,直接
  
  // 不一致,说明有其他线程改变了tail节点,重新开始
  
  推进tail节点,重新开始
  
  }
  
  不是尾节点 移出
  
  // 如果s已经被移除退出循环,否则尝试断开s
  
  }
  
  // s是尾节点,则有可能会有其他线程在添加新节点,则cleanMe出场
  
  // 如果dp不为null,说明是前一个被取消节点,将其移除
  
  节点d已经删除
  
  原来的节点 cleanMe 已经通过 advanceHead 进行删除
  
  原来的节点 s已经删除
  
  不是tail节点
  
  // 清除 cleanMe 节点, 这里的 dp == pred 若成立, 说明清除节点s,成功, 直接return, 不然的话要再次循环
  
  原来的 cleanMe 是 null, 则将 pred 标记为 cleamMe 为下次 清除 s 节点做标识
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  这个clean()方法感觉有点儿难度,我也看得不是很懂。这里是引用
  
  删除的节点不是queue尾节点, 这时 直接 pred.casNext(s, s.next) 方式来进行删除(和ConcurrentLikedQueue中差不多)
  
  删除的节点是队尾节点
  
  此时 cleanMe == null, 则 前继节点pred标记为 cleanMe, 为下次删除做准备
  
  此时 != null, 先删除上次需要删除的节点, 然后将至null, 让后再将 pred 赋值给
  
  非公平模式
  
  非公平模式transfer方法如下:
  
  // 栈为空或者当前节点模式与头节点模式一样,将节点压入栈内,等待匹配
  
  // 超时
  
  // 节点被取消了,向前推进
  
  // 重新设置头结点(弹出之前的头结点)
  
  else
  
  }
  
  // 不超时
  
  // 生成一个SNode节点,并尝试替换掉头节点
  
  // 自旋,等待线程匹配
  
  // 返回的m == s 表示该节点被取消了或者超时、中断了
  
  // 清理节点S,
  
  }
  
  // 因为通过前面一步将S替换成了head,如果h.next == s ,则表示有其他节点插入到S前面了,变成了head
  
  // 且该节点就是与节点S匹配的节点
  
  // 将s.next节点设置为head,相当于取消节点h、s
  
  // 如果是请求则返回匹配的域,否则返回节点S的域
  
  }
  
  }
  
  // 如果栈不为null,且两者模式不匹配(h != null www.078881.cn/ && h.mode != mode)
  
  // 说明他们是一队对等匹配的节点,尝试用当前节点s来满足h节点
  
  节点已经取消了,向前推进
  
  // 尝试将当前节点打上"正在匹配"的标记,并设置为head
  
  // 循环loop
  
  // s为当前节点,m是s的next节点,
  
  // m节点是s节点的匹配节点
  
  ,其他节点把m节点匹配走了
  
  // 将s弹出
  
  // 将s置空,下轮循环的时候还会新建
  
  // 退出该循环,继续主循环
  
  }
  
  // 获取m的next节点
  
  // 尝试匹配
  
  // 匹配成功,将s 、 m弹出
  
  // 如果没有匹配成功,说明有其他线程已经匹配了,把m移出
  
  }
  
  }
  
  }
  
  // 到这最后一步说明节点正在匹配阶段
  
  的next的节点,是正在匹配的节点,m 和 h配对
  
  其他线程把m节点抢走了,弹出h节点
  
  else
  
  }
  
  }
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  69
  
  70
  
  71
  
  72
  
  73
  
  74
  
  75
  
  76
  
  77
  
  78
  
  79
  
  80
  
  81
  
  82
  
  83
  
  84
  
  85
  
  86
  
  87
  
  88
  
  89
  
  90
  
  91
  
  92
  
  93
  
  94
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  69
  
  70
  
  71
  
  72
  
  73
  
  74
  
  75
  
  76
  
  77
  
  78
  
  79
  
  80
  
  81
  
  82
  
  83
  
  84
  
  85
  
  86
  
  87
  
  88
  
  89
  
  90
  
  91
  
  92
  
  93
  
  94
  
  整个处理过程分为三种情况,具体如下:
  
  1. 如果当前栈为空获取节点模式与栈顶模式一样,则尝试将节点加入栈内,同时通过自旋方式等待节点匹配,最后返回匹配的节点或者null(被取消)
  
  2. 如果栈不为空且节点的模式与首节点模式匹配,则尝试将该节点打上FULFILLING标记,然后加入栈中,与相应的节点匹配,成功后将这两个节点弹出栈并返回匹配节点的数据
  
  3. 如果有节点在匹配,那么帮助这个节点完成匹配和出栈操作,然后在主循环中继续执行
  
  当节点加入栈内后,通过调用awaitFulfill()方法自旋等待节点匹配:
  
  // 超时
  
  // 当前线程
  
  // 自旋次数
  
  用于检测当前节点是否需要自旋
  
  // 如果栈为空、该节点是首节点或者该节点是匹配节点,则先采用自旋,否则阻塞
  
  // 线程中断了,取消该节点
  
  // 匹配节点
  
  // 如果匹配节点m不为空,则表示匹配成功,直接返回
  
  // 超时
  
  // 节点超时,取消
  
  }
  
  }
  
  // 自旋;每次自旋的时候都需要检查自身是否满足自旋条件,满足就 - 1,否则为0
  
  // 第一次阻塞时,会将当前线程设置到s上
  
  // 阻塞 当前线程
  
  // 超时
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  方法会一直自旋/阻塞直到匹配节点。在S节点阻塞之前会先调用shouldSpin()方法判断是否采用自旋方式,为的就是如果有生产者或者消费者马上到来,就不需要阻塞了,在多核条件下这种优化是有必要的。同时在调用park()阻塞之前会将当前线程设置到S节点的waiter上。匹配成功,返回匹配节点m。
  
  方法如下:
  
  }
  
  1
  
  2
  
  3
  
  4
  
  1
  
  2
  
  3
  
  4
  
  同时在阻塞过程中会一直检测当前线程是否中断了,如果中断了,则调用tryCancel()方法取消该节点,取消过程就是将当前节点的math设置为当前节点。所以如果线程中断了,那么在返回m时一定是S节点自身。
  
  }
  
  1
  
  2
  
  3
  
  1
  
  2
  
  3
  
  方法如果返回的m == s,则表示当前节点已经中断取消了,则需要调用clean()方法,清理节点S:
  
  // 清理item域
  
  // 清理waiter域
  
  节点
  
  // 从栈顶head节点,取消从栈顶head到past节点之间所有已经取消的节点
  
  // 注意:这里如果遇到一个节点没有取消,则会退出
  
  如果p节点已经取消了,则剔除该节点
  
  // 如果经历上面while p节点还没有取消,则再次循环取消掉所有p 到past之间的取消节点
  
  else
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  方法就是将head节点到S节点之间所有已经取消的节点全部移出。【不清楚为何要用两个while,一个不行么】
  
  至此,SynchronousQueue的源码分析完成了,说下我个人感觉吧:个人感觉SynchronousQueue实现好复杂(可能是自己智商不够吧~~(>_<)~~),源码看了好久,这篇博客写了将近一个星期,如果有什么错误之处,烦请各位指正!!

时间: 2024-09-30 13:16:44

【死磕Java并发】-----J.U.C之阻塞队列:SynchronousQueue的相关文章

死磕Java并发-----J.U.C之Java并发容器:ConcurrentHashMap

HashMap是我们用得非常频繁的一个集合,但是由于它是非线程安全的,在多线程环境下,put操作是有可能产生死循环的,导致CPU利用率接近100%.为了解决该问题,提供了Hashtable和Collections.synchronizedMap(hashMap)两种解决方案,但是这两种方案都是对读写加锁,独占式,一个线程在读时其他线程必须等待,吞吐量较低,性能较为低下.故而Doug Lea大神给我们提供了高性能的线程安全HashMap:ConcurrentHashMap. ConcurrentH

【死磕Java并发】-----J.U.C之重入锁:ReentrantLock

此篇博客所有源码均来自JDK 1.8 ReentrantLock,可重入锁,是一种递归无阻塞的同步机制.它可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更强大.灵活的锁机制,可以减少死锁发生的概率. API介绍如下: 一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大.ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥

【死磕Java并发】-----J.U.C之AQS:CLH同步队列

此篇博客所有源码均来自JDK 1.8 在上篇博客[死磕Java并发]-–J.U.C之AQS:AQS简介中提到了AQS内部维护着一个FIFO队列,该队列就是CLH同步队列. CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态. 在CLH同步队列中,一个节点表示一个线程

【死磕Java并发】-----J.U.C之读写锁:ReentrantReadWriteLock

此篇博客所有源码均来自JDK 1.8 重入锁ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,但是在大多数场景下,大部分时间都是提供读服务,而写服务占有的时间较少.然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低.所以就提供了读写锁. 读写锁维护着一对锁,一个读锁和一个写锁.通过分离读锁和写锁,使得并发性比一般的排他锁有了较大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会被阻塞. 读写锁的主要特

【死磕Java并发】-----Java内存模型之分析volatile

前篇博客[死磕Java并发]-–深入分析volatile的实现原理 中已经阐述了volatile的特性了: volatile可见性:对一个volatile的读,总可以看到对这个变量最终的写: volatile原子性:volatile对单个读/写具有原子性(32位Long.Double),但是复合操作除外,例如i++; JVM底层采用"内存屏障"来实现volatile语义 下面LZ就通过happens-before原则和volatile的内存语义两个方向介绍volatile. volat

Java并发编程(十一)-- 阻塞队列

在介绍Java的阻塞队列之前,我们简单介绍一下队列. 队列 队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们.队列会自动平衡负载.如果第一个线程集运行得比第二个慢,则第二个 线程集

【死磕Java并发】-----J.U.C之AQS:阻塞和唤醒线程

此篇博客所有源码均来自JDK 1.8 在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued(): if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; 通过这段代码我们可以看到,在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的

【死磕Java并发】-----J.U.C之Condition

此篇博客所有源码均来自JDK 1.8 在没有Lock之前,我们使用synchronized来控制同步,配合Object的wait().notify()系列方法可以实现等待/通知模式.在Java SE5后,Java提供了Lock接口,相对于Synchronized而言,Lock提供了条件Condition,对线程的等待.唤醒操作更加详细和灵活.下图是Condition与Object的监视器方法的对比(摘自<Java并发编程的艺术>): Condition提供了一系列的方法来对阻塞和唤醒线程: a

【死磕Java并发】—–J.U.C之AQS(一篇就够了)

[隐藏目录] 1 独占式 1.1 独占式同步状态获取 1.2 独占式获取响应中断 1.3 独占式超时获取 1.4 独占式同步状态释放 2 共享式 2.1 共享式同步状态获取 2.2 共享式同步状态释放 3 参考资料 此篇博客所有源码均来自JDK 1.8 在前面提到过,AQS是构建Java同步组件的基础,我们期待它能够成为实现大部分同步需求的基础.AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做,AQS提供了大量的模板方法来实

【死磕Java并发】-----深入分析volatile的实现原理

通过前面一章我们了解了synchronized是一个重量级的锁,虽然JVM对它做了很多优化,而下面介绍的volatile则是轻量级的synchronized.如果一个变量使用volatile,则它比使用synchronized的成本更加低,因为它不会引起线程上下文的切换和调度.Java语言规范对volatile的定义如下: Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量. 上面比较绕口,通俗点讲就是说一个变量如果用volatil