生产者消费者模型的正确姿势

简介

生产者、消费者模型是多线程编程的常见问题,最简单的一个生产者、一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中。这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法。

欢迎探讨,如有错误敬请指正

如需转载,请注明出处 http://www.cnblogs.com/nullzx/



定义商品类

package demo;

/*定义商品*/
public class Goods {
	public final String name;
	public final int price;
	public final int id;

	public Goods(String name, int price, int id){
		this.name = name; /*类型*/
		this.price = price; /*价格*/
		this.id = id; /*商品序列号*/
	}

	@Override
	public String toString(){
		return "name: " + name + ",   price:"+ price + ",   id: " + id;
	}
}

基本要求:

1)生产者不能重复生产一个商品,也就是说不能有两个id相同的商品

2)生产者不能覆盖一个商品(当前商品还未被消费,就被下一个新商品覆盖)。也就是说消费商品时,商品的id属性可以不连续,但不能出现缺号的情况

3)消费者不能重复消费一个商品

1. 生产者线程无线生产,消费者线程无限消费的模式

1.1使用线程对象,一个生产者线程,一个消费者线程,一个商品存储位置

package demo;

import java.util.Random;

/*使用线程对象,一个缓存位置,一个生产者,一个消费者,无限生产商品消费商品*/
public class ProducterComsumerDemo1 {

	/*定义一个商品缓存位置*/
	private volatile Goods goods;

	/*定义一个对象作为锁,不使用goods作为锁是因为生产者每次会产生一个新的对象*/
	private Object obj = new Object();

	/*isFull == true 生产者线程休息,消费者线程消费
	 *isFull == false 消费者线程休息,生产者线程生产*/
	private volatile boolean isFull = false;

	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;

	/*随机产生一个sleep时间*/
	private Random rnd = new Random();

	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			try{
				while(true){

					/*获取obj对象的锁, id 和 isFull 的操作都在同步代码块中*/
					synchronized(obj){

						if(!isFull){
							/*wait方法使当前线程阻塞,并释放锁*/
							obj.wait();
						}

						/*随机延时一段时间*/
						Thread.sleep(rnd.nextInt(250));

						/*模拟消费商品*/
						System.out.println(goods);

						/*随机延时一段时间*/
						Thread.sleep(rnd.nextInt(250));

						isFull = false;

						/*唤醒阻塞obj上的生产者线程*/
						obj.notify();

					}

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(250));

				}
			}catch (InterruptedException e){
				/*什么都不做*/
			}
		}
	}

/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){

			try {
				while(true){

					synchronized(obj){

						if(isFull){
							obj.wait();
						}

						Thread.sleep(rnd.nextInt(500));

						/*如果id为偶数,生产价格为2的产品A
						 *如果id为奇数,生产价格为1的产品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						}else{
							goods = new Goods("B", 1, id);
						}

						Thread.sleep(rnd.nextInt(250));

						id++;
						isFull = true;

						/*唤醒阻塞的消费者线程*/
						obj.notify();
					}
				}
			} catch (InterruptedException e) {
				/*什么都不做*/
			}

		}
	}

	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();

		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();

		new Thread(p).start();
		new Thread(c).start();
	}
}

运行结果

name: B,   price:1,   id: 1
name: A,   price:2,   id: 2
name: B,   price:1,   id: 3
name: A,   price:2,   id: 4
name: B,   price:1,   id: 5
name: A,   price:2,   id: 6
name: B,   price:1,   id: 7
name: A,   price:2,   id: 8
name: B,   price:1,   id: 9
name: A,   price:2,   id: 10
name: B,   price:1,   id: 11
name: A,   price:2,   id: 12
name: B,   price:1,   id: 13
……

从结果看出,商品类型交替生产,每个商品的id都不相同,且不会漏过任何一个id,生产者没有重复生产,消费者没有重复消费,结果完全正确。

1.2. 使用线程对象,多个生产者线程,多个消费者线程,1个缓存位置

1.2.1一个经典的bug

对于多生产者,多消费者这个问题,看起来我们似乎不用修改代码,只需在main方法中多添加几个线程就好。假设我们需要三个消费者,一个生产者,那么我们只需要在main方法中再添加两个消费者线程。

	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();

		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();

		new Thread(c).start();

		new Thread(p).start();

		new Thread(c).start();
		new Thread(c).start();

	}

