Java之集合(二十一)LinkedTransferQueue

  转载请注明源出处:http://www.cnblogs.com/lighten/p/7505355.html

1.前言

  本章介绍无界的阻塞队列LinkedTransferQueue,JDK7才提供了这个类,所以这个类具备了一些一般队列不具有的特性。此队列也是基于链表的,对于所有给定的生产者都是先入先出的。注意,该队列的size方法和ConcurrentLinkedQueue一样不是常量时间。由于队列的实现,其需要遍历队列才能计算出队列的大小,这期间队列发生的改变,遍历的结果会不正确。bulk操作并不保证原子性,比如迭代器迭代的时候执行addAll()方法,迭代器可能只能看到部分新加的元素。

2.LinkedTransferQueue

2.1 TransferQueue接口

  这个是JDK7才定义的一个节点,LinkedTransferQueue实现了这个接口,其新特性也就与之有关。通常阻塞队里中,生产者放入元素,消费者使用元素,这两个部分是分离的。这里的分离意思如下:厨师做好了菜放在柜台上,服务员端走,厨师是不需要管有没有人取走做的菜,服务员也不需要管厨师有没有做好菜,没做好菜阻塞就行了。上面就是个人所说的分离的意思。TransferQueue接口定义的相关内容就是厨师会知道做好的菜有没有被取走。

  1.tryTransfer(E):将元素立刻给消费者。准确的说就是立刻给一个等待接收元素的线程,如果没有消费者就会返回false,而不将元素放入队列。

  2.transfer(E):将元素给消费者,如果没有消费者就会等待。

  3.tryTransfer(E,long,TimeUnit):将元素立刻给消费者,如果没有就等待指定时间。给失败返回false。

  4.hasWaitingConsumer():返回当前是否有消费者在等待元素。

  5.getWaitingConsumerCount():返回等待元素的消费者个数。

2.2 设计原理

  Dual Queues是该队列的基础理论。此队列不进存放数据节点,也会存放请求节点。当一个线程试图放入一个数据节点,正好遇到一个请求数据的结点,会立刻匹配并移除该数据节点,对于请求节点入队列也是一样的。Blocking Dual Queues阻塞所有未匹配的线程,直到有匹配的线程出现。

  一个先入先出的dual queue实现是无锁队列算法M&S的变体。其包含两个指向字段:head指向一个匹配的结点,然后依次指向未匹配的结点,如果不为空。tail指向最后一个节点,或者null,如果队列为空。例如下图是一个包含四个元素的队列结构:

  M&S算法易于扩展和保持(通过CAS)这些头部和尾指针。在dual队列中,节点需要自动维护匹配状态。所以这里需要一些必要的变量:对于数据模式,匹配需要将一个item字段通过CAS从非null的数据转成null,反之对于请求模式,需要从null变成data。一旦一个节点匹配了,其状态将不再改变。因此通常安排元素链表的前缀是0个或多个匹配节点,而后跟随0个或多个未匹配节点。如果不关心时间或空间的效率,通过从头指针开始遍历队列放入取出操作都是对的。CAS操作第一个未匹配节点匹配时的item,在下一个字段追加后一个节点。然而这是一个糟糕的想法,虽然其确实有好处,不需对head或tail进行原子更新。

  LinkedTransferQueue采取了一种折中的方案,介于实时更新head/tail和不更新head/tail之间的方法。该方法对有时候需要额外的遍历去定位第一个或最后一个未匹配的结点和减少开销及队列结点的竞争更新这两个方面进行了权衡。例如,一个可能出现的队列快照如下图:

  slack(head位置和第一个未匹配的结点的最大距离,尾结点类似)的最佳值是一个经验问题,发现在1~3之间在大部分平台是最佳的值。更大的值会增加内存命中开销和长遍历链表的风险,更小的值则会增加CAS的竞争开销。

  具体实现:使用一个基础的threshold来更新,slack为2。所以在当前位置超过第一个或最后一个节点2个距离以上的时候就会更新head/tail。出入队列操作都是通过xfer方法完成的,只需要不同的参数来表示操作。

  其它的内容通过代码详细介绍。

