[源码]源代码解析 SynchronousQueue

简析SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue

三者都是blockingQueue.

LinkedBlockingQueue,ArrayBlockingQueue

有界,默认是Integer.Max;

SynchronousQueue没什么界不界的概念.之所以这么说.是因为它的操作必须成对.

注记方案:

oppo(oppo手机)是一对,offer和pool不阻塞

ppt是一对.put和take都阻塞.

1. 里面没有任何数据.调用offer()无法成功,返回flase,表示填充失败.调用put被阻塞,直到有人take或者poll, 反之亦然,如下.

2. 先take,被阻塞,直到有一个线程来offer,或者put.

两个不同的互补碰撞发生匹配完成(fullfill). 之前的take的线程被唤醒获得offer的提供的数据.

public static void main(String[] args) throws InterruptedException {
	final	SynchronousQueue<Long> workQueue = new SynchronousQueue<Long>();
		boolean offer = workQueue.offer(2L);
		System.out.println("main thread: offer="+offer);
		ExecutorService newCachedThreadPool =Executors.newCachedThreadPool();
// 内部实现是 <span style="font-family: 'Microsoft YaHei';">new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); </span>
		newCachedThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("take thread: begin take and thread will be blocked by call  park(await) method");
					Long take = workQueue.take();
					System.out.println("take thread:   take suceffull , take object="+take);

				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		});
		Thread.sleep(2000);
		System.out.println("main thread:  after sleep ");
	        newCachedThreadPool.execute(new Runnable() {
			@Override
			public void run() {
					long object = 123L;
					System.out.println("offer thead: begin offer "+object);
					boolean offer = workQueue.offer(object);
					System.out.println("offer thead: finish offer , is sucefully ? "+offer+" , if true, SynchronousQueue will unpark(notify) the take thread ");
			}
		});
	newCachedThreadPool.shutdown();
	}

输出:

main thread: offer=false
take thread: begin take and thread will be blocked by call  park(await) method
main thread:  after sleep
offer thead: begin offer 123
take thread:   take suceffull , take object=123
offer thead: finish offer , is sucefully ? true , if true, SynchronousQueue will unpark(notify) the take thread 

关键术语解析:

和其他queue不同的是SynchronousQueue的take()函数调用也有可能被添加到queue里,变成一个节点(未匹配时.)

Node类型一共分层两种. 一种是 isDate=true. (对应offer , put 函数) 一种是isDate=false (对应 take函数)

dual queue:dual的含义就好理解了,因为只有两类,可以当isDate=true和isDate=false遇到时会匹配.可翻译为

成双的,对偶的. 对偶队列.

same mode: 相同的模式(isDate都=true,或者isDate都=false).比如take产生的Node和前面已经放入到队列中的take动作Node就属于同一个模式

fulfill(从SynchronousQueue下面的一个注释我们可以理解,具体见本文下载的摘抄中的红体字):

字面英文翻译,完成.具体到算法里的含义是一个动作和之前的complementary(译为互补)的动作得到匹配.

complementary :互补的.比如先take,放到队列中.后面来一个offer动作就是complementary
(互补)

=============

SynchronousQueue下面的一个部分注释部分翻译.

/*

* This class implements extensions of the dual stack and dual

* queue algorithms described in "Nonblocking Concurrent Objects

* with Condition Synchronization", by W. N. Scherer III and

* M. L. Scott.  18th Annual Conf. on Distributed Computing,

* Oct. 2004 (see also

*

http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html

).参照的算法

* The (Lifo) stack is used for non-fair mode, and the (Fifo)

* queue for fair mode. The performance of the two is generally

* similar. Fifo usually supports higher throughput under

* contention but Lifo maintains higher thread locality in common

* applications.

*

* A dual queue (and similarly stack) is one that at any given

* time either holds "data" -- items provided by put operations,

* or "requests" -- slots representing take operations, or is

* empty. A call to "

fulfill

" 翻译: 一个目的是为了"完成"的调用 (i.e., a call requesting an item

* from a queue holding data or vice versa 翻译: 当queue里有数据时,一个调用请求数据,就会得到匹配. 这样的调用称为实现完成的调用 ) dequeues a

*

complementary

node.  (翻译: 会让一个互补的节点退出队列)The most interesting feature of these

* queues is that any operation can figure out which mode the

* queue is in, and act accordingly without needing locks.

下面还有一些注释未剪贴上来,比较了java实现的算法和借鉴的算法(见注释中网址)有何区别

* The algorithms here differ from the versions in the above paper

* in extending them for use in synchronous queues, as well as

* dealing with cancellation. The main differences include:

*

*  1. The original algorithms used bit-marked pointers, but

*     the ones here use mode bits in nodes, leading to a number

*     of further adaptations.

*  2. SynchronousQueues must block threads waiting to become

*     fulfilled.

*  3. Support for cancellation via timeout and interrupts,

*     including cleaning out cancelled nodes/threads

*     from lists to avoid garbage retention and memory depletion.

=============再来看看SynchronousQueue.TransferQueue.transfer下面的注释.=========

/**

* Puts or takes an item.

*/