运行结果

name: B,   price:1,   id: 1
name: A,   price:2,   id: 2
name: A,   price:2,   id: 2
name: B,   price:1,   id: 3
name: B,   price:1,   id: 3
name: A,   price:2,   id: 4
name: A,   price:2,   id: 4
name: B,   price:1,   id: 5
name: B,   price:1,   id: 5
name: A,   price:2,   id: 6
……

从结果中,我们发现消费者重复消费了商品,所以这样做显然是错误的。这里我们定义多个消费者,一个生产者,所以遇到了重复消费的问题,如果定义成一个消费者,多个生产者就会遇到id覆盖的问题。如果我们定义多个消费者,多个生产者,那么即会遇到重复消费,也会遇到id覆盖的问题。注意,上面的代码使用的notifyAll唤醒方法,如果使用notify方法唤醒bug仍然可能发生

现在我们来分析一下原因。当生产者生产好了商品,会唤醒因没有商品而阻塞消费者线程,假设唤醒的消费者线程超过两个,这两个线程会竞争获取锁,获取到锁的线程就会从obj.wait()方法中返回,然后消费商品,并把isFull置为false,然后释放锁。当被唤醒的另一个线程竞争获取到锁了以后也会从obj.wait()方法中返回。会再次消费同一个商品。显然,每一个被唤醒的线程应该再次检查isFull这个条件。所以无论是消费者,还是生产者,isFull的判断必须改成while循环,这样才能得到正确的结果而不受生产者的线程数和消费者的线程数的影响。

而对于只有一个生产者线程,一个消费者线程,用if判断是没有问题的,但是仍然强烈建议改成while语句进行判断。

1.2.2正确的姿势

package demo;

import java.util.Random;

/*使用线程对象,一个缓存位置,一个生产者,一个消费者,无限生产商品,消费商品*/

public class ProducterComsumerDemo1 {

	/*定义一个商品缓存位置*/
	private volatile Goods goods;

	/*定义一个对象作为锁,不使用goods作为锁是因为生产者每次会产生一个新的对象*/
	private Object obj = new Object();

	/*isFull == true 生产者线程休息,消费者线程消费
	 *isFull == false 消费者线程消费,生产者线程生产*/
	private volatile boolean isFull = false;

	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;

	/*随机产生一个sleep时间*/
	private Random rnd = new Random();

	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			try{
				while(true){

					/*获取obj对象的锁, id 和 isFull 的操作都在同步代码块中*/
					synchronized(obj){

						while(!isFull){
							/*wait方法使当前线程阻塞,并释放锁*/
							obj.wait();
						}

						/*随机延时一段时间*/
						Thread.sleep(rnd.nextInt(250));

						/*模拟消费商品*/
						System.out.println(goods);

						/*随机延时一段时间*/
						Thread.sleep(rnd.nextInt(250));

						isFull = false;

						/*唤醒阻塞obj上的生产者线程*/
						obj.notifyAll();

					}

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(250));

				}
			}catch (InterruptedException e){
				/*我就是任性,这里什么都不做*/
			}
		}
	}

	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){

			try {
				while(true){

					synchronized(obj){

						while(isFull){
							obj.wait();
						}

						Thread.sleep(rnd.nextInt(500));

						/*如果id为偶数,生产价格为2的产品A
						     如果id为奇数,生产价格为1的产品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						}else{
							goods = new Goods("B", 1, id);
						}

						Thread.sleep(rnd.nextInt(250));

						id++;
						isFull = true;

						/*唤醒阻塞的消费者线程*/
						obj.notifyAll();
					}
				}
			} catch (InterruptedException e) {
				/*我就是任性,这里什么都不做*/
			}

		}
	}

	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();

		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();

		new Thread(c).start();

		new Thread(p).start();

		new Thread(c).start();
		new Thread(c).start();

1.3 使用线程对象,多个缓存位置(有界),多生产者,多消费者

1)当缓存位置满时,我们应该阻塞生产者线程

2)当缓存位置空时,我们应该阻塞消费者线程

下面的代码我没有用java对象内置的锁,而是用了ReentrantLock对象。是因为普通对象的锁只有一个阻塞队列,如果使用notify方式,无法保证唤醒的就是特定类型的线程(消费者线程或生产者线程),而notifyAll方法会唤醒所有的线程,当剩余的缓存商品的数量小于生产者线程数量或已缓存商品的数量小于消费者线程时效率就比较低。所以这里我们通过ReentrantLock对象构造两个阻塞队列提高效率。