2.3 数据结构

  上图是一个基本的数据结构:

    MP表示是否是多核处理器;

    FRONT_SPINS当一个节点目前是队列的第一个等待者,在多核处理器上自旋的次数2n

    CHAINED_SPINS当一个节点先于另一个明显自旋的结点阻塞时自旋的次数。

  上图是Node节点的基本结构,next就是下一个节点了,isData表示是请求还是数据节点,另外两个字段就是对应不同模式要存储的值了。Node的基本操作如下:

    casNext:CAS更新当前结点next的字段

    casItem:CAS更新当前结点的item字段

    forgetNext:CAS设置当前结点的next字段为自身

    forgetContents:CAS设置item字段为自身,waiter为null

    isMatched:是否是匹配了的结点

    isUnmatchedRequest:是否是未匹配的请求节点

    cannotPrecede:当该节点是未匹配节点却与当前的结点类型不符的时候,返回true。意思就是当前都是请求节点,数据节点应该立刻被消耗,未匹配的结点应该是同一种节点。

    tryMatchData:数据节点尝试匹配  

2.4 基本操作

  存入一个元素:该队列的put、offer和offer(E,timeout,unit)方法所调用的都是同一个方法。

  不允许放入的数据为空,放入操作的模式是ASYNC。从头指针处开始死循环,当前结点p没有被匹配,数据节点不能匹配直接跳出循环,不进行匹配,后面会进入how!=NOW的判断,创建新节点,尝试追加到队列尾。如果可以匹配就替换P节点的值,失败意味着被其它线程抢先了,继续循环,成功了意味着这两个匹配成功,可能需要更新头结点。q=p且p!=h的循环意味着已经跳过了一个元素,n又取了q.next,p又是当前被匹配了的结点,这就意味着前面有2个match的结点:head和p。达到slack为2的条件,更新头结点,并遗弃之前的head。不需要更新头结点的时候直接跳出循环。匹配完成之后就是唤醒p结点的waiter(如果p是请求节点的话)返回item。

  队列尾追加节点操作如上图,从尾结点开始,

    如果当前结点p为null,头结点也是null,初始化队列,设置头结点,返回s追加节点。

    如果该节点不能放入队列,返回null。

    如果p.next不为空,意味着当前结点不是尾结点,重新找到尾结点,继续循环。

    如果p节点在设置的时候被插队了,继续找其下一个循环

    如果成功了,p!=t,且tail也不等于t。意味着有尾结点后面又追加了2个节点,slack>=2更新尾结点。返回p节点。

  取出都是消费者data为null,poll的模式是NOW,有时间限制就是TIMED,take方法使用的是SYNC。回到xfer方法,我们可以知道:其先找到第一个未匹配的元素进行匹配,匹配了不管什么模式都是直接返回,没匹配就要根据模式来了,先是how!=NOW才会有额外操作,所以poll取出就是NOW,取不到那就是没准备好,直接返回就可以了。其它三种模式没匹配到都会尝试追加该节点,没追加上肯定是模式不匹配,意味着可以匹配的,重新循环。如果不是ASYNC模式,那就是带有时间或异步的模式,需要等待。

  以上就是整个类的设计思路了,分成四种模式:NOW就是立刻返回不追加元素到末尾,ASYNC就是同步需要添加元素到队列尾,TIMED用于有时间限制的操作,SYNC用于无时间限制无限等待的操作。awaitMatch方法不再进行介绍,就是等到指定时间。size方法和getWaitingConsumerCount方法都是遍历链表,超过Integer.MAX_VALUE就返回这个值,区别就是该链表是处于什么模式而已。其它的方法不再描述,上面是基本的操作。

3.使用例子

    @Test
	public void testTransfer() {
		LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(500);	// 再改成1500
					System.out.println(Thread.currentThread().getName()+"-"+queue.take());
					System.out.println(Thread.currentThread().getName()+"-"+queue.take());
					System.out.println(Thread.currentThread().getName()+"-"+queue.take());
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		},"consumer").start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName()+"-"+queue.tryTransfer(1));
				try {
					System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.tryTransfer(2, 1, TimeUnit.SECONDS));
					queue.transfer(3);
					System.out.println(Thread.currentThread().getName()+"-"+"等到3被消费:true");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"prodcuer").start();
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

  更改1500毫秒之后:

时间: 2024-10-10 07:04:04

Java之集合(二十一)LinkedTransferQueue的相关文章

Java Web总结二十一Listener监听器

一.事件三要素 1.事件源:操作事件的对象,例如:窗体Frame 2.事件监听器:事件监听器监听事件源,例如WindowListner,它是一个接口 3.事件,例如:单击事件,通过事件,可以取得事件源 二.适配器模式 1.当一个接口有较多的方法时,而实现类只需对其中少数几个实现,此时可以使用适配器模式 2.适配器模式常用于GUI编程 三.八种Web监听器 1.Web中有三个事件源,分别是ServletContext->HttpSession->ServletRequest 2.ServletC

