生产者消费者模型 学习

简介

得知并发是Java程序员进阶的必经之路,所以从实际代码来先理解 生产者消费者模型

实战

Demo File

package demo;

/**
 * 定义商品
 *
 * @author draymonder
 *
 */
public class Goods {
    public final String name;
    public final int price;
    public final int id;

    // public Goods() {
    // }

    public Goods(String name, int price, int id) {
        this.name = name;
        this.price = price;
        this.id = id;
    }

    @Override
    public String toString() {
        return "name: " + name + ", price: " + price + ", id: " + id;
    }
}
package demo;

import java.util.Random;

/***
 * 线程对象,一个缓存位置,一个生产者,一个消费者,无限生产商品消费商品 流通的商品最多只有一个
 *
 * @author draymonder
 *
 */
public class ProductComsumerDemo1 {
    // 定义一个商品缓存位置
    private volatile Goods goods;

    // 定义一个对象作为锁,不使用goods作为锁是因为生产者每次回产生一个新的对象
    private Object obj = new Object();

    // isNew == true 生产者线程休息,消费者线程消费
    // isNew == false 消费者线程休息,生产者线程生产
    private volatile boolean isNew = false;

    // 商品id编号 每生产一个id自增1
    private int id = 1;

    // 随机产生一个sleep时间
    private Random rnd = new Random();

    // 消费者线程
    public class ComsumeThread implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (obj) {
                        // 当前没有商品
                        if (!isNew) {
                            obj.wait();
                        }
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));
                        // 模拟消费产品
                        System.out.println(goods);
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));

                        isNew = false;
                        // 唤醒阻塞obj上的生产者线程
                        obj.notify();
                    }
                    Thread.sleep(rnd.nextInt(250));
                }
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

    // 生产者线程
    public class ProductThread implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (obj) {
                        // 当前有新的产品没有被消费,需要先消费再生产
                        if (isNew) {
                            obj.wait();
                        }
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));

                        if (id % 2 == 0) {
                            goods = new Goods("商品A" + id, id, id);
                        } else {
                            goods = new Goods("商品B" + id, id, id);
                        }
                        id++;
                        isNew = true;
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));
                    }
                }
            } catch (Exception e) {
                // 什么都不做
            }
        }
    }

    public static void main(String[] args) {
        ProductComsumerDemo1 demo1 = new ProductComsumerDemo1();
        Runnable consume = demo1.new ComsumeThread();
        Runnable product = demo1.new ProductThread();
        // 先生产
        new Thread(product).start();

        // 后消费
        new Thread(consume).start();
        //new Thread(consume).start();
        //new Thread(consume).start();
    }

}

上述代码 构建了一个生产者,一个消费者,并且最多只有一个商品的流通的并发情况。

多个生产者 多个消费者 一个商品的流通

将上述main函数中的生产者,消费者多定义几个,是不是就可以了呢?

public static void main(String[] args) {
    ProductComsumerDemo1 demo1 = new ProductComsumerDemo1();
    Runnable consume = demo1.new ComsumeThread();
    Runnable product = demo1.new ProductThread();
    // 先生产
    new Thread(product).start();

    // 后消费
    new Thread(consume).start();
    new Thread(consume).start();
    new Thread(consume).start();
}

执行结果

name: 商品B1, price: 1, id: 1
name: 商品A2, price: 2, id: 2
name: 商品A2, price: 2, id: 2
name: 商品A2, price: 2, id: 2
name: 商品B3, price: 3, id: 3
name: 商品A4, price: 4, id: 4
name: 商品A4, price: 4, id: 4
name: 商品A4, price: 4, id: 4
name: 商品B5, price: 5, id: 5
name: 商品A6, price: 6, id: 6
name: 商品A6, price: 6, id: 6

可以观察到 单个id商品存在重复使用的情况,为什么呢?

现在我们来分析一下原因。当生产者生产好了商品,会唤醒因没有商品而阻塞消费者线程,假设唤醒的消费者线程超过两个,这两个线程会竞争获取锁,获取到锁的线程就会从obj.wait()方法中返回,然后消费商品,并把isNew置为false,然后释放锁。当被唤醒的另一个线程竞争获取到锁了以后也会从obj.wait()方法中返回。会再次消费同一个商品。
显然,每一个被唤醒的线程应该再次检查isNew这个条件。
所以无论是消费者,还是生产者,isNew的判断必须改成while循环,这样才能得到正确的结果而不受生产者的线程数和消费者的线程数的影响。

而对于只有一个生产者线程,一个消费者线程,用if判断是没有问题的,但是仍然强烈建议改成while语句进行判断。

package demo;

import java.util.Random;

/***
 * 线程对象,一个缓存位置,一个生产者,一个消费者,无限生产商品消费商品 流通的商品最多只有一个
 *
 * @author draymonder
 *
 */