1.3.1 普通方式

package demo;

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*使用线程对象,多个缓存位置(有界),多生产者,多消费者,无限循环模式*/

public class ProducterComsumerDemo2 {

	/*最大缓存商品数*/
	private final int MAX_SLOT = 2;

	/*定义缓存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();

	/*定义线程锁和锁对应的阻塞队列*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();

	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;

	/*随机产生一个sleep时间*/
	private Random rnd = new Random();

	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			while(true){

				/*加锁,queue的出列操作都在同步代码块中*/
				lock.lock();
				try {
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(200));

					/*模拟消费商品*/
					Goods goods = queue.remove();
					System.out.println(goods);

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(200));

					/*唤醒阻塞的生产者线程*/
					full.signal();

				} catch (InterruptedException e) {
					/*什么都不做*/
				}finally{
					lock.unlock();
				}

				/*释放锁后随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextInt(200));
				} catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}

	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{

		@Override
		public void run(){

			while(true){
				/*加锁,queue的入列操作,id操作都在同步代码块中*/
				lock.lock();
				try{

					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}

					Thread.sleep(rnd.nextInt(200));

					Goods goods = null;
					/*根据序号产生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
								 break;

						case 1 : goods = new Goods("B", 2, id);
								 break;

						case 2 : goods = new Goods("C", 3, id);
								 break;
					}

					Thread.sleep(rnd.nextInt(200));

					queue.add(goods);
					id++;

					/*唤醒阻塞的消费者线程*/
					empty.signal();

				}catch(InterruptedException e){
					/*什么都不做*/
				}finally{
					lock.unlock();
				}

				/*释放锁后随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextInt(100));
				} catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}

	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2();

		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();

		/*两个生产者线程,两个消费者线程*/
		new Thread(p).start();
		new Thread(p).start();

		new Thread(c).start();
		new Thread(c).start();

	}
}

运行结果

queue is empty
queue is empty
name: B,   price:2,   id: 1
name: C,   price:3,   id: 2
name: A,   price:1,   id: 3
queue is full
name: B,   price:2,   id: 4
name: C,   price:3,   id: 5
queue is full
name: A,   price:1,   id: 6
name: B,   price:2,   id: 7
name: C,   price:3,   id: 8
name: A,   price:1,   id: 9
name: B,   price:2,   id: 10
name: C,   price:3,   id: 11
name: A,   price:1,   id: 12
name: B,   price:2,   id: 13
name: C,   price:3,   id: 14
……

1.3.2 更优雅的实现方式

下面使用线程池(ThreadPool)和阻塞队列(LinkedBlockingQueue)原子类(AtomicInteger)以更加优雅的方式实现上述功能。LinkedBlockingQueue阻塞队列仅在take和put方法上锁,所以id必须定义为原子类。

package demo;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/*使用线程对象,多个缓存位置(有界),多生产者,多消费者,无限循环模式*/

public class ProducterComsumerDemo4 {

	/*最大缓存商品数*/
	private final int MAX_SLOT = 3;

	/*定义缓存商品的容器*/
	private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT);

	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private AtomicInteger id = new AtomicInteger(1);

	/*随机产生一个sleep时间*/
	private Random rnd = new Random();

	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			while(true){

				try {

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(200));

					/*模拟消费商品*/
					Goods goods = queue.take();
					System.out.println(goods);

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(200));

				} catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}

	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){

			while(true){

				try{

					int x = id.getAndIncrement();
					Goods goods = null;

					Thread.sleep(rnd.nextInt(200));

					/*根据序号产生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
								 break;

						case 1 : goods = new Goods("B", 2, x);
								 break;

						case 2 : goods = new Goods("C", 3, x);
								 break;
					}

					Thread.sleep(rnd.nextInt(200));

					queue.put(goods);

					Thread.sleep(rnd.nextInt(100));

				}catch(InterruptedException e){
					/*什么都不做*/
				}
			}
		}
	}

	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();

		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();

		/*定义线程池*/
		ExecutorService es = Executors.newCachedThreadPool();

		/*三个生产者线程,两个消费者线程*/
		es.execute(p);
		es.execute(p);
		es.execute(p);

		es.execute(c);
		es.execute(c);

		es.shutdown();
	}
}

