Java源码解析——集合框架(二)——ArrayBlockingQueue

ArrayBlockingQueue源码解析

ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据(实际上可看作一个循环数组)。常用的操作包括 add ,offer,put,remove,poll,take,peek。

一、类声明

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable

1)AbstractQueue提供了Queue接口的默认实现。

2)BlockingQueue接口定义了阻塞队列必须实现的方法。

3)通过实现 java.io.Serializable 接口以启用其序列化功能。未实现此接口的类将无法使其任何状态序列化或反序列化。序列化接口没有方法或字段,仅用于标识可序列化的语义。

二、成员变量

private final E[] items;//底层数据结构

private int takeIndex;//用来为下一个take/poll/remove的索引(出队)

private int putIndex;//用来为下一个put/offer/add的索引(入队)

private int count;//队列中元素的个数

private final ReentrantLock lock;//锁

private final Condition notEmpty;//等待出队的条件

private final Condition notFull;//等待入队的条件

三、构造方法

ArrayBlockingQueue提供了两个构造方法:

/**
* 创造一个队列,指定队列容量,指定模式
* @param fair
* true:先来的线程先操作
* false:顺序随机
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];//初始化类变量数组items
    lock = new ReentrantLock(fair);//初始化类变量锁lock
    notEmpty = lock.newCondition();//初始化类变量notEmpty Condition
    notFull = lock.newCondition();//初始化类变量notFull Condition
}

/**
* 创造一个队列,指定队列容量,默认模式为非公平模式
* @param capacity <1会抛异常
*/
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

ArrayBlockingQueue的组成:一个对象数组+1把锁ReentrantLock+2个条件Condition

三、成员方法

  • 入队方法

  ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

  add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

  offer方法如果队列满了,返回false,否则返回true

  add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

  这3个方法内部都会使用可重入锁保证原子性。

1)add方法:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

2)offer方法:

在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false。因为使用的是ReentrantLock重入锁,所以需要显式地加锁和释放锁。

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)//数组满了
                return false;
            else {//数组没满
                insert(e);//插入一个元素
                return true;
            }
        } finally {
            lock.unlock();
        }
}

在插入元素结束后,唤醒等待notEmpty条件(即获取元素)的线程。

/**
* 在队尾插入一个元素,并设置了超时等待的时间
* 如果数组已满,则进入等待,直到出现以下三种情况:
* 1、被唤醒
* 2、等待时间超时
* 3、当前线程被中断
*/
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
  if (e == null)
    throw new NullPointerException();
  long nanos = unit.toNanos(timeout);//将超时时间转换为纳秒
  final ReentrantLock lock = this.lock;
        /*
         * lockInterruptibly():
         * 1、 在当前线程没有被中断的情况下获取锁。
         * 2、如果获取成功,方法结束。
         * 3、如果锁无法获取,当前线程被阻塞,直到下面情况发生:
         * 1)当前线程(被唤醒后)成功获取锁
         * 2)当前线程被其他线程中断
         *
         * lock()
         * 获取锁,如果锁无法获取,当前线程被阻塞,直到锁可以获取并获取成功为止。
         */
  lock.lockInterruptibly();//加可中断的锁
  try {
    for (;;) {
      if (count != items.length) {//队列未满
        insert(e);
        return true;
      }
      if (nanos <= 0)//已超时
        return false;
      try {
        /*
        * 进行等待:
        * 在这个过程中可能发生三件事:
        * 1、被唤醒-->继续当前这个for(;;)循环
        * 2、超时-->继续当前这个for(;;)循环
        * 3、被中断-->之后直接执行catch部分的代码
        */
        nanos = notFull.awaitNanos(nanos);//进行等待(在此过程中,时间会流失,在此过程中,线程也可能被唤醒)
      } catch (InterruptedException ie) {//在等待的过程中线程被中断
        notFull.signal(); // 唤醒其他未被中断的线程
        throw ie;
      }
    }
  } finally {
    lock.unlock();
  }
}

