生产者-消费者问题【Java实现】

综合示例,演示有限长度字符序列缓冲区的并发读写, 或者称 生产者 - 消费者问题。错漏之处, 恳请指出 ^_^

/**
 * PCProblem :
 * 模拟生产者-消费者问题, 生产者产生字符并写入字符序列缓冲区, 消费者从缓冲区取走字符
 *
 * @author shuqin1984 2011-08-05
 *
 */

package threadprogramming.basic.simulation;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PCProblem {

	public static void main(String[] args) {

		System.out.println(" ---- Thread main starts up ---- ");

		// 模拟 生产者 - 消费者 任务

		SharedCharBuffer sharedBuffer = new SharedCharBuffer(10);
		ExecutorService es = Executors.newCachedThreadPool();

		for (int i=1; i <= 10; i++) {
			es.execute(new ProducerThread(i, sharedBuffer));
			es.execute(new ConsumerThread(i, sharedBuffer));
		}
		es.shutdown();	

		// 运行 5 秒后终止模拟 

		try {
			TimeUnit.SECONDS.sleep(5);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		ProducerThread.cancel();
		ConsumerThread.cancel();
		es.shutdownNow();

		System.out.println("Time to be over.");

	}

}

  生产者: Producer.java

/**
 * ProducerThread: 生产者线程
 */

package threadprogramming.basic.simulation;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ProducerThread extends Thread {

 	private static String str = "abc1defg2hijk3lmno4pqrs5tuvwx6yz" +
    "AB7CDEF8GHIJK9LMNO0PQR_STU*VWXYZ";

	private static volatile boolean endflag = false;

	private final int id;

	private SharedCharBuffer buffer;

	public ProducerThread(int id, SharedCharBuffer buffer) {
		this.id = id;
		this.buffer = buffer;
	}

	public static void cancel() {
		endflag = true;
	}

	public boolean isCanceled() {
		return endflag == true;
	}

	/**
	 * 生产者任务: 只要任务不取消,且缓冲区不满,就往缓冲区中字符
	 */
	public void run()
	{
		while (!isCanceled()  && !Thread.interrupted()) {
			synchronized (buffer) {
				while (buffer.isFull()) {
					  // 缓冲区已满,生产者必须等待
				    try {
						buffer.wait();
					} catch (InterruptedException e) {
						System.out.println(this + " Interrupted.");
					}
				}
				char ch = produce();
				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " 准备写缓冲区:" + ch);
				buffer.write(ch);
				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " :\t\t\t" + buffer);
				buffer.notifyAll();
			}
			try {
				TimeUnit.MILLISECONDS.sleep(100);
			} catch (InterruptedException e) {
				System.out.println(this + " Interrupted.");
			}
		}
		System.out.println("Exit from: " + this);
	}

	public char produce()
	{
		Random rand = new Random();
		return str.charAt(rand.nextInt(64));
	}

    public String toString()
    {
    	return "P[" + id + "]";
    }

}

   消费者:

/**
 * ConsumerThread:  消费者线程
 *
 */

package threadprogramming.basic.simulation;

import java.util.concurrent.TimeUnit;

public class ConsumerThread implements Runnable {

	private static volatile boolean endflag = false;

	private final int id;

	private SharedCharBuffer buffer;

	public ConsumerThread(int id, SharedCharBuffer buffer) {
		this.id = id;
		this.buffer = buffer;
	}

	public static void cancel() {
		endflag = true;
	}

	public boolean isCanceled() {
		return endflag == true;
	}

	/**
	 * consume:
	 * 当缓冲区buffer中有字符时,就取出字符显示【相当于消费者】。
	 *
	 */
	public char consume() {
		return buffer.fetch();
	}

	/**
	 * 消费者任务: 只要任务不取消,且缓冲区不被置空,就从缓冲区中取出字符消费。
	 */
	public void run() {

		while (!isCanceled() && !Thread.interrupted()) {
			synchronized (buffer) {
				while (buffer.isEmpty()) {
					try {
						buffer.wait();
					} catch (InterruptedException e) {
						System.out.println(this + " Interrupted.");
					}
				}
				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " 取出字符: " + consume());
				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " :\t\t\t" + buffer);
				buffer.notifyAll();
			}
			try {
				TimeUnit.MILLISECONDS.sleep(100);
			} catch (InterruptedException e) {
				System.out.println(this + " Interrupted.");
			}
		}
		System.out.println("Exit from: " + this);

	}

	public String toString() {
		return "C[" + id + "]";
	}

}

  有限字符缓冲区: SharedCharBuffer.java

/**
 * SharedCharBuffer: 有限长度字符缓冲区
 *
 */

package threadprogramming.basic.simulation;

public class SharedCharBuffer {

	private char[] charBuffer;        // 用来生产和消费的有限长度字符缓冲区
	private int  front;               // 将要取字符的位置下标
	private int  rear;                // 将要写字符的位置下标

	public SharedCharBuffer(int capacity) {

		if (charBuffer == null) {
			charBuffer = new char[capacity];
		}
		front = rear = 0;
	}

	/**
	 * 判断缓冲区是否已满,满则生产者等待
	 */
	public synchronized boolean isFull()
	{
		return (rear+1) % charBuffer.length == front;
	}

	/**
	 * 判断缓冲区是否为空,空则消费者等待
	 */
	public synchronized boolean isEmpty()
	{
		return front == rear;
	}

	/**
	 * write: 将给定字符写入缓冲区中【改变了缓冲区内容】
	 * synchronized 关键字用于实现互斥访问缓冲区
	 * @param ch character that will be written into the buffer.
	 *
	 */
	public synchronized void write(char ch) {	

	      charBuffer[rear] = ch;
	      rear = (rear+1) % charBuffer.length;
	}

