Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue

Java中的阻塞队列接口BlockingQueue继承自Queue接口。

BlockingQueue接口提供了3个添加元素方法。

  1. add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常
  2. offer:添加元素到队列里,添加成功返回true,添加失败返回false
  3. put:添加元素到队列里,如果容量满了会阻塞直到容量不满

3个删除方法。

  1. poll:删除队列头部元素,如果队列为空,返回null。否则返回元素。
  2. remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false
  3. take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除

常用的阻塞队列具体类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

本文以ArrayBlockingQueue和LinkedBlockingQueue为例,分析它们的实现原理。

ArrayBlockingQueue

ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)。

ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改。

它带有的属性如下:

// 存储队列元素的数组,是个循环数组
final Object[] items;

// 拿数据的索引,用于take,poll,peek,remove方法
int takeIndex;

// 放数据的索引,用于put,offer,add方法
int putIndex;

// 元素个数
int count;

// 可重入锁
final ReentrantLock lock;
// notEmpty条件对象,由lock创建
private final Condition notEmpty;
// notFull条件对象,由lock创建
private final Condition notFull;

数据的添加

ArrayBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

add方法:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

add方法内部调用offer方法如下:

public boolean offer(E e) {
    checkNotNull(e); // 不允许元素为空
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用offer方法的时候只有1个线程
    try {
        if (count == items.length) // 如果队列已满
            return false; // 直接返回false,添加失败
        else {
            insert(e); // 数组没满的话调用insert方法
            return true; // 返回true,添加成功
        }
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用offer方法
    }
}

insert方法如下:

private void insert(E x) {
    items[putIndex] = x; // 元素添加到数组里
    putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
    ++count; // 元素个数+1
    notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}

put方法:

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 不允许元素为空
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程
    try {
        while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
            notFull.await(); // 线程阻塞并被挂起,同时释放锁
        insert(e); // 调用insert方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用put方法
    }
}

ArrayBlockingQueue的添加数据方法有add,put,offer这3个方法,总结如下:

add方法内部调用offer方法,如果队列满了,抛出IllegalStateException异常,否则返回true

offer方法如果队列满了,返回false,否则返回true

add方法和offer方法不会阻塞线程,put方法如果队列满了会阻塞线程,直到有线程消费了队列里的数据才有可能被唤醒。

这3个方法内部都会使用可重入锁保证原子性。

数据的删除

ArrayBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

poll方法:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程
    try {
        return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用poll方法
    }
}

poll方法内部调用extract方法:

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
    items[takeIndex] = null; // 对应取索引上的数据清空
    takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0
    --count; // 元素个数-1
    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
    return x; // 返回元素
}

take方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
    try {
        while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
            notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
        return extract(); // 调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用take方法
    }
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素
            if (o.equals(items[i])) { // 两个对象相等的话
                removeAt(i); // 调用removeAt方法
                return true; // 删除成功,返回true
            }
        }
        return false; // 删除成功,返回false
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用remove方法
    }
}

removeAt方法:

void removeAt(int i) {
    final Object[] items = this.items;
    if (i == takeIndex) { // 如果要删除数据的索引是取索引位置,直接删除取索引位置上的数据,然后取索引+1即可
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else { // 如果要删除数据的索引不是取索引位置,移动元素元素,更新取索引和放索引的值
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null;
                putIndex = i;
                break;
            }
        }
    }
    --count; // 元素个数-1
    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
}

ArrayBlockingQueue的删除数据方法有poll,take,remove这3个方法,总结如下:

poll方法对于队列为空的情况,返回null,否则返回队列头部元素。

remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false。

poll方法和remove方法不会阻塞线程。

take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中。

这3个方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程。

LinkedBlockingQueue

LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

内部使用放锁和拿锁,这两个锁实现阻塞(“two lock queue” algorithm)。

它带有的属性如下:

// 容量大小
private final int capacity;

// 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger
private final AtomicInteger count = new AtomicInteger(0);

