java.util.concurrent BlockingQueue

BlockingQueue

  它实现了Queue接口。它是A BlockingQueue with one thread putting into it, and another thread taking from it. 一端生产一端消费。

其中的一个线程将不断的将任务放入BlockingQueue,直到遇到它的临界值,但是不允许插入NULL,否则会抛出NullPointerException。另一个线程从中不断的取任务。

BlockingQueue 的方法

  BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

  抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 element(o) peek(o)    

四组不同的行为方式解释:

    1. 抛异常:如果试图的操作无法立即执行,抛一个异常。IllegalStateException
    2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
    3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
    4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

BlockingQueue 的实现

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):

    • ArrayBlockingQueue
    • DelayQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue
    • SynchronousQueue

借助ArrayBlockingQueue来实现: 

//消费者
public class Consumer implements Runnable {
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

//生产者
public class Producer implements Runnable {
    private BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            queue.add("001");
            Thread.sleep(1000);
            queue.add("002");
            Thread.sleep(1000);
            queue.add("003");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
//test
    public static void main(String[] args) {
        Executor executor = Executors.newCachedThreadPool();
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
        executor.execute(new Producer(queue));

        executor.execute(new Consumer(queue));

    }

  上面test中跟consumer/producer执行顺序没有关系。正如上面所说的在take的时候,会阻塞。

数组阻塞队列 ArrayBlockingQueue

  ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。既然是使用数组来实现的,就必须要满足数组的一些特性,比如删除的效率较低,无法改变其size。ArrayBlockingQueue使用FIFO的方式来组织数据。同时ArrayBlockingQueue提供了一种公平性的策略。

  public ArrayBlockingQueue(int capacity, boolean fair) {} fair=true的时候,会满足FIFO的策略。

延迟队列 DelayQueue

  这是一个无界的阻塞队列,队列中的每一个元素都有一个延迟期,只有在元素的延迟期失效的情况下才能take。队首元素是过期时间最长的元素。如果没有过期元素,那么返回null。  

getDelay(TimeUnit unit)

  返回元素的过期时长。只有在小于等于0的时候,元素才可以使用。

链阻塞队列 LinkedBlockingQueue

  LinkedBlockingQueue是一个可选的有界阻塞队列。处理元素的策略同样为FIFO。满足链表的性质。LinkedBlockingQueue相对数组实现的阻塞队列有较高的吞吐量,但是性能方面稍差。

有优先级的阻塞队列 PriorityBlockingQueue

  PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。
所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。

同步队列 SynchronousQueue

  SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

阻塞双端队列 BlockingDeque

j  ava.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用 BlockingDeque。
BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。
deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

【参阅】http://blog.csdn.net/defonds/article/details/44021605/

   http://tutorials.jenkov.com/java-concurrency/index.html

 

时间: 2024-08-10 21:29:39

java.util.concurrent BlockingQueue的相关文章

java.util.concurrent BlockingQueue详解

什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入 add(e) offer(e) put(e) o

[转载] java多线程学习-java.util.concurrent详解(四) BlockingQueue

转载自http://janeky.iteye.com/blog/770671 --------------------------------------------------------------------------------- 7.BlockingQueue     “支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用.“ 这里我们主要讨论BlockingQueue的最典型实现:LinkedBlockingQueue 和Arra

java多线程学习--java.util.concurrent

CountDownLatch,api 文档:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. 假设我们要打印1-100,最

Java线程池与java.util.concurrent

Java(Android)线程池 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 那你就out太多了,new Thre

java.util.concurrent 多线程框架

http://daoger.iteye.com/blog/142485 博客分类: 技术研究 多线程Java框架Tomcatthread (来源于http://www.zhuaxia.com/item/590227619/) JDK5中的一个亮点就是将Doug Lea的并发库引入到Java标准库中.Doug Lea确实是一个牛人,能教书,能出书,能编码,不过这在国外还是比较普遍的,而国内的教授们就相差太远了. 一般的服务器都需要线程池,比如Web.FTP等服务器,不过它们一般都自己实现了线程池,

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

java.util.concurrent.CountDownLatch

from when and where: CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier.Semaphore.ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下. introduce: CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行.例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行. func