java wait notifyAll 生产者 消费者 BlockingDeque

--用wait notifyAll来实现生产者与消费者模式,如下

package com.collonn.procon2;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

public class PCTest {
	// the max number of product in product pool
	public static final int QUEUE_MAX_SIZE = 3;
	// product pool
	public static final LinkedList<Integer> QUEUE = new LinkedList<Integer>();
	// product name
	public static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();

	// the speed of producer
	public static final int PRODUCE_SPEED = 1000 * 1;
	// the speed of consumer
	public static final int CONSUME_SPEED = 100 * 1;

	// the number of producer
	public static final int PRODUCE_COUNT = 1;
	// the number of consumer
	public static final int CONSUME_COUNT = 5;

	// create producer
	private void produce() {
		for (int i = 0; i < PRODUCE_COUNT; i++) {
			new Thread(new Producer()).start();
		}
	}

	// create consumer
	private void consume() {
		for (int i = 0; i < CONSUME_COUNT; i++) {
			Thread thread = new Thread(new Consumer());
			thread.setName("td" + i);
			thread.start();
		}
	}

	public static void main(String[] args) throws Exception {
		PCTest pct1 = new PCTest();
		System.out.println("start producer...");
		pct1.produce();
		Thread.sleep(1000 * 5);
		System.out.println("start consumer...");
		pct1.consume();
	}

}

// producer
class Producer implements Runnable {