	/**
	 * read: 读取缓冲区中给定位置的字符【不改变缓冲区内容】
	 * synchronized 关键字用于实现互斥访问缓冲区
	 *
	 */
	public synchronized char read(int index) {

		return charBuffer[index];
	}

	/**
	 * fetch: 取出缓冲区给定位置的字符【改变了缓冲区内容】
	 * synchronized 关键字用于实现互斥访问缓冲区
	 *
	 */
	public synchronized char fetch() {  

		char ch = charBuffer[front];
		front = (front + 1) % charBuffer.length;
		return ch;
	}

	/**
	 * getStringOfBuffer: 缓冲区内容的字符串表示
	 * @return  string representation of the buffer‘s contents
	 *
	 */
    public synchronized String toString() {

		if (isEmpty()) {
			return "缓冲区为空!";
		}

		StringBuilder bufferstr = new StringBuilder("缓冲区内容: ");
		int i = front;
		while ((i+1)% charBuffer.length != rear) {
			bufferstr.append(charBuffer[i]);
			i = (i+1) % charBuffer.length;
		}
		bufferstr.append(charBuffer[i]);
		return bufferstr.toString();

	}

}

  

时间: 2024-12-16 00:25:31

生产者-消费者问题【Java实现】的相关文章

生产者消费者模型java

马士兵老师的生产者消费者模型,我感觉理解了生产者消费者模型,基本懂了一半多线程. public class ProducerConsumer { public static void main(String[] args) { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); new Thread(p).start(); new Thread(c).start

生产者消费者问题Java三种实现

生产者-消费者Java实现 2017-07-27 1 概述 生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品. 解决生产者/消费者问题的方法可分为两类: 采用某种机制保护生产者和消费者之间的同步: 在生产者和消费者之间建立一个管道. 第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式.第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强. 在Java中有四种方法支持同步,其中前三个是同步

生产者消费者模型Java实现

生产者消费者模型 生产者消费者模型可以描述为: ①生产者持续生产,直到仓库放满产品,则停止生产进入等待状态:仓库不满后继续生产: ②消费者持续消费,直到仓库空,则停止消费进入等待状态:仓库不空后,继续消费: ③生产者可以有多个,消费者也可以有多个: 生产者消费者模型 对应到程序中,仓库对应缓冲区,可以使用队列来作为缓冲区,并且这个队列应该是有界的,即最大容量是固定的:进入等待状态,则表示要阻塞当前线程,直到某一条件满足,再进行唤醒. 常见的实现方式主要有以下几种. ①使用wait()和notif

生产者消费者模型-Java代码实现

什么是生产者-消费者模式 比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者,生产者消费者要解决的问题就是如何处理公共资源. 生产者-消费者模式的特点 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据 当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据:当

生产者消费者的java实现

先看最简单的,也就是缓冲区的容量为1 缓冲区容量为1 import java.util.List; public class ProducerAndConsumer2 { static class AddThread implements Runnable { Plate plate; public AddThread(Plate p) { this.plate = p; } @Override public void run() { // TODO Auto-generated method

如何使用阻塞队列来实现生产者-消费者模型?

什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型? java.util.concurrent.BlockingQueue的特性是:当队列是空的时,从队列中获取或删除元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞. 阻塞队列不接受空值,当你尝试向队列中添加空值的时候,它会抛出NullPointerException. 阻塞队列的实现都是线程安全的,所有的查询方法都是原子的并且使用了内部锁或者其他形式的并发控制. BlockingQueue 接口是java colle

Java模拟生产者消费者问题

一.Syncronized方法详解 解决生产者消费这问题前,先来了解一下Java中的syncronized关键字. synchronized关键字用于保护共享数据.请大家注意"共享数据",你一定要分清哪些数据是共享数据,如下面程序中synchronized关键字保护的不是共享数据(其实在这个程序中synchronized关键字没有起到任何作用,此程序的运行结果是不可预先确定的).这个程序中的t1,t2是 两个对象(pp1,pp2)的线程.JAVA是面向对象的程序设计语言,不同的对象的数

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

Java多线程之~~~~使用wait和notify实现生产者消费者模型

在多线程开发中,最经典的一个模型就是生产者消费者模型,他们有一个缓冲区,缓冲区有最大限制,当缓冲区满 的时候,生产者是不能将产品放入到缓冲区里面的,当然,当缓冲区是空的时候,消费者也不能从中拿出来产品,这就 涉及到了在多线程中的条件判断,java为了实现这些功能,提供了wait和notify方法,他们可以在线程不满足要求的时候 让线程让出来资源等待,当有资源的时候再notify他们让他们继续工作,下面我们用实际的代码来展示如何使用wait和 notify来实现生产者消费者这个经典的模型. 首先是

java基础知识回顾之java Thread类学习(八)--java多线程通信等待唤醒机制经典应用(生产者消费者)

 *java多线程--等待唤醒机制:经典的体现"生产者和消费者模型 *对于此模型,应该明确以下几点: *1.生产者仅仅在仓库未满的时候生产,仓库满了则停止生产. *2.消费者仅仅在有产品的时候才能消费,仓空则等待. *3.当消费者发现仓储没有产品可消费的时候,会唤醒等待生产者生产. *4.生产者在生产出可以消费的产品的时候,应该通知等待的消费者去消费. 下面先介绍个简单的生产者消费者例子:本例只适用于两个线程,一个线程生产,一个线程负责消费. 生产一个资源,就得消费一个资源. 代码如下: pub