2. 有限商品个数

这个问题显然比上面的问题要复杂不少,原因在于要保证缓存区的商品要全部消费掉,没有重复消费商品,没有覆盖商品,同时还要保证所有线程能够正常结束,防止存在一直阻塞的线程。

2.1 使用线程对象,多个缓存位置(有界),多生产者,多消费者

思路 定义一下三个变量

/*需要生产的总商品数*/
	private final int TOTAL_NUM = 30;

	/*已产生的数量*/
	private volatile int productNum = 0;

	/*已消耗的商品数*/
	private volatile int comsumedNum = 0;

每生产一个商品 productNum 自增1,直到TOTAL_NUM为止,如果不满足条件 productNum < TOTAL_NUM 则结束进程,自增操作必须在full.await()方法调用之前,防止生产者线程无法唤醒。

同理,每消费一个商品 comsumedNum 自增1,直到TOTAL_NUM为止,如果不满足条件 comsumedNum < TOTAL_NUM 则结束进程,自增操作必须在empty.await()方法调用之前,防止消费者线程无法唤醒。

comsumedNum和productNum相当于计划经济时代的粮票一样,有了它能够保证生产者线程在唤醒后一定需要生产一个商品,消费者线程在唤醒以后一定能够消费一个商品

package demo;

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/*使用线程对象,多个缓存位置(有界),多生产者,多消费者, 有限商品个数*/

public class ProducterComsumerDemo3 {

	/*需要生产的总商品数*/
	private final int TOTAL_NUM = 30;

	/*已产生的数量*/
	private volatile int productNum = 0;

	/*已消耗的商品数*/
	private volatile int comsumedNum = 0;

	/*最大缓存商品数*/
	private final int MAX_SLOT = 2;

	/*定义线程公用的锁和条件*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();

	/*定义缓存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();

	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;

	/*随机产生一个sleep时间*/
	private Random rnd = new Random();

	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			while(true){
				/*加锁, id、comsumedNum 操作都在同步代码块中*/
				lock.lock();
				try {
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(250));

					if(comsumedNum < TOTAL_NUM){
						comsumedNum++;
					}else{
						/*这里会自动执行finally的语句,释放锁*/
						break;
					}

					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(250));

					/*模拟消费商品*/
					Goods goods = queue.remove();
					System.out.println(goods);

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(250));

					/*唤醒阻塞的生产者线程*/
					full.signal();

				} catch (InterruptedException e) {

				}finally{
					lock.unlock();
				}

				/*释放锁后,随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextInt(250));
				} catch (InterruptedException e) {

				}
			}

			System.out.println(
					"customer "
					+ Thread.currentThread().getName()
					+ " is over");
		}
	}

	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{

		@Override
		public void run(){

			while(true){

				lock.lock();
				try{
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(250));

					if(productNum < TOTAL_NUM){
						productNum++;
					}else{
						/*这里会自动执行finally的语句,释放锁*/
						break;
					}

					Thread.sleep(rnd.nextInt(250));

					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}

					Thread.sleep(rnd.nextInt(250));

					Goods goods = null;
					/*根据序号产生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
								 break;

						case 1 : goods = new Goods("B", 2, id);
								 break;

						case 2 : goods = new Goods("C", 3, id);
								 break;
					}

					queue.add(goods);
					id++;

					/*唤醒阻塞的消费者线程*/
					empty.signal();

				}catch(InterruptedException e){

				}finally{
					lock.unlock();
				}

				/*释放锁后,随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextInt(250));
				} catch (InterruptedException e) {
					/*什么都不做*/
				}
			}

			System.out.println(
					"producter "
					+ Thread.currentThread().getName()
					+ " is over");
		}
	}

	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3();

		ComsumeThread c = pcd.new ComsumeThread();
		ProductThread p = pcd.new ProductThread();

		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();

		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();

		System.out.println("main Thread is over");
	}
}

2.2利用线程池,原子类,阻塞队列,以更优雅的方式实现

LinkedBlockingQueue阻塞队列仅在take和put方法上锁,所以productNum和comsumedNum必须定义为原子类。

package demo;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/*使用线程对象,多个缓存位置(有界),多生产者,多消费者,无限循环模式*/

public class ProducterComsumerDemo4 {