// 头结点
private transient Node<E> head;

// 尾节点
private transient Node<E> last;

// 拿锁
private final ReentrantLock takeLock = new ReentrantLock();

// 拿锁的条件对象
private final Condition notEmpty = takeLock.newCondition();

// 放锁
private final ReentrantLock putLock = new ReentrantLock();

// 放锁的条件对象
private final Condition notFull = putLock.newCondition();

ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只能有1个被执行,不允许并行执行。

而LinkedBlockingQueue有2个锁,放锁和拿锁,添加数据和删除数据是可以并行进行的,当然添加数据和删除数据的时候只能有1个线程各自执行。

数据的添加

LinkedBlockingQueue有不同的几个数据添加方法,add、offer、put方法。

add方法内部调用offer方法:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // 不允许空元素
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // 如果容量满了,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e); // 容量没满,以新元素构造节点
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程
    try {
        if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行
            enqueue(node); // 节点添加到链表尾部
            c = count.getAndIncrement(); // 元素个数+1
            if (c + 1 < capacity) // 如果容量还没满
                notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
        }
    } finally {
        putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法
    }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
    return c >= 0; // 添加成功返回true,否则返回false
}

put方法:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // 不允许空元素
    int c = -1;
    Node<E> node = new Node(e); // 以新元素构造节点
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程
    try {
        while (count.get() == capacity) { // 如果容量满了
            notFull.await(); // 阻塞并挂起当前线程
        }
        enqueue(node); // 节点添加到链表尾部
        c = count.getAndIncrement(); // 元素个数+1
        if (c + 1 < capacity) // 如果容量还没满
            notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满
    } finally {
        putLock.unlock(); // 释放放锁,让其他线程可以调用put方法
    }
    if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
        signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
}

LinkedBlockingQueue的添加数据方法add,put,offer跟ArrayBlockingQueue一样,不同的是它们的底层实现不一样。

ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒。

而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程。

数据的删除

LinkedBlockingQueue有不同的几个数据删除方法,poll、take、remove方法。

poll方法:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0) // 如果元素个数为0
        return null; // 返回null
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); // 拿锁加锁,保证调用poll方法的时候只有1个线程
    try {
        if (count.get() > 0) { // 判断队列里是否还有数据
            x = dequeue(); // 删除头结点
            c = count.getAndDecrement(); // 元素个数-1
            if (c > 1) // 如果队列里还有元素
                notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
        }
    } finally {
        takeLock.unlock(); // 释放拿锁,让其他线程可以调用poll方法
    }
    if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
        signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
                return x;
}

take方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程
    try {
        while (count.get() == 0) { // 如果队列里已经没有元素了
            notEmpty.await(); // 阻塞并挂起当前线程
        }
        x = dequeue(); // 删除头结点
        c = count.getAndDecrement(); // 元素个数-1
        if (c > 1) // 如果队列里还有元素
            notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
    } finally {
        takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法
    }
    if (c == capacity) // 由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
        signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
    return x;
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁
    try {
        for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) { // 判断是否找到对象
                unlink(p, trail); // 修改节点的链接信息,同时调用notFull的signal方法
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // 2个锁解锁
    }
}

LinkedBlockingQueue的take方法对于没数据的情况下会阻塞,poll方法删除链表头结点,remove方法删除指定的对象。

需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。

时间: 2024-12-10 19:43:54

Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue的相关文章

Java NIO使用及原理分析(4) 来自网上资料整理

在上一篇文章中介绍了关于缓冲 区的一些细节内容,现在终于可以进入NIO中最有意思的部分非阻塞I/O.通常在进行同步I/O操作时,如果读取数据,代码会阻塞直至有 可供读取的数据.同样,写入调用将会阻塞直至数据能够写入.传统的Server/Client模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求.这种模式带来的一个问题就是线程数量的剧增,大量的线程会 增大服务器的开销.大多数的实现为了避免这个问题,都采用了线程池模型