无论是第一个offer方法还是第二个offer方法都调用了insert方法,insert方法的步骤是首先添加元素,然后利用inc函数进行索引的添加,最后会唤醒因为队列中没有数据而等待被阻塞的获取数据的方法。

private void insert(E x) {
    items[putIndex] = x; // 元素添加到数组里
    putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
    ++count; // 元素个数+1
    notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}

其中inc函数来改变索引的增加:

final int inc(int i) {
    return (++i == items.length) ? 0 : I;
}

3)put方法

/**
* 在队尾插入一个元素
* 如果队列满了,一直阻塞,直到数组不满了或者线程被中断
*/
public void put(E e) throws InterruptedException {
  if (e == null)
    throw new NullPointerException();
  final E[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    try {
      while (count == items.length)//队列满了,一直阻塞在这里
        /*
        * 一直等待条件notFull,即被其他线程唤醒
        * (唤醒其实就是,有线程将一个元素出队了,然后调用notFull.signal()唤醒其他等待这个条件的线程,同时队列也不慢了)
         */
        notFull.await();
    } catch (InterruptedException ie) {//如果被中断
      notFull.signal(); // 唤醒其他等待该条件(notFull,即入队)的线程
      throw ie;
    }
    insert(e);
  } finally {
    lock.unlock();
  }
}
  • 出队方法

ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

poll方法和remove方法不会阻塞线程。

take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

1)poll方法

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程
    try {
        return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用poll方法
    }
}

poll方法内部调用extract方法:

private E extract() {
  final E[] items = this.items;
  E x = items[takeIndex];//获取出队元素
  items[takeIndex] = null;//将出队元素位置置空
  /*
  * 第一次出队的元素takeIndex==0,第二次出队的元素takeIndex==1
  * (注意:这里出队之后,并没有将后面的数组元素向前移)
  */
  takeIndex = inc(takeIndex);
  --count;//数组元素个数-1
  notFull.signal();//数组已经不满了,唤醒其他等待notFull条件的线程
  return x;//返回出队的元素
}

同样地notfull标志表示数组已经不满,可以执行被阻塞的入队操作。

2)take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
    try {
        while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
            notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
        return extract(); // 调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用take方法
    }
}

3)remove方法

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素
            if (o.equals(items[i])) { // 两个对象相等的话
                removeAt(i); // 调用removeAt方法
                return true; // 删除成功,返回true
            }
        }
        return false; // 删除成功,返回false
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用remove方法
    }
}

以及

void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
    --count; // 元素个数-1
    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
}

原文地址:https://www.cnblogs.com/winterfells/p/8870104.html

时间: 2024-11-09 00:18:31

Java源码解析——集合框架(二)——ArrayBlockingQueue的相关文章

Java源码之集合框架(图)

百度java 集合图时,搜出来一张图,图的蛮不错的,现在借用一下. 图片来自:http://blog.csdn.net/bondsui/article/details/8520078

【原】Android热更新开源项目Tinker源码解析系列之二:资源文件热更新

上一篇文章介绍了Dex文件的热更新流程,本文将会分析Tinker中对资源文件的热更新流程. 同Dex,资源文件的热更新同样包括三个部分:资源补丁生成,资源补丁合成及资源补丁加载. 本系列将从以下三个方面对Tinker进行源码解析: Android热更新开源项目Tinker源码解析系列之一:Dex热更新 Android热更新开源项目Tinker源码解析系列之二:资源热更新 Android热更新开源项目Tinker源码解析系类之三:so热更新 转载请标明本文来源:http://www.cnblogs

JAVA 笔记(三) 从源码深入浅出集合框架