	/*最大缓存商品数*/
	private final int MAX_SLOT = 3;

	/*定义缓存商品的容器*/
	private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT);

	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private AtomicInteger id = new AtomicInteger(1);

	/*随机产生一个sleep时间*/
	private Random rnd = new Random();

	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		public void run(){

			while(true){

				try {

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(200));

					/*模拟消费商品*/
					Goods goods = queue.take();
					System.out.println(goods);

					/*随机延时一段时间*/
					Thread.sleep(rnd.nextInt(200));

				} catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}

	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		public void run(){

			while(true){

				try{

					int x = id.getAndIncrement();
					Goods goods = null;

					Thread.sleep(rnd.nextInt(200));

					/*根据序号产生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
								 break;

						case 1 : goods = new Goods("B", 2, x);
								 break;

						case 2 : goods = new Goods("C", 3, x);
								 break;
					}

					Thread.sleep(rnd.nextInt(200));

					queue.put(goods);

					Thread.sleep(rnd.nextInt(100));

				}catch(InterruptedException e){
					/*什么都不做*/
				}
			}
		}
	}

	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{

		ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();

		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();

		/*定义线程池*/
		ExecutorService es = Executors.newCachedThreadPool();

		/*三个生产者线程,两个消费者线程*/
		es.execute(p);
		es.execute(p);
		es.execute(p);

		es.execute(c);
		es.execute(c);

		es.shutdown();
	}
}
时间: 2024-10-09 18:36:19

生产者消费者模型的正确姿势的相关文章

OpenMP实现生产者消费者模型

生产者消费者模型已经很古老了吧,最近写了个OpenMP版的此模型之实现,来分享下. 先说一下模型的大致做法是: 1.生产者需要取任务,生产产品. 2.消费者需要取产品,消费产品. 生产者在生产某个产品之后,要告知消费者此产品已经可以使用了.消费者通过获得可以使用这个信号来取得产品,进一步消费产品. 比如,我们有N个图像需要对每一个图像作滤波或者变换等处理,并且把处理后的结果存到硬盘上. 那么生产者可以将N个图像看成N个任务,每个任务都是独立的,每个任务的计算结果可以看成是产品,消费者就是取这个产

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

Java线程:并发协作-生产者消费者模型

对于多线程程序来说,不管任何编程语言,生产者消费者模型都是最经典的. 实际上,准确的说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确以下几点: 生产者仅仅在仓储未满时候生产,仓满则停止生产. 消费者仅仅在仓储有产品时候才能消费,仓空则等待. 当消费者发现仓储没有产品的时候会通知生产者生产. 生产者在生产出可消费产品时候,应该通知消费者去消费. 此模型将要结合java.lang.Object的wait与notify,notify

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

并发通信、生产者消费者模型

多进程之间通信的限制 看一个例子: import multiprocessing as mp data=666 def func(): global data data=222 p=mp.Process(target=func) p.start() p.join() print(data) >>>666 可以看到,声明为global的data也没有发生变化,输出结果仍然是666,这正是多进程之间通信的限制,各个进程之间是相互独立的,互不干扰的内存空间.因此如果想要空想数据就必须开辟一段共

管道实现生产者消费者模型

# 管道实现生产者消费者模型 # # 应该特别注意管道端点的正确管理问题,如果是生产者或消费者中都没有使用管道的端点就应该将它关闭 # 这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端.如果忘记执行这个步骤 # 程序可能在消费者中的recv()操作上挂起.管道是由操作系统进行引用计数的,必须在所有进程中关闭管道 # 后才能生成EoFEroor异常,因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点. # Pipe 存在数据不安全性 # 存在管道的一端被

生产者消费者模型 学习

简介 得知并发是Java程序员进阶的必经之路,所以从实际代码来先理解 生产者消费者模型 实战 Demo File package demo; /** * 定义商品 * * @author draymonder * */ public class Goods { public final String name; public final int price; public final int id; // public Goods() { // } public Goods(String nam

Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型

一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例子 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

守护进程 进程:一个正在运行的程序. 主进程创建守护进程: 1.守护进程会在主进程代码执行结束后就终止, 2.守护进程内无法再开启子进程,否则抛出异常. 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止. 例子:from multiprocessing import Processimport time def task(): print('老了....') time.sleep(2) print('睡了一会..') if __name__ == '__main__': prin