生产者消费者模型Java实现

生产者消费者模型

生产者消费者模型可以描述为:
①生产者持续生产,直到仓库放满产品,则停止生产进入等待状态;仓库不满后继续生产;
②消费者持续消费,直到仓库空,则停止消费进入等待状态;仓库不空后,继续消费;
③生产者可以有多个,消费者也可以有多个;

生产者消费者模型

对应到程序中,仓库对应缓冲区,可以使用队列来作为缓冲区,并且这个队列应该是有界的,即最大容量是固定的;进入等待状态,则表示要阻塞当前线程,直到某一条件满足,再进行唤醒。

常见的实现方式主要有以下几种。
①使用wait()notify()
②使用LockCondition
③使用信号量Semaphore
④使用JDK自带的阻塞队列
⑤使用管道流


使用wait()和notify()实现

前提是要熟悉Object的几个方法:

  • wait():当前线程释放锁,直到等到通知,再去获取锁
  • sleep():当前线程休眠,但不释放锁
  • notify():唤醒其他正在wait的线程

参考代码如下:

public class ProducerConsumer1 {

    class Producer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private int maxSize;

        public Producer(String threadName, Queue<Goods> queue, int maxSize) {
            this.threadName = threadName;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run() {
            while (true) {
                //模拟生产过程中的耗时操作
                Goods goods = new Goods();
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (queue) {
                    while (queue.size() == maxSize) {
                        try {
                            System.out.println("队列已满,【" + threadName + "】进入等待状态");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    queue.add(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    queue.notifyAll();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private Queue<Goods> queue;

        public Consumer(String threadName, Queue<Goods> queue) {
            this.threadName = threadName;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                Goods goods;
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            System.out.println("队列已空,【" + threadName + "】进入等待状态");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    goods = queue.remove();
                    System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    queue.notifyAll();
                }
                //模拟消费过程中的耗时操作
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();

        Thread producer1 = new Producer("生产者1", queue, maxSize);
        Thread producer2 = new Producer("生产者2", queue, maxSize);
        Thread producer3 = new Producer("生产者3", queue, maxSize);

        Thread consumer1 = new Consumer("消费者1", queue);
        Thread consumer2 = new Consumer("消费者2", queue);

        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();

        while (true) {

        }
    }
}

几个注意的地方:

①确定锁的对象是队列queue

②不要把生产过程和消费过程写在同步块中,这些操作无需同步,同步的仅仅是放入和取出这两个动作;

③因为是持续生产,持续消费,要用while(true){...}的方式将【生产、放入】或【取出、消费】的操作都一直进行。

④但由于是对队列使用synchronized的方式加锁,同一时刻,要么在放入,要么在取出,两者不能同时进行。


使用Lock和Condition实现

前提是要熟悉Lock接口以及常用实现类ReentrantLock,以及Condition的两个常用方法:

  • await():等待Condition的满足,会释放锁
  • signal():唤醒其他正在等待该Condition的线程
    参考代码如下:
public class ProducerConsumer2 {

    class Producer extends Thread {

        private String threadName;
        private Queue<Goods> queue;
        private Lock lock;
        private Condition notFullCondition;
        private Condition notEmptyCondition;
        private int maxSize;

        public Producer(String threadName, Queue<Goods> queue, Lock lock, Condition notFullCondition, Condition notEmptyCondition, int maxSize) {
            this.threadName = threadName;
            this.queue = queue;
            this.lock = lock;
            this.notFullCondition = notFullCondition;
            this.notEmptyCondition = notEmptyCondition;
            this.maxSize = maxSize;

        }

        @Override
        public void run() {
            while (true) {
                //模拟生产过程中的耗时操作
                Goods goods = new Goods();
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                lock.lock();
                try {
                    while (queue.size() == maxSize) {
                        try {
                            System.out.println("队列已满,【" + threadName + "】进入等待状态");
                            notFullCondition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    queue.add(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    notEmptyCondition.signalAll();

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private Lock lock;
        private Condition notFullCondition;
        private Condition notEmptyCondition;

        public Consumer(String threadName, Queue<Goods> queue, Lock lock, Condition notFullCondition, Condition notEmptyCondition) {
            this.threadName = threadName;
            this.queue = queue;
            this.lock = lock;
            this.notFullCondition = notFullCondition;
            this.notEmptyCondition = notEmptyCondition;
        }

        @Override
        public void run() {
            while (true) {
                Goods goods;
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        try {
                            System.out.println("队列已空,【" + threadName + "】进入等待状态");
                            notEmptyCondition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    goods = queue.remove();
                    System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    notFullCondition.signalAll();

                } finally {
                    lock.unlock();
                }

                //模拟消费过程中的耗时操作
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();
        Lock lock = new ReentrantLock();
        Condition notEmptyCondition = lock.newCondition();
        Condition notFullCondition = lock.newCondition();

        Thread producer1 = new ProducerConsumer2.Producer("生产者1", queue, lock, notFullCondition, notEmptyCondition, maxSize);
        Thread producer2 = new ProducerConsumer2.Producer("生产者2", queue, lock, notFullCondition, notEmptyCondition, maxSize);
        Thread producer3 = new ProducerConsumer2.Producer("生产者3", queue, lock, notFullCondition, notEmptyCondition, maxSize);

        Thread consumer1 = new ProducerConsumer2.Consumer("消费者1", queue, lock, notFullCondition, notEmptyCondition);
        Thread consumer2 = new ProducerConsumer2.Consumer("消费者2", queue, lock, notFullCondition, notEmptyCondition);
        Thread consumer3 = new ProducerConsumer2.Consumer("消费者3", queue, lock, notFullCondition, notEmptyCondition);

        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        while (true) {

        }
    }
}

要注意的地方:

放入和取出操作均是用的同一个锁,所以在同一时刻,要么在放入,要么在取出,两者不能同时进行。因此,与使用wait()和notify()实现类似,这种方式的实现并不能最大限度地利用缓冲区(即例子中的队列)。如果要实现同一时刻,既可以放入又可以取出,则要使用两个重入锁,分别控制放入和取出的操作,具体实现可以参考LinkedBlockingQueue


使用信号量Semaphore实现

前提是熟悉信号量Semaphore的使用方式,尤其是release()方法,Semaphorerelease之前不必一定要先acquire。(如果不熟悉Semaphore,可以参考阅读【多线程与并发】Java并发工具类)

There is no requirement that a thread that releases a permit must
have acquired that permit by calling acquire.
Correct usage of a semaphore is established by programming convention
in the application.

参考代码如下:

public class ProducerConsumer4 {

    class Producer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private Semaphore queueSizeSemaphore;
        private Semaphore concurrentWriteSemaphore;
        private Semaphore notEmptySemaphore;

        public Producer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
            this.threadName = threadName;
            this.queue = queue;
            this.concurrentWriteSemaphore = concurrentWriteSemaphore;
            this.queueSizeSemaphore = queueSizeSemaphore;
            this.notEmptySemaphore = notEmptySemaphore;
        }

        @Override
        public void run() {
            while (true) {
                //模拟生产过程中的耗时操作
                Goods goods = new Goods();
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    queueSizeSemaphore.acquire();//获取队列未满的信号量
                    concurrentWriteSemaphore.acquire();//获取读写的信号量
                    queue.add(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    concurrentWriteSemaphore.release();
                    notEmptySemaphore.release();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private Queue<Goods> queue;
        private Semaphore queueSizeSemaphore;
        private Semaphore concurrentWriteSemaphore;
        private Semaphore notEmptySemaphore;

        public Consumer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
            this.threadName = threadName;
            this.queue = queue;
            this.concurrentWriteSemaphore = concurrentWriteSemaphore;
            this.queueSizeSemaphore = queueSizeSemaphore;
            this.notEmptySemaphore = notEmptySemaphore;
        }

        @Override
        public void run() {
            while (true) {
                Goods goods;
                try {
                    notEmptySemaphore.acquire();
                    concurrentWriteSemaphore.acquire();
                    goods = queue.remove();
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    concurrentWriteSemaphore.release();
                    queueSizeSemaphore.release();
                }

                //模拟消费过程中的耗时操作
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();
        Semaphore concurrentWriteSemaphore = new Semaphore(1);
        Semaphore notEmptySemaphore = new Semaphore(0);
        Semaphore queueSizeSemaphore = new Semaphore(maxSize);

        Thread producer1 = new ProducerConsumer4.Producer("生产者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread producer2 = new ProducerConsumer4.Producer("生产者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread producer3 = new ProducerConsumer4.Producer("生产者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);

        Thread consumer1 = new ProducerConsumer4.Consumer("消费者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread consumer2 = new ProducerConsumer4.Consumer("消费者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread consumer3 = new ProducerConsumer4.Consumer("消费者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);

        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        while (true) {
        }
    }
}

要注意的地方:

①理解代码中的三个信号量的含义
queueSizeSemaphore:(其中的许可证数量,可以理解为队列中可以再放入多少个元素),该信号量的许可证初始数量为仓库大小,即maxSize;生产者每放置一个商品,则该信号量-1,即执行acquire(),表示队列中已经添加了一个元素,要减少一个许可证;消费者每取出一个商品,该信号量+1,即执行release(),表示队列中已经少了一个元素,再给你一个许可证。
notEmptySemaphore:(其中的许可证数量,可以理解为队列中可以取出多少个元素),该信号量的许可证初始数量为0;生产者每放置一个商品,则该信号量+1,即执行release(),表示队列中添加了一个元素;消费者每取出一个商品,该信号量-1,即执行acquire(),表示队列中已经少了一个元素,要减少一个许可证;
concurrentWriteSemaphore,相当于一个写锁,在放入或取出商品的时候,都需要先获取再释放许可证。

②由于实现中,使用了concurrentWriteSemaphore实现了对队列并发写的控制,在同一时刻,只能对队列进行一种操作:放入或取出。假如把concurrentWriteSemaphore中的信号量初始化为2或者2以上的值,就会出现多个生产者同时放入或多个消费者同时消费的情况,而使用的LinkedList是不允许并发进行这种修改的,否则会出现溢出或取空的情况。所以,concurrentWriteSemaphore只能设置为1,也就导致性能与使用wait() / notify()方式类似,性能不高。


使用jdk自带的阻塞队列实现

前提是要记住两个阻塞取放方法,因为阻塞队列提供了很多存取元素的方法,几种存取方式在队列已满/已空时采取的措施如下:

方法/方式处理 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

所以,在这里,要选用put()take()这两个会阻塞的方法。

参考代码如下:

public class ProducerConsumer3 {

    class Producer extends Thread {
        private String threadName;
        private BlockingQueue<Goods> queue;

        public Producer(String threadName, BlockingQueue<Goods> queue) {
            this.threadName = threadName;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true){
                Goods goods = new Goods();
                try {
                    //模拟生产过程中的耗时操作
                    Thread.sleep(new Random().nextInt(100));
                    queue.put(goods);
                    System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        private String threadName;
        private BlockingQueue<Goods> queue;

        public Consumer(String threadName, BlockingQueue<Goods> queue) {
            this.threadName = threadName;
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true){
                try {
                    Goods goods = queue.take();
                    System.out.println("【" + threadName + "】消费了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
                    //模拟消费过程中的耗时操作
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void test() {

        int maxSize = 5;
        BlockingQueue<Goods> queue = new LinkedBlockingQueue<>(maxSize);

        Thread producer1 = new ProducerConsumer3.Producer("生产者1", queue);
        Thread producer2 = new ProducerConsumer3.Producer("生产者2", queue);
        Thread producer3 = new ProducerConsumer3.Producer("生产者3", queue);

        Thread consumer1 = new ProducerConsumer3.Consumer("消费者1", queue);
        Thread consumer2 = new ProducerConsumer3.Consumer("消费者2", queue);

        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();

        while (true) {
        }
    }
}

要注意的地方:

如果使用LinkedBlockingQueue作为队列实现,则可以实现:在同一时刻,既可以放入又可以取出,因为LinkedBlockingQueue内部使用了两个重入锁,分别控制取出和放入。
如果使用ArrayBlockingQueue作为队列实现,则在同一时刻只能放入或取出,因为ArrayBlockingQueue内部只使用了一个重入锁来控制并发修改操作。


使用管道流实现

//TODO


无锁的缓存框架: Disruptor

BlockingQueue 实现生产者和消费者模式简单易懂,但是BlockingQueue并不是一个高性能的实现:它完全使用锁和阻塞来实现线程之间的同步。在高并发的场合,它的性能并不是特别的优越。(ConconcurrentLinkedQueue是一个高性能的队列,但并不没有实现BlockingQueue接口,即不支持阻塞操作)。

Disruptor是LMAX公司开发的高效的无锁缓存队列。它使用无锁的方式实现了一个环形队列,非常适合于实现生产者和消费者模式,如:事件和消息的发布。

//TODO 应用场景的代码实现


参考

Java 实现生产者 – 消费者模型:各种实现方式的性能
高性能的生产者-消费者:无锁的实现:无锁实现
Java生产者和消费者模型的5种实现方式
生产者/消费者问题的多种Java实现方式
Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析:两种常用阻塞队列的区别

完整代码示例在此

作者:maxwellyue
链接:https://www.jianshu.com/p/7cbb6b0bbabc
来源:简书

原文地址:https://www.cnblogs.com/xiaoshen666/p/11258643.html

时间: 2024-10-10 09:30:29

生产者消费者模型Java实现的相关文章

生产者消费者模型java

马士兵老师的生产者消费者模型,我感觉理解了生产者消费者模型,基本懂了一半多线程. public class ProducerConsumer { public static void main(String[] args) { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); new Thread(p).start(); new Thread(c).start

生产者消费者模型-Java代码实现

什么是生产者-消费者模式 比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者,生产者消费者要解决的问题就是如何处理公共资源. 生产者-消费者模式的特点 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据 当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据:当

如何使用阻塞队列来实现生产者-消费者模型?

什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型? java.util.concurrent.BlockingQueue的特性是:当队列是空的时,从队列中获取或删除元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞. 阻塞队列不接受空值,当你尝试向队列中添加空值的时候,它会抛出NullPointerException. 阻塞队列的实现都是线程安全的,所有的查询方法都是原子的并且使用了内部锁或者其他形式的并发控制. BlockingQueue 接口是java colle

Java多线程之~~~~使用wait和notify实现生产者消费者模型

在多线程开发中,最经典的一个模型就是生产者消费者模型,他们有一个缓冲区,缓冲区有最大限制,当缓冲区满 的时候,生产者是不能将产品放入到缓冲区里面的,当然,当缓冲区是空的时候,消费者也不能从中拿出来产品,这就 涉及到了在多线程中的条件判断,java为了实现这些功能,提供了wait和notify方法,他们可以在线程不满足要求的时候 让线程让出来资源等待,当有资源的时候再notify他们让他们继续工作,下面我们用实际的代码来展示如何使用wait和 notify来实现生产者消费者这个经典的模型. 首先是

java生产者消费者模型

import java.util.Queue;import java.util.concurrent.LinkedBlockingQueue; public class Consumer extends Thread {    private String product;    private Queue<String> storeHouse = new LinkedBlockingQueue<String>();        public Consumer(){      

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题.有如下几个常见的实现方法: 1. wait()/notify() 2. lock & condition 3. BlockingQueue 下面来逐一分析. 1. wait()/notify() 第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行:这也是最原始的实现. 1 public class WaitNotifyBroker<T> implements Broker&

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

Java线程:并发协作-生产者消费者模型

对于多线程程序来说,不管任何编程语言,生产者消费者模型都是最经典的. 实际上,准确的说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确以下几点: 生产者仅仅在仓储未满时候生产,仓满则停止生产. 消费者仅仅在仓储有产品时候才能消费,仓空则等待. 当消费者发现仓储没有产品的时候会通知生产者生产. 生产者在生产出可消费产品时候,应该通知消费者去消费. 此模型将要结合java.lang.Object的wait与notify,notify

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也.-<论语> PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~ Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue.Queue 用来临时保存一组等待处理的元素.BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作. BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型. 这里写图