Java生产消费模型—ArrayBlockingQueue详解

背景需求

  生产消费模型是线程协作关系中十分常见的一种。通常,一个(多个)线程负责生产,一个(多个)线程可以从生产的列表中获取并消费;生产的内容可以按需求设计,可以是一个Integer,可以是String,可以Object,也可以是任意类型的对象,只要有生产消费的需求。

  例如,厨师负责生产美食,放在桌子上,服务员负责取走(消费)美食。这里,厨师就扮演着生产者的身份,美食是生产的内容,服务员就扮演着消费者的身份。

  下面用这个厨师与服务员的案例来分析下生产消费模型需要实现哪些功能才能满足需求:

如何实现这个需求

  若要实现以上的需求,我们该考虑哪些方面呢?

(1)厨师是厨师,负责做美食;服务员负责消费美食。厨师与服务员可以同时运行(两个独立线程)。

(2)厨师与服务员作为两个独立线程,必须有一个约定好的公共区域:厨师把生产好的美食往这个区域放,服务员从这个区域取。并且,厨师与服务员并不想和对方接触过多(低耦合),只想和这个公共区域(桌子)打交道。

(3)通常,先生产的内容应该被先消费(先做的美食先送给顾客,防止凉了),符合FIFO特性。若要选取某种数据结构的容器作为公共区域,Queue是最佳方案(符合FIFO特性)。

(4)并发有危险:厨师和服务员都在这个公共区域(Queue)中操作,同时操作可能存在问题。例如服务员正在从区域A拿盘子时,厨师把新的盘子也往区域A放,会发生碰撞;又如,同一个盘子可能有多个服务员过来争抢;也可能,多个厨师做好了美食把盘子往同一个区域放,也会发生碰撞。

  因此,需要实现并发的保护:厨师(生产者)往桌子(Queue)上放盘子(生产)之前,先获取锁,以保证他在操作共享区域(Queue)时没有其他厨师或者服务员过来争抢导致发生冲突;在放完之后,释放掉锁,让其他的厨师或者服务员操作。服务员操作时也是一个道理,要先获取锁,操作完成之后要释放锁。

(5)阻塞的需求:若桌子(Queue)空了,服务员该怎么办呢?是每隔几秒钟过来看一下桌子?不好,因为这样太累(轮训方式开销大,并不知道什么时候Queue中才会有新的盘子)。

  比较好的方案是:在桌子上放一个BB机(Queue中实现条件变量),和厨师约定好:若桌子空了,服务员可以去睡觉,等厨师做好饭了,通过BB机呼叫一下服务员(唤醒消费线程)(若Queue消费完毕,消费线程可以阻塞等待【 队列非空】的条件,当生产线程有新的生产内容,把内容放进Queue之后,通过条件变量唤醒消费线程)。而桌子没空的时候(Queue中一直有数据),服务员可以一直工作,则不需要睡觉(消费线程一直消费,不需要等待)。

  同理,也可能出现相反的场景:服务员比较少,端盘子比较慢,而厨师比较多,做饭比较快(生产速度快于消费速度)。这时,若桌子无限大(无界队列),那厨师会一直往桌子上放,导致桌子上盘子越来越多;而若桌子大小有限(有界队列),那么当桌子放满了之后,那就没地儿放了,咋办?

  可以用一样的方式,再在队列内部添加一个条件变量,当队列满了,生产者则等待该队列【队列未满】条件的发生,同时休眠等待。当消费者消费一次之后,触发【队列未满】的条件,这时生产者可以被唤醒继续工作。

Java类库中成熟的设计-ArrayBlockingQueue

为了满足无数场景下以上类似的需求,jdk中加入了该线程安全阻塞FIFO队列的实现类:ArrayBlockingQueue,继承关系如下:

首先,BlockingQueue最基础的是个集合Collection;

同时,实现了Queue的接口,因此具备普通Queue的特性,可以offer/add以添加元素至队列尾部,可以poll以从队列头部取内容,可以peek查看队列头的元素。

同时,实现了BlockingQueue的接口,在Queue基础上实现的特性:

(1)一个是线程安全,可以并发offer,可以并发poll,可以并发同时offer和poll,内部是加锁ReentrantLock实现的;

(2)另一个,就是阻塞功能。

  >> 当调用blockingQueue.put(E e)接口想将元素入队列时,若队列未满,则直接入队列(enqueue);

  若队列已满,则notFull.await()休眠等待条件变量【notFull队列未满】的发生,才唤醒线程继续生产。

  >> 当调用blockingQueue.take()接口时想从队列中取队列头的元素时,若队列为空,则直接取走(dequeue);

  若队列已空,则notEmpty.await()休眠等到条件变量【notEmpry队列未满】的发生,才唤醒线程继续消费。