	@Override
	public void run() {
		while (true) {
			try {
				synchronized (PCTest.QUEUE) {
					if(PCTest.QUEUE.size() == PCTest.QUEUE_MAX_SIZE){
						System.out.println("product pool full...");
						PCTest.QUEUE.wait();
					}else{
						int next = PCTest.ATOMIC_INTEGER.getAndAdd(1);
						PCTest.QUEUE.addLast(next);
						System.out.println("produce," + next);

						PCTest.QUEUE.notifyAll();
					}
				}

				Thread.sleep(PCTest.PRODUCE_SPEED);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

// consumer
class Consumer implements Runnable {

	@Override
	public void run() {
		try {
			while (true) {
				synchronized (PCTest.QUEUE) {
					if(PCTest.QUEUE.size() == 0){
						System.out.println("product pool empty...");
						PCTest.QUEUE.wait();
					}else{
						Integer data = PCTest.QUEUE.removeFirst();
						System.out.println("consume," + Thread.currentThread().getName() + "," + data);

						PCTest.QUEUE.notifyAll();
					}
				}

				Thread.sleep(PCTest.CONSUME_SPEED);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

--用BlockingDeque来实现生产者与消费者模式,如下

package com.collonn.procon;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class ProConMain {
	// product pool
	public static final BlockingDeque<Integer> BLOCK_QUEUE = new LinkedBlockingDeque<Integer>(5);
	// product name
	public static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();

	// the speed of producer
	public static final int PRODUCE_SPEED = 1000 * 3;
	// the speed of consumer
	public static final int CONSUME_SPEED = 1000 * 1;

	// the number of producer
	public static final int PRODUCE_COUNT = 1;
	// the number of consumer
	public static final int CONSUME_COUNT = 20;

	// create producer
	private void produce() {
		for (int i = 0; i < PRODUCE_COUNT; i++) {
			new Thread(new Producer()).start();
		}
	}

	// create consumer
	private void consume() {
		for (int i = 0; i < CONSUME_COUNT; i++) {
			Thread thread = new Thread(new Consumer());
			thread.setName("td" + i);
			thread.start();
		}
	}

	public static void main(String[] args) {
		ProConMain t1 = new ProConMain();
		t1.produce();
		t1.consume();
	}

}

// producer
class Producer implements Runnable {

	@Override
	public void run() {
		while (true) {
			try {
				int next = ProConMain.ATOMIC_INTEGER.getAndAdd(1);
				ProConMain.BLOCK_QUEUE.putLast(next);
				System.out.println("produce," + next);

				Thread.sleep(ProConMain.PRODUCE_SPEED);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

// consumer
class Consumer implements Runnable {

	@Override
	public void run() {
		try {
			while (true) {
				Integer data = ProConMain.BLOCK_QUEUE.takeFirst();
				System.out.println("consume," + Thread.currentThread().getName() + "," + data);

				Thread.sleep(ProConMain.CONSUME_SPEED);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

一:用Executor来实现生产者与消费者模式,如下

package com.collonn.executor;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProConExecutor {
	// product name
	public static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();

	// the speed of producer
	public static final int PRODUCE_SPEED = 1000 * 1;
	// the speed of consumer
	public static final int CONSUME_SPEED = 1000 * 5;

	// the speed of consumer
	public static final int BLOCK_QUEUE_SIZE = 5;

	// create thread pool with deal strategy
	// corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, rejectedExecutionHandler
	public static final Executor EXECUTOR = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_SIZE),
			new RejectedExecutionHandler() {
				@Override
				public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
					System.out.println("[overload], ignore data:" + ((Taskk)r).getData() + ", poolSize:" + e.getPoolSize() + ", queueSize:" + e.getQueue().size());
				}
			});

	public static void main(String[] args) {
		// produce product, then, throw to thead pool, then deal the data
		new Thread() {
			@Override
			public void run() {
				try {
					while (true) {
						int next = ProConExecutor.ATOMIC_INTEGER.getAndAdd(1);
						ProConExecutor.EXECUTOR.execute(new Taskk(next));
						System.out.println("produce," + next);

						Thread.sleep(PRODUCE_SPEED);
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}.start();
	}

}

// process data
class Taskk implements Runnable {
	private Integer data;

	public Taskk(Integer data) {
		this.data = data;
	}

	public Integer getData() {
		return data;
	}

	public void setData(Integer data) {
		this.data = data;
	}

	@Override
	public void run() {
		try {
			System.out.println("consume," + this.data);

			Thread.sleep(ProConExecutor.CONSUME_SPEED);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

总结:我们可以看到,代码在一步步的精简,且更优雅。

java wait notifyAll 生产者 消费者 BlockingDeque

时间: 2024-12-29 10:24:30

java wait notifyAll 生产者 消费者 BlockingDeque的相关文章

JAVA多线程之生产者消费者

生产者消费者并发编程: 假设仓库有10个仓位,分别有10个生产者和10个消费者,生产者不断生产产品,放入仓库的仓位中,而消费者则不断从仓库中获取产品, 如果仓库已满,则生产者要等待,等消费者消费后,空出仓位后,再继续放入产品. 反之如果仓库已空,则消费者要等待,等待生产者生产出产品后,再继续消费产品. 关于生产者.消费者有四种实现方式 1,wait,nofity方式 2,ReentrantLock锁的await()和signal() 3,阻塞队列的方式 4,Semaphore 信号量方式 下面分

java多线程模拟生产者消费者问题,公司面试常常问的题。。。

package com.cn.test3; //java多线程模拟生产者消费者问题 //ProducerConsumer是主类,Producer生产者,Consumer消费者,Product产品 //Storage仓库 //批注:我把输出结果写在程序以下了,你能够看一下,事实上非常easy的,你想象一下产品从生产,到取出的一个生产线,我们定义两个线程,生产者线程,和消费者线程,一个是生产者不停的生产产品并放入数量有限的指定槽内,而消费者从指定槽依次取出产品,现实中的流水车间也相似于此. publ

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题.有如下几个常见的实现方法: 1. wait()/notify() 2. lock & condition 3. BlockingQueue 下面来逐一分析. 1. wait()/notify() 第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行:这也是最原始的实现. 1 public class WaitNotifyBroker<T> implements Broker&

java模拟实现生产者---消费者问题

本文章为小编原创,请尊重文章的原创性,转载请注意写明转载来源:http://blog.csdn.net/u012116457 已知技术参数: 生产者消费者问题,描述一组生产者向一组消费者提供产品/消息.它们共享一个有界缓冲区,生产者向其中放产品/消息,消费者从中取产品/消息.只要缓冲区未满,生产者可放产品/消息,只要缓冲区有数据,消费者可取消息.即应满足下列二个同步条件: 1.只有在缓冲池中至少有一个缓冲区已存入消息后,消费者才能从中提取消息,否则消费者必须等待. 2.只有缓冲池中至少有一个缓冲

java多线程解决生产者消费者问题

import java.util.ArrayList; import java.util.List; /** * Created by ccc on 16-4-27. */ public class Test { public static void main(String[] args) { GunClip clip = new GunClip(); Producer p = new Producer(clip); customer c = new customer(clip); p.star

Java多线程:生产者消费者更佳的解决方法(确定不会出现死锁)

今天看了一片博文,讲Java多线程之线程的协作,其中作者用程序实例说明了生产者和消费者问题,但我及其他读者发现程序多跑几次还是会出现死锁,百度搜了下大都数的例子也都存在bug,经过仔细研究发现其中的问题,并解决了,感觉有意义贴出来分享下. 下面首先贴出的是有bug的代码,一个4个类,Plate.java: package CreatorAndConsumer; import java.util.ArrayList; import java.util.List; /** * 盘子,表示共享的资源

java设计模式之生产者/消费者模式

什么是生产者/消费者模式? 某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式有如下几个优点: 1.解耦   由于有缓冲区的存在,生产者和消费者之间不直接依赖,耦合度降低. 2.支持并发   由于生产者与消费

Java多线程之生产者消费者问题&amp;lt;一&amp;gt;:使用synchronized keyword解决生产者消费者问题

今天看了一片博文,讲Java多线程之线程的协作,当中作者用程序实例说明了生产者和消费者问题,但我及其它读者发现程序多跑几次还是会出现死锁,百度搜了下大都数的样例也都存在bug,经过细致研究发现当中的问题.并攻克了,感觉有意义贴出来分享下. 以下首先贴出的是有bug的代码,一个4个类.Plate.java: package CreatorAndConsumer; import java.util.ArrayList; import java.util.List; /** * 盘子,表示共享的资源

[Java] 多线程下生产者消费者问题的五种同步方法实现

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题. 生产者消费者模式的优点 - 解耦 - 支持并发 - 支持忙闲不均 解决方法可分为两类: (1)用信号量和锁机制实现生产者和消费者之间的同步: - wait() / notify()方法 - await() / signal()方法 - BlockingQueue阻塞队列方法 - Semaphore方法 (2)在生产者和消费者之间建立一个管道.(一般不使用,缓冲区不易控制.数据不易封装和传输) - PipedInputStream