Java从零开始学二十一(集合List接口)

一.List接口 List是Collection的子接口,里面可以保存各个重复的内容,此接口的定义如下: public interface List<E> extends Collection<E> 二.List接口的扩展方法 No. 方法 类型 描述 1 public void add(int index, E element) 普通 在指定位置增加元素 2 public boolean addAll(int index, Collection<? extends E>

Java之集合(二十六)ConcurrentSkipListMap

转载请注明源出处:http://www.cnblogs.com/lighten/p/7542578.html 1.前言 一个可伸缩的并发实现,这个map实现了排序功能,默认使用的是对象自身的compareTo方法,如果提供了比较器,使用比较器的比较方法.简单来说ConcurrentSkipListMap是TreeMap的并发实现,但是为什么没有称之为ConcurrentTreeMap呢?这和其自身的实现有关.该类是SkipLists的变种实现,提供了log(n)的时间开销:containsKey

Java学习系列(二十一)Java面向对象之注解详解

转载请注明出处:http://blog.csdn.net/lhy_ycu/article/details/45295947 一.前言 Java中的注解Annotation运用到很多方面,比如之前讲的单元测试中的@Test.Spring.SpringMVC还有其他框架等等.Java本身自带的注解也有一些,比如:@Override(子类要重写/覆写父类的对应方法).@Deprecated(表示方法不建议被使用).@SuppressWarnings(抑制警告)等等.当然,我们也可以自定义一些自己需要的

java核心学习(二十一) 多线程---创建启动线程的三种方式

本节开始java多线程编程的学习,对于操作系统.进程.线程的基本概念不再赘述,只是了解java对于多线程编程的支持有哪些. 一.继承Thread类来创建线程 java语言中使用Thread类来代表线程,代表线程的类可以通过继承Thread类并重写run()方法来实现多线程开发,调用线程类实例的start方法来启动该线程.下面来试一试 package ThreadTest; public class FirstThread extends Thread{ private int i; public

Java从零开始学二十一(Ubuntu下配置eclipse)

首先要配置jdk:http://www.cnblogs.com/liunanjava/p/4296540.html 一.下载 eclipse:http://www.eclipse.org/downloads/ 二.安装配置 2.1.解压 sudo tar -zxvf eclipse-jee-luna-SR1a-linux-gtk-x86_64.tar.gz 2.2 .cp到一个文件夹中 移动文件或者复制 sudo mv /home/ln0491/Downloads/eclipse/ /usr/l

Java之集合(二十二)PriorityBlockingQueue

转载请注明源出处:http://www.cnblogs.com/lighten/p/7510799.html 1.前言 本章介绍阻塞队列PriorityBlockingQueue.这是一个无界有序的阻塞队列,排序规则和之前介绍的PriorityQueue一致,只是增加了阻塞操作.同样的该队列不支持插入null元素,同时不支持插入非comparable的对象.它的迭代器并不保证队列保持任何特定的顺序,如果想要顺序遍历,考虑使用Arrays.sort(pq.toArray()).该类不保证同等优先级

java基础(二十一)IO流(四)

这里有我之前上课总结的一些知识点以及代码大部分是老师讲的笔记 个人认为是非常好的,,也是比较经典的内容,真诚的希望这些对于那些想学习的人有所帮助! 由于代码是分模块的上传非常的不便.也比较多,讲的也是比较清晰的!如果你有需要可以在评论中留下您的邮箱我看到一定会免费发给你的!感谢这个平台让我们大家共同进步吧!! 记住 程序员是无私的!!! 也非常欢迎到我的博客内观看 博客地址:http://www.cnblogs.com/duscl/ /* 1:登录注册IO版本案例(掌握) 要求,对着写一遍. c

Java抽象类(二十一)

为什么需要抽象类? 抽象类是一种模版模式,抽象类为所有子类提供了一个通用模版,子类可以在这个模版基础上进行扩展. 通过抽象类,可以避免子类设计的随意性.通过抽象类,我们就可以做到严格限制子类的设计,使子类之间更加通用. 使用抽象类注意的几点: 1.有抽象方法的类只能定义成抽象类 2.抽象类不能实例化,及不能用new来实例化抽象类 3.抽象类可以包含属性,方法,构造方法.但是构造方法不能用来new实例,只能用来被子类调用 4.抽象类只能用来继承 5.抽象方法必须被子类实现 下面是一个小例子: //