集合框架概述 以Java来说,我们日常所做的编写代码的工作,其实基本上往往就是在和对象打交道. 但显然有一个情况是,一个应用程序里往往不会仅仅只包含数量固定且生命周期都是已知的对象. 所以,就需要通过一些方式来对对象进行持有,那么通常是通过怎么样的方式来持有对象呢? 通过数组是最简单的一种方式,但其缺陷在于:数组的尺寸是固定的,即数组在初始化时就必须被定义长度,且无法改变. 也就说,通过数组来持有对象虽然能解决对象生命周期的问题,但仍然没有解决对象数量未知的问题. 这也是集合框架出现的核心原因,

[java源码解析]对HashMap源码的分析(二)

上文我们讲了HashMap那骚骚的逻辑结构,这一篇我们来吹吹它的实现思想,也就是算法层面.有兴趣看下或者回顾上一篇HashMap逻辑层面的,可以看下HashMap源码解析(一).使用了哈希表得"拉链法". 我打算按这个顺序来讲HashMap:几个关键属性 -> 构造方法-> 存取元素方法 ->解决hash冲突方法->HashMap扩容问题. 4个关键属性: /** *HashMap的存储大小 */ transient int size; /** * HashMa

Java源码解析 - ThreadPoolExecutor 线程池

1 线程池的好处 线程使应用能够更加充分合理地协调利用CPU.内存.网络.I/O等系统资源.线程的创建需要开辟虚拟机栈.本地方法栈.程序计数器等线程私有的内存空间;在线程销毁时需要回收这些系统资源.频繁地创建和销毁线程会浪费大量的系统资源,增加并发编程风险. 在服务器负载过大的时候,如何让新的线程等待或者友好地拒绝服务? 这些都是线程自身无法解决的;所以需要通过线程池协调多个线程,并实现类似主次线程隔离.定时执行.周期执行等任务. 线程池的作用包括:●利用线程池管理并复用线程.控制最大并发数等●

深入Java源码解析容器类List、Set、Map

1 常用容器继承关系图先上一张网上的继承关系图个人觉得有些地方不是很准确,比如Iterator不是容器,只是一个操作遍历集合的方法接口,所以不应该放在里面.并且Map不应该继承自Collection.所以自己整理了一个常用继承关系图如下如上图所示,接下去会自顶向下解释重要的接口和实现类.2 Collection和Map在Java容器中一共定义了2种集合, 顶层接口分别是Collection和Map.但是这2个接口都不能直接被实现使用,分别代表两种不同类型的容器.简单来看,Collection代表

【Java源码解析】-- HashMap源码解析

目录 源码解析 1.构造方法 无参构造方法 int型参数的构造方法 int,float两个参数的构造方法 hsah方法 2.添加元素(put()方法) 3.扩容方法(resize()方法) 4.获取元素(get()方法) 5.移除元素(remove()) 6.树化(treeifyBin()) 关于HashMap常见的问题 1.为什么容量始终是2的幂次? 3.既然红黑树那么好,为啥hashmap不直接采用红黑树,而是当大于等于8个的时候才转换红黑树? 4.JDK1.7 扩容死锁产生原因 5.JDK

JDK8源码解析 -- HashMap(二)

在上一篇JDK8源码解析 -- HashMap(一)的博客中关于HashMap的重要知识点已经讲了差不多了,还有一些内容我会在今天这篇博客中说说,同时我也会把一些我不懂的问题抛出来,希望看到我这篇博客的大神帮忙解答困扰我的问题,让我明白一个所以然来.彼此互相进步,互相成长.HashMap从jdk7到jdk8版本改变大,1.新增加的节点在链表末尾进行添加  2.使用了红黑树. 1. HashMap容量大小求值方法 // 返回2的幂次 static final int tableSizeFor(in

Java源码解析之HashMap

一.HashMap类声明: HashMap继承于AbstractMap并且实现了接口Map,Cloneable,Serializable. public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {} 二.HashMap类层次: HashMap实现了三个接口,继承一个抽象类.除此之外我们应该知道Object是所有类的超类.之所以有一个A