SynchronousQueue 的简单应用

SynchronousQueue是这样一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。 
     不能在同步队列上进行 peek,因为仅在试图要取得元素时,该元素才存在; 
     除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头是尝试添加到队列中的首个已排队线程元素; 如果没有已排队线程,则不添加元素并且头为 null。 
     对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空集合。此队列不允许 null 元素。
    它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、 
事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 
    对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。 
    但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。 公平通常会降低吞吐量,但是可以减小可变性并避免得不到服务。 
    注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。 
    注意2:它是线程安全的,是阻塞的。 
    注意3:不允许使用 null 元素。 
    注意4:公平排序策略是指调用put的线程之间,或take的线程之间。公平排序策略可以查考ArrayBlockingQueue中的公平策略。 
    注意5:SynchronousQueue的以下方法: 
    * iterator() 永远返回空,因为里面没东西。 
    * peek() 永远返回null。 
    * put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。 
    * offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。 
    * offer(2000, TimeUnit.SECONDS) 往queue里放一个element但是等待指定的时间后才返回,返回的逻辑和offer()方法一样。 
    * take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。 
    * poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。 
    * poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且remove掉queue里的element,其实就是再等其他的thread来往里塞。 
    * isEmpty()永远是true。 
    * remainingCapacity() 永远是0。 
    * remove()和removeAll() 永远是false。

这是一个很有意思的阻塞队列,其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其 实没有任何一个元素,或者说容量是0,严格说并不是一种容器。由于队列没有容量,因此不能调用peek操作,因为只有移除元素时才有元素。

一个没有容量的并发队列有什么用了?或者说存在的意义是什么?

SynchronousQueue 的实现非常复杂,当然了如果真要去分析还是能够得到一些经验的,但是前面分析了过多的结构后,发现越来越陷于数据结构与算法里面了。我的初衷是通过研究并 发实现的原理来更好的利用并发来最大限度的利用可用资源。所以在后面的章节中尽可能的少研究数据结构和算法,但是为了弄清楚里面的原理,必不可免的会涉及 到一些这方面的知识,希望后面能够适可而止。

再回到话题。SynchronousQueue 内部没有容量,但是由于一个插入操作总是对应一个移除操作,反过来同样需要满足。那么一个元素就不会再SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方 向。

需要特别说明的是,尽管元素在SynchronousQueue 内部不会“停留”,但是并不意味之SynchronousQueue 内部没有队列。实际上SynchronousQueue 维护者线程队列,也就是插入线程或者移除线程在不同时存在的时候就会有线程队列。既然有队列,同样就有公平性和非公平性特性,公平性保证正在等待的插入线 程或者移除线程以FIFO的顺序传递资源。

显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入着(生产者)传递给移除着(消费者),这在多任务队列中是最快处理任务的方式。在线程池的相关章节中还会更多的提到此特性。

SynchronousQueue<E>的定义如下

public class SynchronousQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable

从上面可以看出,它实现BlockingQueue<E>,所以是阻塞队列,从名字看,它又是同步的。

它模拟的功能类似于生活中一手交钱一手交货这种情形,像那种货到付款或者先付款后发货模型不适合使用SynchronousQueue。
首先要知道SynchronousQueue没有容纳元素的能力,即它的isEmpty()方法总是返回true,但是给人的感觉却像是只能容纳一个元素。

另外在创建SynchronousQueue时可以传递一个boolean参数来指定它是否是访问它的线程按遵守FIFO顺序处理,true表示遵守FIFO。

