BlockingQueue的使用

http://www.cnblogs.com/liuling/p/2013-8-20-01.html

BlockingQueue的使用

本例介绍一个特殊的队列:BlockingQueue,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作. 
    使用BlockingQueue的关键技术点如下: 
    1.BlockingQueue定义的常用方法如下: 
        1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则报异常 
        2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
        3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续. 
        4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null 
        5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止 
    2.BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类 
        1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的. 
        2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的 
        3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序. 
        4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的. 
    3.LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

  下面是两个使用BlockingQueue的例子:

 1 package com.thread;
 2 import java.util.concurrent.ArrayBlockingQueue;
 3 import java.util.concurrent.BlockingQueue;
 4
 5 public class BlockingQueueTest {
 6     public static void main(String[] args) {
 7         final BlockingQueue queue = new ArrayBlockingQueue(3);
 8         for(int i=0;i<2;i++){
 9             new Thread(){
10                 public void run(){
11                     while(true){
12                         try {
13                             Thread.sleep((long)(Math.random()*1000));
14                             System.out.println(Thread.currentThread().getName() + "准备放数据!");
15                             queue.put(1);
16                             System.out.println(Thread.currentThread().getName() + "已经放了数据," +
17                                         "队列目前有" + queue.size() + "个数据");
18                         } catch (InterruptedException e) {
19                             e.printStackTrace();
20                         }
21
22                     }
23                 }
24
25             }.start();
26         }
27
28         new Thread(){
29             public void run(){
30                 while(true){
31                     try {
32                         //将此处的睡眠时间分别改为100和1000,观察运行结果
33                         Thread.sleep(1000);
34                         System.out.println(Thread.currentThread().getName() + "准备取数据!");
35                         queue.take();
36                         System.out.println(Thread.currentThread().getName() + "已经取走数据," +
37                                 "队列目前有" + queue.size() + "个数据");
38                     } catch (InterruptedException e) {
39                         e.printStackTrace();
40                     }
41                 }
42             }
43
44         }.start();
45     }
46 }

 1 package com.thread;
 2 import java.util.concurrent.ArrayBlockingQueue;
 3 import java.util.concurrent.BlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.locks.Condition;
 7 import java.util.concurrent.locks.Lock;
 8 import java.util.concurrent.locks.ReentrantLock;
 9
10 public class BlockingQueueCondition {
11
12     public static void main(String[] args) {
13         ExecutorService service = Executors.newSingleThreadExecutor();
14         final Business3 business = new Business3();
15         service.execute(new Runnable(){
16
17             public void run() {
18                 for(int i=0;i<50;i++){
19                     business.sub();
20                 }
21             }
22
23         });
24
25         for(int i=0;i<50;i++){
26             business.main();
27         }
28     }
29
30 }
31
32 class Business3{
33     BlockingQueue subQueue = new ArrayBlockingQueue(1);
34     BlockingQueue mainQueue = new ArrayBlockingQueue(1);
35     //这里是匿名构造方法,只要new一个对象都会调用这个匿名构造方法,它与静态块不同,静态块只会执行一次,
36     //在类第一次加载到JVM的时候执行
37     //这里主要是让main线程首先put一个,就有东西可以取,如果不加这个匿名构造方法put一个的话程序就死锁了
38     {
39         try {
40             mainQueue.put(1);
41         } catch (InterruptedException e) {
42             e.printStackTrace();
43         }
44     }
45     public void sub(){
46         try
47         {
48             mainQueue.take();
49             for(int i=0;i<10;i++){
50                 System.out.println(Thread.currentThread().getName() + " : " + i);
51             }
52             subQueue.put(1);
53         }catch(Exception e){
54
55         }
56     }
57
58     public void main(){
59
60         try
61         {
62             subQueue.take();
63             for(int i=0;i<5;i++){
64                 System.out.println(Thread.currentThread().getName() + " : " + i);
65             }
66             mainQueue.put(1);
67         }catch(Exception e){
68         }
69     }
70 }

时间: 2024-10-12 09:48:06

BlockingQueue的使用的相关文章

BlockingQueue(阻塞队列)详解

一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由

深入浅出 Java Concurrency (22): 并发容器 part 7 可阻塞的BlockingQueue (2)[转]

在上一节中详细分析了LinkedBlockingQueue 的实现原理.实现一个可扩展的队列通常有两种方式:一种方式就像LinkedBlockingQueue一样使用链表,也就是每一个元素带有下一个元素的引用,这样的队列原生就是可扩展的:另外一种就是通过数组实现,一旦队列的大小达到数组的容量的时候就将数组扩充一倍(或者一定的系数倍),从而达到扩容的目的.常见的ArrayList就属于第二种.前面章节介绍过的HashMap确是综合使用了这两种方式. 对于一个Queue而言,同样可以使用数组实现.使

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe

使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.使用场景. 首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出:在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享.强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程. BlockingQueue的

JAVA多线程之间共享数据BlockingQueue介绍

在JAVA的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 一.认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:  从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从

深入浅出 Java Concurrency (21): 并发容器 part 6 可阻塞的BlockingQueue (1)[转]

在<并发容器 part 4 并发队列与Queue简介>节中的类图中可以看到,对于Queue来说,BlockingQueue是主要的线程安全版本.这是一个可阻塞的版本,也就是允许添加/删除元素被阻塞,直到成功为止. BlockingQueue相对于Queue而言增加了两个操作:put/take.下面是一张整理的表格. 看似简单的API,非常有用.这在控制队列的并发上非常有好处.既然加入队列和移除队列能够被阻塞,这在实现生产者-消费者模型上就简单多了. 清单1 是生产者-消费者模型的一个例子.这个

Java多线程-工具篇-BlockingQueue

前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入

java.util.concurrent BlockingQueue

BlockingQueue 它实现了Queue接口.它是A BlockingQueue with one thread putting into it, and another thread taking from it. 一端生产一端消费. 其中的一个线程将不断的将任务放入BlockingQueue,直到遇到它的临界值,但是不允许插入NULL,否则会抛出NullPointerException.另一个线程从中不断的取任务. BlockingQueue 的方法 BlockingQueue 具有

[转载] java多线程学习-java.util.concurrent详解(四) BlockingQueue

转载自http://janeky.iteye.com/blog/770671 --------------------------------------------------------------------------------- 7.BlockingQueue     “支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用.“ 这里我们主要讨论BlockingQueue的最典型实现:LinkedBlockingQueue 和Arra

java-线程-使用阻塞队列(BlockingQueue)控制线程通信

BlockingQueue是一个接口,也是Queue的子接口.BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则线程被阻塞:但消费者线程试图从BlockingQueue中取出元素时,如果队列已空,则该线程阻塞. 程序的两个线程通过交替向BlockingQueue中放入元素.取出元素,即可很好地控制线程的通信. BlockingQueue提供如下两个支持阻塞的方法: put(E e):尝试把Eu元素放如BlockingQueue中,