Java多线程(十):BlockingQueue实现生产者消费者模型

BlockingQueue

BlockingQueue、解决了多线程中,如何高效安全“传输”数据的问题。程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程。

方法介绍


BlockingQueue是Queue的子类

void put(E e)

插入指定元素,当BlockingQueue为满,则线程阻塞,进入Waiting状态,直到BlockingQueue有空闲空间再继续。
这里以ArrayBlockingQueue为例进行分析

void take()

队首出队,当BlockingQueue为空,则线程阻塞,进入Waiting状态,直到BlockingQueue不为空再继续。

int drainTo(Collection<? super E> c)

从队列中批量取出数据,并放入到另一个集合中,返回转移数据的数量,只需一次加锁和解锁。

BlockingQueue的实现类

ArrayBlockingQueue

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

基于数组实现的BlockingQueue,需要指定队列容量,可以指定是否为公平锁;只有一个ReentrantLock,生产者和消费者不能异步执行。

LinkedBlockingQueue

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

基于链表实现的BlockingQueue,可以指定队列容量,不指定队列容量默认为Integer.MAX_VALUE;有两个ReentrantLock,生产者和消费者可以异步执行。

BlockingQueue实现生产者消费者模型

  • 缓冲区可以存放大量数据
  • 生产者和消费者速度各不相同
public class MyThread42 {
    public static void main(String[] args)
    {
        final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
        Runnable producerRunnable = new Runnable()
        {
            int i = 0;
            public void run()
            {
                while (true)
                {
                    try
                    {
                        System.out.println("我生产了一个" + i++);
                        bq.put(i + "");
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };

        Runnable customerRunnable = new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    try
                    {
                        System.out.println("我消费了一个" + bq.take());
                        Thread.sleep(3000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread producerThread = new Thread(producerRunnable);
        Thread customerThread = new Thread(customerRunnable);
        producerThread.start();
        customerThread.start();
    }
}

输出结果如下

我生产了一个0
我消费了一个1
我生产了一个1
我生产了一个2
我消费了一个2
我生产了一个3
我生产了一个4
我生产了一个5
我消费了一个3
我生产了一个6
我生产了一个7
我生产了一个8
我消费了一个4
我生产了一个9
我生产了一个10
我生产了一个11
我消费了一个5
我生产了一个12
我生产了一个13
我生产了一个14
我消费了一个6
我生产了一个15
我生产了一个16
我消费了一个7
我生产了一个17
我消费了一个8
我生产了一个18
我消费了一个9
我生产了一个19
我消费了一个10
我生产了一个20
我消费了一个11
我生产了一个21
我消费了一个12
我生产了一个22
我消费了一个13
我生产了一个23
我消费了一个14
我生产了一个24

······

生产者没有生产到BlockingQueue的容量(极限是10)之前,生产3个,消费1个,再生产到BlockingQueue的容量之后,生产一个消费一个,因为不能超过BlockingQueue的容量。

原文地址:https://www.cnblogs.com/Java-Starter/p/11315057.html

时间: 2024-10-07 14:25:53

Java多线程(十):BlockingQueue实现生产者消费者模型的相关文章

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

多线程学习-基础(十二)生产者消费者模型:wait(),sleep(),notify()实现

一.多线程模型一:生产者消费者模型   (1)模型图:(从网上找的图,清晰明了) (2)生产者消费者模型原理说明: 这个模型核心是围绕着一个"仓库"的概念,生产者消费者都是围绕着:"仓库"来进行操作,一个仓库同时只能被一个生产者线程或一个消费者线程所操作,synchronized锁住的也是这个仓库,仓库是一个容器,所以会有边界值,0和仓库可存放上限,在这个上限内,可以设置多种级别,不同的级别可以执行不同的策略流程. (3)本案例使用知识点: Thread.curre

Java线程:并发协作-生产者消费者模型

对于多线程程序来说,不管任何编程语言,生产者消费者模型都是最经典的. 实际上,准确的说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确以下几点: 生产者仅仅在仓储未满时候生产,仓满则停止生产. 消费者仅仅在仓储有产品时候才能消费,仓空则等待. 当消费者发现仓储没有产品的时候会通知生产者生产. 生产者在生产出可消费产品时候,应该通知消费者去消费. 此模型将要结合java.lang.Object的wait与notify,notify

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生

Java 多线程学习笔记:生产者消费者问题

前言:最近在学习Java多线程,看到ImportNew网上有网友翻译的一篇文章<阻塞队列实现生产者消费者模式>.在文中,使用的是Java的concurrent包中的阻塞队列来实现.在看完后,自行实现阻塞队列. (一)准备 在多线程中,生产者-消费者问题是一个经典的多线程同步问题.简单来说就是有两种线程(在这里也可以做进程理解)——生产者和消费者,他们共享一个固定大小的缓存区(如一个队列).生产者负责产生放入新数据,消费者负责取出缓存区的数据.具体介绍请参考 Producer-consumer

Java多线程之并发协作生产者消费者设计模式

两个线程一个生产者个一个消费者 需求情景 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个 涉及问题 同步问题:如何保证同一资源被多个线程并发访问时的完整性.常用的同步方法是采用标记或加锁机制 wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制. wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行. notify

Linux多线程之同步2 &mdash;&mdash; 生产者消费者模型

思路 生产者和消费者(互斥与同步).资源用队列模拟(要上锁,一个时间只能有一个线程操作队列). m个生产者.拿到锁,且产品不满,才能生产.当产品满,则等待,等待消费者唤醒.当产品由空到不空,通知消费者.n个消费者.拿到锁,且有产品,才能消费.当产品空,则等待,等待生产者唤醒.当产品由满到不满,通知生产者.    生产者条件:队列不满消费者条件:队列不空因此有两个条件变量. 代码 /**********************************************************

Java多线程之~~~~使用wait和notify实现生产者消费者模型

在多线程开发中,最经典的一个模型就是生产者消费者模型,他们有一个缓冲区,缓冲区有最大限制,当缓冲区满 的时候,生产者是不能将产品放入到缓冲区里面的,当然,当缓冲区是空的时候,消费者也不能从中拿出来产品,这就 涉及到了在多线程中的条件判断,java为了实现这些功能,提供了wait和notify方法,他们可以在线程不满足要求的时候 让线程让出来资源等待,当有资源的时候再notify他们让他们继续工作,下面我们用实际的代码来展示如何使用wait和 notify来实现生产者消费者这个经典的模型. 首先是

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也.-<论语> PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~ Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue.Queue 用来临时保存一组等待处理的元素.BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作. BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型. 这里写图