源码解读

下面,带着以上这些概念的基础,看下源码实现。

首先,成员:

/** The queued items */
final Object[] items; //保存生产内容对象

/** items index for next take, poll, peek or remove */
int takeIndex; //数组下一个要消费位置

/** items index for next put, offer, or add */
int putIndex; //数组中下一个要生产存放的位置

/** Number of elements in the queue */
int count; //当前总共存放的内容对象数量

/** Main lock guarding all access */
final ReentrantLock lock; //并发操作的互斥,读取、写入之前都要获取该锁

/** Condition for waiting takes */
private final Condition notEmpty; //队列非空的条件变量,用于唤醒因队列空掉而阻塞的消费者线程

/** Condition for waiting puts */
private final Condition notFull; //队列非满的条件变量,用于唤醒因队列已满导致阻塞的生产者线程

  从以上的成员可以看得出来,数据是存放在数组Object[] items,并用putIndex指示下一个将要存放的位置,用getIndex存放下一个将要取元素的位置。

例如,假设items容量为5

在存入之前,应该是这样:

<<operation0>>
0      1      2      3      4
null    null    null    null    null
putIndex=0
takeIndex=0

存了一个‘A‘之后,应该是这样: putIndex++

<<operation1>>
0      1      2      3      4
‘A‘     null     null    null     null
      putIndex=1
getIndex=0

  

再存入一个‘B‘之后,应该是这样:putIndex++

<<operation2>>

0      1      2      3      4
‘A‘     ‘B’      null    null     null
            putIndex=2
getIndex=0

 

取一个元素出来,应该是这样:对头的元素‘A‘被取出来了,getIndex++

<<operation3>>
0      1      2      3      4
null    ‘B’      null    null     null
            putIndex=2
      getIndex=1

  

再存入2个元素:

<<operation4>>
0      1      2      3      4
null    ‘B’      ‘C‘      ‘d‘     null
                          putIndex=3
      getIndex=1

此时putIndex已经到头(4),若要再存入,则循环到0:



<<operaion5>>
0      1      2      3      4
null    ‘B’      ‘C‘      ‘d‘     ‘E‘
putIndex=0
      getIndex=1

此时,若再存入一个,则满了

<<operation6>>
0      1      2      3      4
‘F‘     ‘B’      ‘C‘      ‘d‘     ‘E‘
      putIndex=1
      getIndex=1
会发现,putIndex已经赶上了getIndex,没有空间了,那么生产者就会阻塞并等待【队列非满】条件变量的发生。

等到消费者再取一个元素出来,就会触发【队列非满】条件变量,让生产者线程唤醒继续生产。

<<operation7>>
0      1      2      3      4
‘F‘     null     ‘C‘      ‘d‘     ‘E‘
      putIndex=1
            getIndex=2

  

下面贴出部分源码,对应上述思路:

take(), put()

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
private E dequeue() {    // assert lock.getHoldCount() == 1;    // assert items[takeIndex] != null;    final Object[] items = this.items;    @SuppressWarnings("unchecked")    E x = (E) items[takeIndex];    items[takeIndex] = null;    if (++takeIndex == items.length)        takeIndex = 0;    count--;    if (itrs != null)        itrs.elementDequeued();    notFull.signal();    return x;}

  

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
private void enqueue(E x) {    // assert lock.getHoldCount() == 1;    // assert items[putIndex] == null;    final Object[] items = this.items;    items[putIndex] = x;    if (++putIndex == items.length)        putIndex = 0;    count++;    notEmpty.signal();}

offer(), add()

再贴一下其他类似接口的源码:
public boolean offer(E e) {
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		if (count == items.length)
			return false;
		else {
			enqueue(e);
			return true;
		}
	} finally {
		lock.unlock();
	}
}

public boolean add(E e) {
	if (offer(e))
		return true;
	else
		throw new IllegalStateException("Queue full");
}

 offer, add与put职责类型,区别在于:

offer若因队列满了直接返回false,比较温和;而add因队列满了会抛出异常,比较强制;而put若队列满了,会阻塞等待知道队列有位置了再插入元素。

poll()

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

poll()与take()类似,区别在于:

poll时若队列为空,那么直接返回null;而take时,若队列为空,会阻塞直到队列不为空了,再返回队列中的数据;



原文地址:https://www.cnblogs.com/xinxinBlog/p/10624900.html

时间: 2024-11-11 15:02:08

Java生产消费模型—ArrayBlockingQueue详解的相关文章

事件驱动模型实例详解(Java篇)

或许每个软件从业者都有从学习控制台应用程序到学习可视化编程的转变过程,控制台应用程序的优点在于可以方便的练习某个语言的语法和开发习惯(如.net和java),而可视化编程的学习又可以非常方便开发出各类人机对话界面(HMI).可视化编程或许是一个初学者开始对软件感兴趣的开始,也可能是一个软件学习的里程碑点,因为我们可以使用各类软件集成开发环境(IDE)方便的在现成的界面窗口上拖放各类组件(Component),这类组件包括我们常见的按钮(Button),单选按钮(Radio Button),复选框

Java网络编程和NIO详解3:IO模型与Java网络编程模型

Java网络编程和NIO详解3:IO模型与Java网络编程模型 基本概念说明 用户空间与内核空间 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方).操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限.为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间.针对linux操作系统而言,将最高的1G字节(从虚拟地址

Java网络编程和NIO详解2:JAVA NIO一步步构建IO多路复用的请求模型

Java网络编程与NIO详解2:JAVA NIO一步步构建IO多路复用的请求模型 知识点 nio 下 I/O 阻塞与非阻塞实现 SocketChannel 介绍 I/O 多路复用的原理 事件选择器与 SocketChannel 的关系 事件监听类型 字节缓冲 ByteBuffer 数据结构 场景 接着上一篇中的站点访问问题,如果我们需要并发访问10个不同的网站,我们该如何处理? 在上一篇中,我们使用了java.net.socket类来实现了这样的需求,以一线程处理一连接的方式,并配以线程池的控制

Java内存模型(JMM)详解

在Java JVM系列文章中有朋友问为什么要JVM,Java虚拟机不是已经帮我们处理好了么?同样,学习Java内存模型也有同样的问题,为什么要学习Java内存模型.它们的答案是一致的:能够让我们更好的理解底层原理,写出更高效的代码. 就Java内存模型而言,它是深入了解Java并发编程的先决条件.对于后续多线程中的线程安全.同步异步处理等更是大有裨益. 硬件内存架构 在学习Java内存模型之前,先了解一下计算机硬件内存模型.我们多知道处理器与计算机存储设备运算速度有几个数量级的差别.总不能让处理

【JAVA】wait和notify用法,附生产/消费模型

关于wait和notify的用法,网上已经有很多详细解释了,我只是简单的总结下. wait用于释放锁A,并让wait所在的线程阻塞.除非被持有锁A的其它线程执行notify来唤醒,它才能重新"活"过来. notify用于唤醒因为等待锁A而阻塞的线程,让它们做好竞争锁A的准备.如果有多个线程因等待锁A而被阻塞,notify只唤醒一个,唤醒所有用notifyAll. 参考下面的线程状态图,对理解wait和notify有很大的帮助. 总结: wait和notify通常和synchronize

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty 转自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ netty是基于NIO实现的异步事件驱动的网络编程框架,学完NIO以后,应该看看netty的实现,netty框架涉及的内容特别多,这里只介绍netty的基本使用和实现原理,更多扩展的内容将在以后推出. 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎

Java垃圾回收机制(GC)详解

Java垃圾回收机制(GC)详解 简介: 垃圾回收GC(Garbage Collection)是Java语言的核心技术之一,之前我们曾专门探讨过Java 7新增的垃圾回收器G1的新特性,但在JVM的内部运行机制上看,Java的垃圾回收原理与机制并未改变.垃圾收集的目的在于清除不再使用的对象.GC通过确定对象是否被活动对象引用来确定是否收集该对象.GC首先要判断该对象是否是时候可以收集.两种常用的方法是引用计数和对象引用遍历. 垃圾收集的算法分析: Java语言规范没有明确地说明JVM使用哪种垃圾

Java网络编程和NIO详解开篇:Java网络编程基础

Java网络编程和NIO详解开篇:Java网络编程基础 计算机网络编程基础 转自:https://mp.weixin.qq.com/s/XXMz5uAFSsPdg38bth2jAA 我们是幸运的,因为我们拥有网络.网络是一个神奇的东西,它改变了你和我的生活方式,改变了整个世界. 然而,网络的无标度和小世界特性使得它又是复杂的,无所不在,无所不能,以致于我们无法区分甚至无法描述. 对于一个码农而言,了解网络的基础知识可能还是从了解定义开始,认识OSI的七层协议模型,深入Socket内部,进而熟练地

Java网络编程和NIO详解6:Linux epoll实现原理详解

Java网络编程和NIO详解6:Linux epoll实现原理详解 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎阅览我的CSDN专栏:Java网络编程和NIO https://blog.csdn.net/column/details/21963.html 部分代码会放在我的的Github:https://github.com/h2pl/ Linux epoll实现原理详解 在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者pol