下面使用SynchronousQueue模拟只能生产一个产品的生产者-消费者模型

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test19 {
	public static void main(String[] args) {
		SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
		new Customer(queue).start();
		new Product(queue).start();
	}
	static class Product extends Thread{
		SynchronousQueue<Integer> queue;
		public Product(SynchronousQueue<Integer> queue){
			this.queue = queue;
		}
		@Override
		public void run(){
			while(true){
				int rand = new Random().nextInt(1000);
				System.out.println("生产了一个产品:"+rand);
				System.out.println("等待三秒后运送出去...");
				try {
					TimeUnit.SECONDS.sleep(3);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				queue.offer(rand);
			}
		}
	}
	static class Customer extends Thread{
		SynchronousQueue<Integer> queue;
		public Customer(SynchronousQueue<Integer> queue){
			this.queue = queue;
		}
		@Override
		public void run(){
			while(true){
				try {
					System.out.println("消费了一个产品:"+queue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("------------------------------------------");
			}
		}
	}
	/**
	 * 运行结果:
	 *  生产了一个产品:464
		等待三秒后运送出去...
		消费了一个产品:773
		------------------------------------------
		生产了一个产品:547
		等待三秒后运送出去...
		消费了一个产品:464
		------------------------------------------
		生产了一个产品:87
		等待三秒后运送出去...
		消费了一个产品:547
		------------------------------------------
	 */
}

  

时间: 2024-11-10 06:03:09

SynchronousQueue 的简单应用的相关文章

Android Java 线程池 ThreadPoolExecutor源代码篇

线程池简单点就是任务队列+线程组成的. 接下来我们来简单的了解下ThreadPoolExecutor的源代码. 先看ThreadPoolExecutor的简单类图,对ThreadPoolExecutor总体来个简单的认识. 为了分析ThreadPoolExecutor我们得下扯点队列和队列里面的任务这东西. 常见三种BlockingQueue堵塞队列SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue当然还有其它的,简单类图(仅仅画了Sy

Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀

详述: 线程阀是一种线程与线程之间相互制约和交互的机制: 作用:http://wsmajunfeng.iteye.com/blog/1629354阻塞队列BlockingQueue:数组阻塞队列ArrayBlockingQueue:链表阻塞队列LinkedBlockingQueue:优先级阻塞队列PriorityBlockingQueue:延时队列DelayQueue:同步队列SynchronousQueue:链表双向阻塞队列LinkedBlockingDeque:链表传输队列LinkedTra

【死磕Java并发】-----J.U.C之阻塞队列:SynchronousQueue

[注]:SynchronousQueue实现算法看的晕乎乎的,写了好久才写完,如果当中有什么错误之处,忘各位指正 作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性: 没有容量.与其他BlockingQueue不同,是一个不存储元素的BlockingQueue.每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然. 因为没有容量,所以对应 peek, contains, clear, isEmpty - 等方

从一个简单的Java单例示例谈谈并发

一个简单的单例示例 单例模式可能是大家经常接触和使用的一个设计模式,你可能会这么写 public class UnsafeLazyInitiallization { private static UnsafeLazyInitiallization instance; private UnsafeLazyInitiallization() { } public static UnsafeLazyInitiallization getInstance(){ if(instance==null){ /

从一个简单的Java单例示例谈谈并发 JMM JUC

原文: http://www.open-open.com/lib/view/open1462871898428.html 一个简单的单例示例 单例模式可能是大家经常接触和使用的一个设计模式,你可能会这么写 public class UnsafeLazyInitiallization { private static UnsafeLazyInitiallization instance; private UnsafeLazyInitiallization() { } public static U

【实战Java高并发程序设计 7】让线程之间互相帮助--SynchronousQueue的实现

[实战Java高并发程序设计 1]Java中的指针:Unsafe类 [实战Java高并发程序设计 2]无锁的对象引用:AtomicReference [实战Java高并发程序设计 3]带有时间戳的对象引用:AtomicStampedReference [实战Java高并发程序设计 4]数组也能无锁:AtomicIntegerArray [实战Java高并发程序设计 5]让普通变量也享受原子操作 [实战Java高并发程序设计6]挑战无锁算法:无锁的Vector实现 在对线程池的介绍中,提到了一个非

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

一个简单的Java单例示例谈谈并发

一个简单的单例示例 单例模式可能是大家经常接触和使用的一个设计模式,你可能会这么写 public class UnsafeLazyInitiallization { private static UnsafeLazyInitiallization instance; private UnsafeLazyInitiallization() { } public static UnsafeLazyInitiallization getInstance(){ if(instance==null){ /

JDK7中TransferQueue的使用以及TransferQueue与SynchronousQueue的差别

JDK7对JDK5中的J.U.C并发工具进行了增强,其中之一就是新增了TransferQueue.java并发相关的JSR规范,可以查看Doug Lea维护的blog.现在简单介绍下这个类的使用方式. public interface TransferQueue<E> extends BlockingQueue<E> { /** * Transfers the element to a waiting consumer immediately, if possible. * * &