BlockingQueue深入分析

1.BlockingQueue定义的常用方法如下

  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图addput 或offer 一个null 元素时,某些实现会抛出NullPointerExceptionnull 被用作指示poll 操作失败的警戒值。

2、BlockingQueue的几个注意点

【1】BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity,超出此容量,便无法无阻塞地put 附加元素。没有任何内部容量约束的BlockingQueue 总是报告Integer.MAX_VALUE 的剩余容量。

【2】BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

【3】BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

【4】BlockingQueue 实质上不支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的end-of-stream 或poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

3、简要概述BlockingQueue常用的四个实现类


1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

其中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

下面主要看一下ArrayBlockingQueue的源码:

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁
        lock.lock();//请求锁直到获得锁(不可以被interrupte)
        try {
            if (count == items.length)//如果队列已经满了
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();//
        }
}
看insert方法:
private void insert(E x) {
        items[putIndex] = x;
        //增加全局index的值。
        /*
        Inc方法体内部:
        final int inc(int i) {
        return (++i == items.length)? 0 : i;
            }
        这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满)
        */
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();//wake up one waiting thread
}    
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted
        try {
            try {
                while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
}    
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {    

        if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                //如果没有被 signal/interruptes,需要等待nanos时间才返回
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }    
public boolean add(E e) {
    return super.add(e);
}
父类:
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }  

该类中有几个实例变量:takeIndex/putIndex/count

用三个数字来维护这个队列中的数据变更:
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;    
时间: 2024-11-14 05:26:16

BlockingQueue深入分析的相关文章

BlockingQueue深入分析(转)

1.BlockingQueue定义的常用方法如下   抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e,time,unit) 移除 remove() poll() take() poll(time,unit) 检查 element() peek() 不可用 不可用 1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常 2)offer(anObjec

BlockingQueue

BlockingQueue的使用 http://www.cnblogs.com/liuling/p/2013-8-20-01.html BlockingQueue深入分析 http://blog.csdn.net/tayanxunhua/article/details/20962307 java中线程队列BlockingQueue的用法 http://blog.itpub.net/143526/viewspace-1060365/

《java并发编程实战》读书笔记4--基础构建模块,java中的同步容器类&amp;并发容器类&amp;同步工具类,消费者模式

上一章说道委托是创建线程安全类的一个最有效策略,只需让现有的线程安全的类管理所有的状态即可.那么这章便说的是怎么利用java平台类库的并发基础构建模块呢? 5.1 同步容器类 包括Vector和Hashtable,此外还包括在JDK1.2中添加的一些功能相似的类,这些同步的封装器类由Collections.synchronizedXxx等工厂方法创建的.这些类实现线程安全的方式是:将他们的状态封装起来,并对每个共有方法都进行同步,使得每次只能有一个线程能访问容器的状态. 关于java中的Vect

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用. 1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程.BlockingQueue的核心方法有: boolean add(E e) ,把 e 添加到BlockingQueue里.如果BlockingQueue可以容纳,则返回true,否则抛出异常. boolean offer(E e),表示如果可能的话,将 e 加到B

BlockingQueue(阻塞队列)详解

一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由

深入浅出 Java Concurrency (22): 并发容器 part 7 可阻塞的BlockingQueue (2)[转]

在上一节中详细分析了LinkedBlockingQueue 的实现原理.实现一个可扩展的队列通常有两种方式:一种方式就像LinkedBlockingQueue一样使用链表,也就是每一个元素带有下一个元素的引用,这样的队列原生就是可扩展的:另外一种就是通过数组实现,一旦队列的大小达到数组的容量的时候就将数组扩充一倍(或者一定的系数倍),从而达到扩容的目的.常见的ArrayList就属于第二种.前面章节介绍过的HashMap确是综合使用了这两种方式. 对于一个Queue而言,同样可以使用数组实现.使

深入分析:Fragment与Activity交互的几种方式(一,使用Handler)

这里我不再详细介绍那写比较常规的方式,例如静态变量,静态方法,持久化,application全局变量,收发广播等等. 首先我们来介绍使用Handler来实现Fragment与Activity 的交互. 第一步,我们需要在Activity中定义一个方法用来设置Handler对象. public void setHandler(Handler handler) { mHandler = handler; } 第二步,在Fragment中的回调函数onAttach()中得到Fragment所在Acti

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe

Android开发艺术探索——第七章:Android动画深入分析

Android开发艺术探索--第七章:Android动画深入分析 Android的动画可以分成三种,view动画,帧动画,还有属性动画,其实帧动画也是属于view动画的一种,,只不过他和传统的平移之类的动画不太一样的是表现形式上有点不一样,view动画是通过对场景的不断图像交换而产生的动画效果,而帧动画就是播放一大段图片,很显然,图片多了会OOM,属性动画通过动态的改变对象的属性达到动画效果,也是api11的新特性,在低版本无法使用属性动画,但是我们依旧有一些兼容库,OK,我们还是继续来看下详细