Object transfer(Object e, boolean timed, long nanos) {

/* Basic algorithm is to loop trying to take either of

* two actions:

*

* 1. If queue apparently empty or holding same-mode nodes,

*    try to add node to queue of waiters, wait to be

*    fulfilled (or cancelled) and return matching item.

*    了解了上面的几个术语概念.就很容易明白这句话的含义.

当队列是空,或者是同一种Mode时,直接放入到列队尾.不会完成(fullfill)

* 2. If queue apparently contains waiting items, and this

*    call is of complementary mode, try to

fulfill

by CAS‘ing

*    item field of waiting node and

dequeuing

it, and then

*    returning matching item.

*

* In each case, along the way, check for and try to help

* advance head and tail on behalf of other stalled/slow

* threads.

*

* The loop starts off with a null check guarding against

* seeing uninitialized head or tail values. This never

* happens in current SynchronousQueue, but could if

* callers held non-volatile/final ref to the

* transferer. The check is here anyway because it places

* null checks at top of loop, which is usually faster

* than having them implicitly interspersed.

*/

QNode s = null; // constructed/reused as needed

boolean isData = (e != null);

for (;;) {

QNode t = tail;

QNode h = head;

if (t == null || h == null)         // saw uninitialized value

continue;                       // spin

if (h == t || t.isData == isData) { // empty or same-mode

//楼主注:①

QNode tn = t.next;

if (t != tail)                  // inconsistent read

continue;

if (tn != null) {               // lagging tail

advanceTail(t, tn);

continue;

}

if (timed && nanos <= 0)        // can‘t wait

return null;

if (s == null)

s = new QNode(e, isData);

if (!t.casNext(null, s))        // failed to link in

continue;

advanceTail(t, s);              // swing tail and wait

Object x = awaitFulfill(s, e, timed, nanos); // 楼主注: 会被阻塞,等待其他线程互补操作时唤醒

if (x == s) {                   // wait was cancelled

clean(t, s);

return null;

}

if (!s.isOffList()) {           // not already unlinked

advanceHead(t, s);          // unlink if head

if (x != null)              // and forget fields

s.item = s;

s.waiter = null;

}

return (x != null)? x : e; // 楼主注: 因为是互补匹配的.要么x=null 要么 e=null  返回一个非null的值即可.不管该线程调用的是take还是put,都返回数据.

} else {                            // complementary-mode  //楼主注:②

QNode m = h.next;               // node to fulfill

if (t != tail || m == null || h != head)

continue;                   // inconsistent read

Object x = m.item;

if (isData == (x != null) ||

// m already fulfilled

x == m ||                   // m cancelled

!m.casItem(x, e)) {         // lost CAS

advanceHead(h, m);          // dequeue and retry

continue;

}

advanceHead(h, m);              // successfully fulfilled

LockSupport.unpark(m.waiter);

return (x != null)? x : e;    // 楼主注: 因为是互补匹配的.要么x=null 要么 e=null  返回一个非null的值即可.不管该线程调用的是take还是put,都返回数据.

}

}

}

我一开始没理解的一点是:

当一个head是 isDate=false , tail是isDate=true时, 一个线程进来的操作是isDate=false时.

不会进入①,进入②后看代码又无法和head完成匹配(fullfill).

后来想明白了,这种情况不会发生.因为tail是isDate=true,这个会与head完成匹配(fullfill).换句话说.

队列里tail和head肯定是same mode.所以当①判断失败,进入②后,肯定能和head完成匹配(fulfill)

[源码]源代码解析 SynchronousQueue

时间: 2024-12-05 10:12:46

[源码]源代码解析 SynchronousQueue的相关文章

Retrofit源码设计模式解析(下)

本文将接着<Retrofit源码设计模式解析(上)>,继续分享以下设计模式在Retrofit中的应用: 适配器模式 策略模式 观察者模式 单例模式 原型模式 享元模式 一.适配器模式 在上篇说明CallAdapter.Factory使用工厂模式时,提到CallAdapter本身采用了适配器模式.适配器模式将一个接口转换成客户端希望的另一个接口,使接口本不兼容的类可以一起工作. Call接口是Retrofit内置的发送请求给服务器并且返回响应体的调用接口,包括同步.异步请求,查询.取消.复制等功