public class ProductComsumerDemo1 {
    // 定义一个商品缓存位置
    private volatile Goods goods;

    // 定义一个对象作为锁,不使用goods作为锁是因为生产者每次回产生一个新的对象
    private Object obj = new Object();

    // isNew == true 生产者线程休息,消费者线程消费
    // isNew == false 消费者线程休息,生产者线程生产
    private volatile boolean isNew = false;

    // 商品id编号 每生产一个id自增1
    private int id = 1;

    // 随机产生一个sleep时间
    private Random rnd = new Random();

    // 消费者线程
    public class ComsumeThread implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (obj) {
                        // 当前没有商品
                        while (!isNew) {
                            obj.wait();
                            System.out.println(Thread.currentThread().getName() + " 正在wait");
                        }
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));
                        // 模拟消费产品
                        System.out.println(goods);
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));

                        isNew = false;
                        // 唤醒阻塞obj上的生产者线程
                        obj.notify();
                    }
                    Thread.sleep(rnd.nextInt(250));
                }
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    }

    // 生产者线程
    public class ProductThread implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (obj) {
                        // 当前有新的产品没有被消费,需要先消费再生产
                        while (isNew) {
                            obj.wait();
                            System.out.println(Thread.currentThread().getName() + " 正在wait");
                        }
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));

                        if (id % 2 == 0) {
                            goods = new Goods("商品A" + id, id, id);
                        } else {
                            goods = new Goods("商品B" + id, id, id);
                        }
                        id++;
                        isNew = true;
                        // 延迟时间
                        Thread.sleep(rnd.nextInt(250));
                    }
                }
            } catch (Exception e) {
                // 什么都不做
            }
        }
    }

    public static void main(String[] args) {
        ProductComsumerDemo1 demo1 = new ProductComsumerDemo1();
        Runnable consume = demo1.new ComsumeThread();
        Runnable product = demo1.new ProductThread();
        // 先生产
        new Thread(product).start();
        new Thread(product).start();
        new Thread(product).start();

        // 后消费
        new Thread(consume).start();
        // new Thread(consume).start();
        // new Thread(consume).start();
    }

}

原文地址:https://www.cnblogs.com/Draymonder/p/10192084.html

时间: 2024-10-10 10:50:11

生产者消费者模型 学习的相关文章

多线程学习-基础(十二)生产者消费者模型:wait(),sleep(),notify()实现

一.多线程模型一:生产者消费者模型   (1)模型图:(从网上找的图,清晰明了) (2)生产者消费者模型原理说明: 这个模型核心是围绕着一个"仓库"的概念,生产者消费者都是围绕着:"仓库"来进行操作,一个仓库同时只能被一个生产者线程或一个消费者线程所操作,synchronized锁住的也是这个仓库,仓库是一个容器,所以会有边界值,0和仓库可存放上限,在这个上限内,可以设置多种级别,不同的级别可以执行不同的策略流程. (3)本案例使用知识点: Thread.curre

Python学习笔记——进阶篇【第九周】———线程、进程、协程篇(队列Queue和生产者消费者模型)

Python之路,进程.线程.协程篇 本节内容 进程.与线程区别 cpu运行原理 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 参考链接http://www.cnblogs.com/alex3714/articles/5230609.html

LINUX学习:生产者&消费者模型复习

回顾一下生产者消费者模型. #include <stdio.h> #include <stdlib.h> #include <string.h> #include <semaphore.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #define ERR_EXIT(m) do { perror(m); exit(EXIT_FAILURE

python2.0_s12_day9之day8遗留知识(queue队列&amp;生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ

#queue队列 #生产者消费者模型

1 #queue队列 #生产者消费者模型 2 3 #queue队列 #有顺序的容器 4 #程序解耦 5 #提高运行效率 6 7 #class queue.Queue(maxsize=0) #先入先出 8 #class queue.LifoQueue(maxsize=0)最后在第一 9 #class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列#VIP客户 10 11 #Queue.qsize() 12 #Queue.empty() #return

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

Python连载38-协程、可迭代、迭代器、生产者消费者模型

一.生产者消费者模型 import multiprocessing from time import ctime def consumer(input_q): print("Into consumer:",ctime()) while True: #处理项 item = input_q.get() print("pull",item,"out of q")#此处替换为有用的工作 input_q.task_done()#发出信号通知任务完成 pri

1-7 生产者消费者模型

一 生产者消费者模型介绍 为什么要使用生产者消费者模型 生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者和消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生

4.利用python生成器实现简单的“生产者消费者”模型

假如说,没有生成器这种对象,那么如何实现这种简单的"生产者消费者"模型呢? import time def producer(): pro_list = [] for i in range(10000): print "包子%s制作ing" %(i) time.sleep(0.5) pro_list.append("包子%s" %i) return pro_list def consumer(pro_list): for index,stuffe