Java并发编程(4)--生产者与消费者模式介绍

一、前言

  这种模式在生活是最常见的,那么它的场景是什么样的呢? 下面是我假象的,假设有一个仓库,仓库有一个生产者和一个消费者,消费者过来消费的时候会检测仓库中是否有库存,如果没有了则等待生产,如果有就先消费直至消费完成;而生产者每天的工作就是先检测仓库是否有库存,如果没有就开始生产,满仓了就停止生产等待消费,直至工作结束。下图是根据假象画的流程图:

  那么在程序中怎么才能达到这样的效果呢?下面介绍三种方式实现。

二、使用notify() 和 wait()实现

  相信大家这两个方法都不陌生,它是Object类中的两个方法,具体请看源码中的解释。提醒一点就是使用notify()和wait()方法时必须拥有对象锁

  根据上面假象我这定义一下明确场景:仓库库存有个最大值,如果仓库库存已经达到最大值那么就停止生产,小于就需要生产; 如果库存等于0则需要等待生产停止消费。另外生产者有个生产目标,当它生产了目标数后就结束生产;消费者也是,当消费一定的数据后就结束消费,否则等待消费。

  见下面代码:

package com.yuanfy.jmm.threads;

import com.yuanfy.util.SleepUtils;

import java.util.concurrent.TimeUnit;

public class Factory {
    // 当前库存大小
    private int size;
    // 库存容量(最大库存值)
    private int capacity;

    public Factory(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void produce(int num) {
        try {
            System.out.println("+++++生产者【" + Thread.currentThread().getName()
                    + "】, 他的任务是生产" + num + "件产品.");
            // 当生产完成就停止
            while (num > 0) {
                // 如果当前库存大小大于或等于库存容量值了,则停止生产等待消费。
                if (size >= capacity) {
                    System.out.println("+++++" + Thread.currentThread().getName() +
                            "检测库存已满,停止生产等待消费...");
                    // 等待消费
                    wait();
                    System.out.println("+++++" + Thread.currentThread().getName() + "开始生产...");
                }
                // 否则继续生产
                int inc = (num + size) > capacity ? (capacity - size) : num;
                num -= inc;
                size += inc;
                SleepUtils.second(1);
                System.out.println("+++++" + Thread.currentThread().getName() + " 生产了" + inc + "件,当前库存有" + size + "件.");
                // 生产后唤醒消费者
                notify();
            }
            System.out.println("+++++生产者【" + Thread.currentThread().getName()
                    + "】 生产结束.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized void consume(int num) {
        try {
            System.out.println("-----消费者【" + Thread.currentThread().getName()
                    + "】, 他需要消费" + num + "件产品.");
            // 当消费完成则停止
            while (num > 0) {
                // 如果当前库存大小小于等于0,则停止消费等待生产。
                if (size <= 0) {
                    System.out.println("-----" + Thread.currentThread().getName() + " 检测库存已空,停止消费等待生产...");
                    // 等待生产
                    wait();
                    System.out.println("-----" + Thread.currentThread().getName() + " 开始消费...");
                }
                // 否则继续消费
                int dec = (size - num) > 0 ? num : size;
                num -= dec;
                size -= dec;
                SleepUtils.second(1);
                System.out.println("-----" + Thread.currentThread().getName() + " 消费了" + dec + "件,当前有" + size + "件.");
                // 消费后唤醒生产者继续生产
                notify();
            }
            System.out.println("-----消费者【" + Thread.currentThread().getName()
                    + "】 消费结束.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  上面是工厂(仓库)类,主要包含两个任务一个是生产一个是消费,接下来创建两个线程去调用它,如下:

package com.yuanfy.jmm.threads;

/**
 * 生产线程
 */
class Produce {
    private Factory factory;

    public Produce(Factory factory) {
        this.factory = factory;
    }

    public void produce(String name, final int num) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                factory.produce(num);
            }
        }, name).start();
    }
}
/**
 * 消费线程
 */
class Consume {
    private Factory factory;

    public Consume(Factory factory) {
        this.factory = factory;
    }

    public void consume(String name, final int num) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                factory.consume(num);
            }
        }, name).start();
    }
}

public class ProduceConsumeDemo {

    public static void main(String[] args) {
        Factory f = new Factory(500);

        Consume consume = new Consume(f);
        consume.consume("消费线程",600);

        Produce produce = new Produce(f);
        produce.produce("生产线程",800);
    }
}

  注意上方,消费线程和生产线程都是拥有同一个工厂对象,然后进行生产和消费模式。那么我们直接运行,结果如下:

  

三、使用锁中的Condition对象进行控制

  这种方式估计用的比较少,因为使用Condition必须先使用锁Lock。这里我只介绍怎么用Condition对象进行控制实现生产者与消费者模式的实现。

  其实它跟上面那种方法有点类似,Condition对象中await()方法表示等待,signal()方法表示唤醒(看了AQS源码的应该都知道有这个对象且了解过这两个方法)。下面看下具体怎么实现:

public class Factory {
    // 当前大小
    private int size;

    // 总容量
    private int capacity;

    private Lock lock;

    // 已满的条件
    private Condition fullCondition;

    // 已空的条件
    private Condition emptyCondition;

    public Factory(int capacity) {
        this.capacity = capacity;
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        emptyCondition = lock.newCondition();
    }

    public void produce(int no) {
        lock.lock();
        try {
            while (no > 0) {
                while (size >= capacity) {
                    System.out.println(Thread.currentThread().getName() + " 报告仓库已满,等待快递员取件...");
                    fullCondition.await();
                    System.out.println(Thread.currentThread().getName() + " 报告开始进货...");
                }
                int inc = (no + size) > capacity ? (capacity - size) : no;
                no -= inc;
                size += inc;
                System.out.println(Thread.currentThread().getName() +
                        " 报告进货了: " + inc + "件, 当前库存数: " + size);
                emptyCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consume(int no) {
        lock.lock();
        try {
            while (no > 0) {
                while (size <= 0) {
                    System.out.println(Thread.currentThread().getName() + " 报告仓库已空,等待仓库管理员进货");
                    emptyCondition.await();
                    System.out.println(Thread.currentThread().getName() + " 报告开始取件...");
                }
                int dec = (size - no) > 0 ? no : size;
                no -= dec;
                size -= dec;
                System.out.println(Thread.currentThread().getName() +
                        " 报告取件: " + dec + ", 当前库存数: " + size);
                fullCondition.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

  看了上面工厂类的代码后是不是跟使用Object中wait()和notify()方法类似呢。 主要区别就是拥有对象的方式不一样,这里使用的lock进行且需要手动释放,而第一种是需要Synchronized进行控制。

四、使用阻塞队列进行实现

  这个就很简单了,它已经封装好等待和唤醒的操作,所以不进行案例分享了。其中涉及到两个重要方法put() 和 take

  

原文地址:https://www.cnblogs.com/yuanfy008/p/9574509.html

时间: 2024-11-18 09:43:33

Java并发编程(4)--生产者与消费者模式介绍的相关文章

JAVA并发实现五(生产者和消费者模式Condition方式实现)

package com.subject01; import java.util.PriorityQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Condition是在java 1.5中才出现的,它用来替代传统的Object的wait().notify()

Java多线程设计模式(2)生产者与消费者模式

1 Producer-Consumer Pattern Producer-Consumer Pattern主要就是在生产者与消费者之间建立一个“桥梁参与者”,用来解决生产者线程与消费者线程之间速度的不匹配. 当要从某个线程Produccer参与者将数据传输给其它线程Consumer参与者的时候,此时就可以在中间加一个Channel参与者,在Channel参与者中以某种方式存放接受的数据,再以某方式来获取收到的数据,Channel就可以来缓存两个线程之间传输的数据,在Channel参与者为了保证安

Java并发编程:Thread类的使用介绍

在学习Thread类之前,先介绍与线程相关知识:线程的几种状态.上下文切换,然后接着介绍Thread类中的方法的具体使用. 以下是本文的目录大纲: 一.线程的状态 二.上下文切换 三.Thread类中的方法 若有不正之处,请多多谅解并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/dolphin0520/p/3920357.html 一.线程的状态 在正式学习Thread类中的具体方法之前,我们先来了解一下线程有哪些状态,这个将会有助于后面

【Java并发编程】并发编程大合集-值得收藏

http://blog.csdn.net/ns_code/article/details/17539599这个博主的关于java并发编程系列很不错,值得收藏. 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Ja

【Java并发编程】并发编程大合集

转载自:http://blog.csdn.net/ns_code/article/details/17539599 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Java并发编程]Volatile关键字(上)

Java并发编程的艺术——互动出版网

这篇是计算机类的优质预售推荐>>>><Java并发编程的艺术> 阿里系和1号店资深技术专家撰写,Java并发编程领域的扛鼎之作,内容在InfoQ等社群得到高度认可,从JDK源码.JVM.CPU等多角度全面剖析与讲解Java并发编程的框架.原理和核心技术 编辑推荐 阿里系和1号店资深技术专家撰写,Java并发编程领域的扛鼎之作 内容在InfoQ等社群得到高度认可,从JDK源码.JVM.CPU等多角度全面剖析与讲解Java并发编程的框架.原理和核心技术 前言 为什么要写这本

5 并发编程--队列&amp;生产者消费者模型

1.队列的介绍 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: maxsize是队列中允许最大项数,省略则无大小限制. 但需要明确: 1.队列内存放的是消息而非大数据 2.队列占用的是内存空间,因而maxsize即便

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put方法将阻塞直到空间可用:如果队列为空,那么take方法将阻塞直到有元素可用.队列可以是有界的也可以是无界的. 如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存.同样,put方法的阻塞特性也极大地简化了生产者的编码.如果使用有界队列,当队列充满时,生产者将阻

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线