ngx lua模块源码简单解析

ngx lua模块源码简单解析分类: nginx 2014-07-11 11:45 2097人阅读 评论(0) 收藏 举报nginxlua数据结构架构目录(?)[+]对nginx lua模块的整个流程,原理简单解析.由于nginx lua模块相关配置,指令,API非常多,所以本文档只以content_by_lua指令举例说明. 读本文档最好配合读源码. 不适合对nginx和lua一点都不了解的人看.1.相关配置详细配置见 https://github.com/openresty/lua-ngin

基于Linux平台下网络病毒Caem.c源码及解析

Came.c型病毒在这里主要修改了用户的密码,同时对用户的终端设备进行了监视.希望与大家共同交流 转载请注明出处:http://blog.csdn.net/u010484477     O(∩_∩)O谢谢 #define HOME "/" #define TIOCSCTTY 0x540E #define TIOCGWINSZ 0x5413 #define TIOCSWINSZ 0x5414 #define ECHAR 0x1d #define PORT 39617 #define BU

《Android源码设计模式解析》读书笔记——Android中你应该知道的设计模式

断断续续的,<Android源码设计模式解析>也看了一遍,书中提到了很多的设计模式,但是有部分在开发中见到的几率很小,所以掌握不了也没有太大影响. 我觉得这本书的最大价值有两点,一个是从设计模式的角度去理解Android源码,结合着日常开发中的常用类,对设计模式的理解会更加的深刻:另外一个好处就是了解常用模式,再看其他人写的代码的时候,更容易理解代码思路.下面是我的读书笔记和一些思考,设计模式只整理我认为重要的部分. 建造者模式 建造者模式最明显的标志就是Build类,而在Android中最常

第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本課主題 Master 资源调度的源码鉴赏 Spark 的 Worker 是基于什么逻辑去启动 Executor 资源调度管理 任務調度與資源是通過 DAGScheduler.TaskScheduler.SchedulerBackend 等進行的作業調度 資源調度是指應用程序如何獲得資源 任務調度是在資源調度的基礎上進行的,沒有資源調度那麼任務調度就成為了無源之水無本之木 Master 资源调度的源码鉴赏 因為 Master 負責資源管理和調度,所以資源調度方法 scheduer 位於 Mast

《Android源码设计模式解析与实战》读书笔记(十三)

第十三章.备忘录模式 备忘录模式是一种行为模式,该模式用于保存对象当前的状态,并且在之后可以再次恢复到此状态,有点像是我们平常说的"后悔药". 1.定义 在不破坏封闭的前提下,捕获一个对象的内部状态,并在该对象之外保存这个状态,这样,以后就可将该对象恢复到原先保存的状态. 2.使用场景 (1)需要保存一个对象在某一个时刻的状态或部分状态. (2)如果用一个接口来让其他对象得到这些状态,将会暴露对象的实现细节并破坏对象的封装性,一个对象不希望外界直接访问其内部状态,通过中间对象可以间接访

源码深度解析SpringMvc请求运行机制(转)

源码深度解析SpringMvc请求运行机制 本文依赖的是springmvc4.0.5.RELEASE,通过源码深度解析了解springMvc的请求运行机制.通过源码我们可以知道从客户端发送一个URL请求给springMvc开始,到返回数据给客户端期间是怎么运转的. 1.用户请求处理过程: 1.用户发送请求时会先从DispathcherServler的doService方法开始,在该方法中会将ApplicationContext.localeResolver.themeResolver等对象添加到

iOS开发之Masonry框架源码深度解析

Masonry是iOS在控件布局中经常使用的一个轻量级框架,Masonry让NSLayoutConstraint使用起来更为简洁.Masonry简化了NSLayoutConstraint的使用方式,让我们可以以链式的方式为我们的控件指定约束.本篇博客的主题不是教你如何去使用Masonry框架的,而是对Masonry框架的源码进行解析,让你明白Masonry是如何对NSLayoutConstraint进行封装的,以及Masonry框架中的各个部分所扮演的角色是什么样的.在Masonry框架中,仔细

Spring源码深度解析第一天

其实第一天已经过去了,今天是第二天.iteye刚注册的小号就被封了.不论是它的失误还是他的失误总之我跟iteye是没有缘分了. 昨天基本没有进展.所以从今天开始说了.下面流水账开始了. <Spring源码深度解析>这本书没有pdf完整版是让我很失望的.如果有完整版即使看完了我也会选择买一本实体如果有用的话. 书中说从github下载源码.发现github没有想象中的简单易懂.还需要记忆很多命令才能玩得转.从github上获得了Spring源码后需要使用Gradle来编译成eclipse项目.g