ArrayBlockingQueue 1.8 源码浅析

[TOC]

ArrayBlockingQueue 1.8 源码浅析

一,简介

ArrayBlockingQueue 是一个用数组实现的有界队列;此队列按照先进先出(FIFO)的规则对元素进行排序;默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序的访问队列,即先阻塞的线程先访问队列;非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问;为了保证公平性,通常会降低吞吐量。

二,类UML图

三,基本成员

    /** The queued items */
    // 记录数据的数组
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    // 索引用于 take,poll,peek,remove 等方法
    int takeIndex;

    /** items index for next put, offer, or add */
    // 索引用于 put,offer,or add 等方法
    int putIndex;

    /** Number of elements in the queue */
    // 总数
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    // 队列的锁
    final ReentrantLock lock;

    /** Condition for waiting takes */
    // 用于让线程等待,消费时队列为空
    private final Condition notEmpty;

    /** Condition for waiting puts */
    // 用于让线程等待,生产时队列满
    private final Condition notFull;

四,常用方法

构造方法

我们看下两个构造,其实也就是一个,注意没有无参构造,初始化时必须要给出容量。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 初始化一个ArrayBlockingQueue
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 初始化一个数组
        this.items = new Object[capacity];
        // 初始化一个锁
        lock = new ReentrantLock(fair);
        // 用来存放消费者的阻塞线程
        notEmpty = lock.newCondition();
        // 用来存放生产者的线程
        notFull =  lock.newCondition();
    }
add 方法

可以看出add调用的是offer方法,详情请看offer方法。

    public boolean add(E e) {
        // 调用父类的方法
        return super.add(e);
    }

    // 父类 AbstractQueue 的add方法
     public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

注意:add 插入失败会抛异常。

offer 方法
    // offer加入元素
    public boolean offer(E e) {
        // 不能为null
        checkNotNull(e);
        // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果数组满了,返回false
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    // enqueue

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // 获取数组
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 唤醒消费阻塞的队列
        notEmpty.signal();
    }

注意:offer还有一个重载方法,带有超时时间的插入,支持中断offer(E e, long timeout, TimeUnit unit)。

put 方法
public void put(E e) throws InterruptedException {
        // 不能为null
        checkNotNull(e);
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 支持中断
        lock.lockInterruptibly();
        try {
            // 等于数组的容量
            while (count == items.length)
                // 等待
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

注意:put和前面的offer要区别,offer方法队列满是返回false,put方法是让线程等待,根据自己的场景用合适的方法。

poll 方法
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

注意:poll也有一个重载方法,带有超时和中断poll(long timeout, TimeUnit unit)。

take 方法
    // 消费
    public E take() throws InterruptedException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 支持中断
        lock.lockInterruptibly();
        try {
            // 队列为空
            while (count == 0)
                // 阻塞
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

注意:take和poll也是一对方法,poll队列为空返回null,take是让线程等待,直到唤醒。

peek 方法
// 获取队尾的元素 不删除
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
size 方法
    // 统计个数 size是准确值
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

五,总结

ArrayBlockingQueue 是有界的,所以我们在初始化是容量要设计好,因为它是不可以扩容的,还有我觉得这个队列适合一些稳定并发量的系统,如果并发量突然变大,导致队列满,会造成大量的线程等待,影响系统的响应;我们通过阅读源码也发现队列的源码是很轻量的,使用起来也很简单,让人很好理解;使用这个队列一定要注意put,offer,take,poll这两组方法,根据自己的业务场景选择是直接返回(响应速度快)还是阻塞线程。

参考:《Java 并发编程的艺术》

原文地址:https://blog.51cto.com/14220760/2411583

时间: 2024-08-02 06:12:47

ArrayBlockingQueue 1.8 源码浅析的相关文章

Volley框架源码浅析(一)

尊重原创http://blog.csdn.net/yuanzeyao/article/details/25837897 从今天开始,我打算为大家呈现关于Volley框架的源码分析的文章,Volley框架是Google在2013年发布的,主要用于实现频繁而且粒度比较细小的Http请求,在此之前Android中进行Http请求通常是使用HttpUrlConnection和HttpClient进行,但是使用起来非常麻烦,而且效率比较地下,我想谷歌正式基于此种原因发布了Volley框架,其实出了Voll

PM2源码浅析

PM2工作原理 最近在玩一个游戏,<地平线:黎明时分>,最终Boss是一名叫黑底斯的人,所谓为人,也许不对,黑底斯是一段强大的毁灭进程,破坏了盖娅主进程,从而引发的整个大陆机械兽劣化故事. 为什么要讲这么一段呢,是希望大家可以更好地理解pm2的原理,要理解pm2就要理解god和santan的关系,god和santan的关系就相当于盖娅和黑底斯在pm2中的01世界中,每一行代码每一个字节都安静的工作god就是Daemon进程 守护进程,重启进程,守护node程序世界的安宁,santan就是进程的

Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置

Android源码浅析(一)--VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置 最近地方工作,就是接触源码的东西了,所以好东西还是要分享,系列开了这么多,完结 的也没几个,主要还是自己覆盖的太广了,却又不精通,嘿嘿,工作需要,所以写下了本篇博客 一.VMware 12 我选择的虚拟机试VMware,挺好用的感觉,下载VMware就不说了,善用搜索键嘛,这里我提供一个我现在在用的 下载地址:链接:http://pan.baidu.com/s/1k

ReactiveCocoa2 源码浅析

ReactiveCocoa2 源码浅析 标签(空格分隔): ReactiveCocoa iOS Objective-C ? 开车不需要知道离合器是怎么工作的,但如果知道离合器原理,那么车子可以开得更平稳. ReactiveCocoa 是一个重型的 FRP 框架,内容十分丰富,它使用了大量内建的 block,这使得其有强大的功能的同时,内部源码也比较复杂.本文研究的版本是2.4.4,小版本间的差别不是太大,无需担心此问题. 这里只探究其核心 RACSignal 源码及其相关部分.本文不会详细解释里

【Spark Core】任务执行机制和Task源码浅析2

引言 上一小节<任务执行机制和Task源码浅析1>介绍了Executor的注册过程. 这一小节,我将从Executor端,就接收LaunchTask消息之后Executor的执行任务过程进行介绍. 1. Executor的launchTasks函数 DriverActor提交任务,发送LaunchTask指令给CoarseGrainedExecutorBackend,接收到指令之后,让它内部的executor来发起任务,即调用空闲的executor的launchTask函数. 下面是Coars

Volley框架源码浅析(二)

尊重原创 http://write.blog.csdn.net/postedit/25921795 在前面的一片文章Volley框架浅析(一)中我们知道在RequestQueue这个类中,有两个队列:本地队列和网络队列 /** The cache triage queue. */ private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<Request<

Android手势源码浅析-----手势绘制(GestureOverlayView)

Android手势源码浅析-----手势绘制(GestureOverlayView)

【Spark】Stage生成和Stage源码浅析

引入 上一篇文章<DAGScheduler源码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码. Stage生成 Stage的调度是由DAGScheduler完成的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的St

转:Spring FactoryBean源码浅析

http://blog.csdn.net/java2000_wl/article/details/7410714 在Spring BeanFactory容器中管理两种bean 1.标准Java Bean 2,另一种是工厂Bean,   即实现了FactoryBean接口的bean  它不是一个简单的Bean 而是一个生产或修饰对象生成的工厂Bean 在向Spring容器获得bean时  对于标准的java Bean  返回的是类自身的实例 而FactoryBean 其返回的对象不一定是自身类的一