ArrayBlockingQueue,BlockingQueue分析

BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,让容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。

ArrayBlockingQueue是一个由数组支持的有界阻塞队列。在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。

基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args)  {
        final BlockingQueue queue = new ArrayBlockingQueue(3);
        //final BlockingQueue queue = new LinkedBlockingQueue(3);
        for(int i=0;i<2;i++){
            new Thread(){
                public void run(){
                    while(true){
                        try {
                            Thread.sleep((long)(Math.random()*1000));
                            System.out.println(Thread.currentThread().getName() + "准备放数据!");
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + "已经放了数据," +
                                    "队列目前有" + queue.size() + "个数据");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                }

            }.start();
        }

        new Thread(){
            public void run(){
                while(true){
                    try {
                        //将此处的睡眠时间分别改为100和1000,观察运行结果
                        Thread.sleep(100);
                        System.out.println(Thread.currentThread().getName() + "准备取数据!");
                        queue.take();
                        System.out.println(Thread.currentThread().getName() + "已经取走数据," +
                                "队列目前有" + queue.size() + "个数据");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }.start();
    }
}

1000:

Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有2个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有1个数据
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有2个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有3个数据
Thread-0准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-0已经放了数据,队列目前有3个数据
Thread-1准备放数据!
Thread-0准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-1已经放了数据,队列目前有3个数据
Thread-1准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-0已经放了数据,队列目前有3个数据
Thread-0准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-1已经放了数据,队列目前有3个数据
Thread-1准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-0已经放了数据,队列目前有3个数据
Thread-0准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-1已经放了数据,队列目前有3个数据
Thread-1准备放数据!
Thread-2准备取数据!
Thread-0已经放了数据,队列目前有3个数据
Thread-2已经取走数据,队列目前有3个数据
Thread-0准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有2个数据
Thread-1已经放了数据,队列目前有3个数据
Thread-1准备放数据!
Thread-2准备取数据!

100:

Thread-2准备取数据!
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-1准备放数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-1已经放了数据,队列目前有0个数据
Thread-2准备取数据!
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-1准备放数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-1已经放了数据,队列目前有0个数据
Thread-2准备取数据!
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有2个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有1个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有2个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有1个数据
Thread-2准备取数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-0准备放数据!
Thread-2已经取走数据,队列目前有0个数据
Thread-0已经放了数据,队列目前有0个数据
Thread-2准备取数据!
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-2准备取数据!
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-2已经取走数据,队列目前有0个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有1个数据
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有2个数据
Thread-2准备取数据!

ArrayBlockingQueue与LinkedBlockingQueue的区别:

1.    队列中锁的实现不同
       ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
       LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock

2.    在生产或消费时操作不同
       ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
       LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node<E>进行插入或移除,会影响性能

3.    队列大小初始化方式不同
       ArrayBlockingQueue实现的队列中必须指定队列的大小;
       LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE

注意:
1.    在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出
2.    在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时,
       LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右,
       即LinkedBlockingQueue消耗在1500毫秒左右,而ArrayBlockingQueue只需150毫秒左右。

=======================================================

BlockingQueue具体分析:(Java7源码)

add()和remove()是最原始的方法,也是最不常用的。

原因是,当队列满了或者空了的时候,会抛出IllegalStateException("Queuefull")/NoSuchElementException(),并不符合我们对阻塞队列的要求;

因此,ArrayBlockingQueue里,这两个方法的实现,直接继承自java.util.AbstractQueue:

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

有上述源码可知,add()和remove()实现的关键,是来自java.util.Queue接口的offer()和poll()方法。
offer():在队列尾插入一个元素。若成功便返回true,若队列已满则返回false。
poll():同理,取出并删除队列头的一个元素。若成功便返回true,若队列为空则返回false。
这里使用的是ReentrantLock,在插入或者取出前,都必须获得队列的锁,以保证同步。

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

由于offer()/poll()是非阻塞方法,一旦队列已满或者已空,均会马上返回结果,也不能达到阻塞队列的目的。因此有了put()/take()这两个阻塞方法:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

put()/take()的实现,比起offer()/poll()复杂了一些,尤其有两个地方值得注意:

1. 取得锁以后,循环判断队列是否已满或者已空,并加上Condition的await()方法将当前正在调用put()的线程挂起,直至notFull.signal()唤起。

2. 这里使用的是lock.lockInterruptibly()而不是lock.lock()。原因在这里。lockInterruptibly()这个方法,优先考虑响应中断,而不是响应普通获得锁或重入获得锁。简单来说就是,由于put()/take()是阻塞方法,一旦有interruption发生,必须马上做出反应,否则可能会一直阻塞。

最后,无论是offer()/poll()还是put()/take(),都要靠insert()/extract()这个私有方法去完成真正的工作:

private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }

insert()/extract(),是真正将元素放进数组或者将元素从数组取出并删除的方法。由于ArrayBlockingQueue是有界限的队列(Bounded Queue),因此inc()/dec()方法保证元素不超出队列的界限。

另外,每当insert()后,要使用notEmpty.signal()唤起因队列空而等待取出的线程;每当extract()后,同理要使用notFull.signal()唤起因队列满而等待插入的线程。

列举一些使用队列时的错误做法:

1. 不能忽略offer()的返回值。offer()作为有返回值的方法,可以在判断的时候十分有作用(例如add()的实现)。因此,千万不要忽略offer()方法的返回值。

2. 在循环里使用isEmpty()和阻塞方法:

take()是阻塞方法,无需做isEmpty()的判断,直接使用即可。而这种情况很有可能导致死锁,因为由于不断循环,锁会一直被isEmpty()取得(因为size()方法会取得锁),而生产者无法获得锁。

3. 频繁使用size()方法去记录。size()方法是要取得锁的,意味着这不是一个廉价的方法。可以使用原子变量代替。

http://www.cnblogs.com/liuling/p/2013-8-20-01.html

http://www.cnblogs.com/techyc/p/3782079.html?utm_source=tuicool&utm_medium=referral

http://blog.csdn.net/luohuacanyue/article/details/16359777

时间: 2024-10-24 05:02:48

ArrayBlockingQueue,BlockingQueue分析的相关文章

Java并发包--ArrayBlockingQueue

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3498652.html ArrayBlockingQueue介绍 ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列.线程安全是指,ArrayBlockingQueue内部通过"互斥锁"保护竞争资源,实现了多线程对竞争资源的互斥访问.而有界,则是指ArrayBlockingQueue对应的数组是有界限的. 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用. 1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程.BlockingQueue的核心方法有: boolean add(E e) ,把 e 添加到BlockingQueue里.如果BlockingQueue可以容纳,则返回true,否则抛出异常. boolean offer(E e),表示如果可能的话,将 e 加到B

Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue

Java中的阻塞队列接口BlockingQueue继承自Queue接口. BlockingQueue接口提供了3个添加元素方法. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常 offer:添加元素到队列里,添加成功返回true,添加失败返回false put:添加元素到队列里,如果容量满了会阻塞直到容量不满 3个删除方法. poll:删除队列头部元素,如果队列为空,返回null.否则返回元素. remove:基于对象找到

java同步包种ArrayBlockingQueue类的分析与理解

前言: ArrayBlockingQueue类是一个阻塞队列,重要用于多线程操作的条件. 一,官方解释 一个建立在数组之上被BlockingQueue绑定的阻塞队列.这个队列元素顺序是先进先出.队列的头部是在队列中待的时间最长的元素.队列的尾部是再队列中待的时间最短的元素.新的元素会被插入到队列尾部,并且队列从队列头部获取元素. 这是一个典型的绑定缓冲,在这个缓冲区中,有一个固定大小的数组持有生产者插入的数据,并且消费者会提取这些数据.一旦这个类被创建,那么这个数组的容量将不能再被改变.尝试使用

死磕 java集合之ArrayBlockingQueue源码分析

问题 (1)ArrayBlockingQueue的实现方式? (2)ArrayBlockingQueue是否需要扩容? (3)ArrayBlockingQueue有什么缺点? 简介 ArrayBlockingQueue是java并发包下一个以数组实现的阻塞队列,它是线程安全的,至于是否需要扩容,请看下面的分析. 队列 队列,是一种线性表,它的特点是先进先出,又叫FIFO,就像我们平常排队一样,先到先得,即先进入队列的人先出队. 源码分析 主要属性 // 使用数组存储元素 final Object

ArrayBlockingQueue源码分析

转自:http://www.xiaoyaochong.net/wordpress/?p=354 ArrayBlockingQueue是Java并发框架中阻塞队列的最基本的实现,分析这个类就可以知道并发框架中是如何实现阻塞的. 笔者工作了一两年之后,还不知道阻塞是如何实现的,当然有一个原因是前期学习的东西比较杂,前后端的东西的懂一点,但是了解的不够深刻,我觉得这是编程学习的禁忌,不管是前端还是后端,在工作3年的时候,你应该有一个方向是拿得出手,见得了人的. 转回整体,ArrayBlockingQu

ArrayBlockingQueue和LinkedBlockingQueue分析

一.BlockingQueue接口       BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,让容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞. 二.ArrayBlockingQueue      ArrayBlockingQueue是一个由数组支持的有界阻塞队列.在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式. 三.LinkedBlock

Java BlockingQueue 源码分析

简介 BlockingQueue 是 Java concurrent包提供的多线程安全的阻塞队列,其子类包括 LinkedBlockingQueue 和 ArrayBlockingQueue. 关键API 说到队列,自然少不了首尾的插入删除操作,BlockingQueue的API中提供了好几种插入删除方法. 这些方法在遇到无法满足的执行条件时,如队列满了(添加元素时)/队列为空(取出元素时),会采取不同的措施:抛出异常,返回false/null,阻塞调用API的线程,等待一定时间等.具体如下表:

java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

BlockingQueue是阻塞队列接口类,该接口继承了Queue接口 BlockingQueue实现类常见的有以下几种. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里.有界也就意味着,它不能够存储无限多数量的元素.它有一个同一时间能够存储元素数量的上限.你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改). D