java阻塞队列

对消息的处理有些麻烦,要保证各种确认.为了确保消息的100%发送成功,笔者在之前的基础上做了一些改进.其中要用到多线程,用于重复发送信息. 所以查了很多关于线程安全的东西,也看到了阻塞队列,发现这个模式很不错,可惜我目前用不到. 关于这个的讲解已经很多了,阻塞这个,就是当队列中没有数据的时候,线程读取的话会等待.当队列中的数据满的时候,线程添加数据的时候,也会等待. 有个例子很生动形象,往盘子里面放鸡蛋,只能放固定数目的.盘子里面没有鸡蛋,无法从中拿出来.当盘子里满了,也放不进去.直到被拿出去才

Java NIO使用及原理分析 (四)(转)

在上一篇文章中介绍了关于缓冲区的一些细节内容,现在终于可以进入NIO中最有意思的部分非阻塞I/O.通常在进行同步I/O操作时,如果读取数据,代码会阻塞直至有 可供读取的数据.同样,写入调用将会阻塞直至数据能够写入.传统的Server/Client模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求.这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销.大多数的实现为了避免这个问题,都采用了线程池模型,并

Java阻塞队列线程集控制的实现

队列以一种先进先出的方式管理数据.如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞. 在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期的把中间结果存到阻塞队列中.而其他工作者线程把中间结果取出并在将来修改它们.队列会 自动平衡负载.如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞.如果第一个线程集运行的快,那么它将等待第二个线程集赶上来. 下面的程序展示了如何使用阻塞队列来控制线程集.程序在一个目录及它的所有

JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用. 1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程.BlockingQueue的核心方法有: boolean add(E e) ,把 e 添加到BlockingQueue里.如果BlockingQueue可以容纳,则返回true,否则抛出异常. boolean offer(E e),表示如果可能的话,将 e 加到B

java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

BlockingQueue是阻塞队列接口类,该接口继承了Queue接口 BlockingQueue实现类常见的有以下几种. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里.有界也就意味着,它不能够存储无限多数量的元素.它有一个同一时间能够存储元素数量的上限.你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改). D

Java NIO使用及原理分析 (一)(转)

最近由于工作关系要做一些Java方面的开发,其中最重要的一块就是Java NIO(New I/O),尽管很早以前了解过一些,但并没有认真去看过它的实现原理,也没有机会在工作中使用,这次也好重新研究一下,顺便写点东西,就当是自己学习 Java NIO的笔记了.本文为NIO使用及原理分析的第一篇,将会介绍NIO中几个重要的概念. 在Java1.4之前的I/O系统中,提供的都是面向流的I/O系统,系统一次一个字节地处理数据,一个输入流产生一个字节的数据,一个输出流消费一个字节的数据,面向流的I/O速度

Java NIO使用及原理分析(二)(转)

在第一篇中,我们介绍了NIO中的两个核心对象:缓冲区和通道,在谈到缓冲区时,我们说缓冲区对象本质上是一个数组,但它其实是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果我们使用get()方法从缓冲区获取数据或者使用put()方法把数据写入缓冲区,都会引起缓冲区状态的变化.本文为NIO使用及原理分析的第二篇,将会分析NIO中的Buffer对象. 在缓冲区中,最重要的属性有下面三个,它们一起合作完成对缓冲区内部状态的变化跟踪: position:指定了下一个将要被写

支付宝app支付java后台流程及原理分析

java版支付宝app支付流程及原理分析 本实例是基于springmvc框架编写     一.流程步骤         1.执行流程           当手机端app(就是你公司开发的app)在支付页面时,调起服务端(后台第1个创建订单接口)接口,后台把需要调起支付宝支付的参数返回给手机端,手机端拿到         这些参数后,拉起支付宝支付环境完成支付,完成支付后会调异步通知(第2个接口),此时需要给支付宝返回成功或者失败信息,成功后会调用同步通知(第3个接口)         返回支付成