java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4

package org.rui.thread.block2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

import org.rui.thread.LiftOff;

/**
 * 生产者-消费者与队列
 *
 * @author lenovo
 *
 */

class LiftOffRunner implements Runnable {

	private BlockingQueue<LiftOff> rockets;

	public LiftOffRunner(BlockingQueue<LiftOff> b) {
		rockets = b;
	}

	//添加一个任务到队列
	public void add(LiftOff lo) {
		//将指定元素插入此队列中(如果立即可行且不会违反容量限制),
		try {
			rockets.put(lo);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

	@Override
	public void run() {

		try {
			while (!Thread.interrupted()) {
				// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
				LiftOff rocket = rockets.take();
				rocket.run();
			}

		} catch (InterruptedException e) {
			System.out.println("中断退出");
		}
		System.out.println("x exiting liftOffRunner");

	}
}

public class TestBlockingQueues {

	static void getkey() {
		try {
			// compensate for windows/linux difference in the
			// 回车键产生的结果
			new BufferedReader(new InputStreamReader(System.in)).readLine();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	static void getkey(String message) {
		System.out.println(message);
		getkey();
	}

	static void tets(String msg, BlockingQueue<LiftOff> queue) {
		System.out.println(msg);
		LiftOffRunner runner = new LiftOffRunner(queue);

		//启动一个线程
		Thread t = new Thread(runner);
		t.start();

		for (int i = 0; i < 5; i++) {
			//加入任务到LiftOffRunner队列中
			runner.add(new LiftOff(5));
		}

		//输入控制台
		getkey("press 'enter' (" + msg + ")");
		t.interrupt();
		System.out.println(" 完了 " + msg + "test");

	}

	public static void main(String[] args) {
		tets("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());// unlimited																		// size
		tets("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));// fied																		// size
		tets("SynchronousQueue", new SynchronousQueue<LiftOff>());// size of 1

	}

}

package org.rui.thread.block2;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 吐司BlockingQueue
 * @author lenovo
 *
 */

class Toast {
	public enum Status {
		DRY/* 干的 */, BUTTERED/* 涂黄油 */, JAMMED// 果酱
	}

	private Status status = Status.DRY;
	private final int id;

	public Toast(int idn) {
		id = idn;
	}

	public void butter() {
		status = Status.BUTTERED;
	}

	public void jam() {
		status = Status.JAMMED;
	}

	public Status getStatus() {
		return status;
	}

	public int getId() {
		return id;
	}

	public String toString() {
		return "Toast " + id + ":" + status;
	}
}

/**
 * 吐司队列
 *
 * @author lenovo
 *
 */
class ToastQueue extends LinkedBlockingQueue<Toast> {
}

class Toaster implements Runnable {
	private ToastQueue toastQueue;
	private int count = 0;
	private Random rand = new Random(47);

	public Toaster(ToastQueue tq) {
		toastQueue = tq;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
				// 制作 toast
				Toast t = new Toast(count++);
				System.out.println(t);
				// insert into queue
				toastQueue.put(t);

			}
		} catch (InterruptedException e) {
			System.out.println("Toaster interrupted");
		}
		System.out.println("toaster off");
	}
}

// apply butter to toast
class Butterer implements Runnable {
	private ToastQueue dryQueue, butteredQueue;

	public Butterer(ToastQueue dry, ToastQueue buttered) {
		dryQueue = dry;
		butteredQueue = buttered;
	}

	@Override
	public void run() {
		try {

			while (!Thread.interrupted()) {
				// blocks until next piece of toast is available 块,直到下一块面包
				Toast t = dryQueue.take();
				t.butter();
				System.out.println(t);
				butteredQueue.put(t);
			}
		} catch (InterruptedException e) {
			System.out.println("涂黄油 interrupted");
		}
		System.out.println("涂黄油 off");
	}

}

// apply jam to buttered toast
class Jammer implements Runnable {
	private ToastQueue butteredQueue, finishedQueue;

	public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
		this.butteredQueue = butteredQueue;
		this.finishedQueue = finishedQueue;
	}

	@Override
	public void run() {
		try {

			while (!Thread.interrupted()) {
				// blocks until next piece of toast is available 块,直到下一块面包
				Toast t = butteredQueue.take();
				t.jam();
				System.out.println(t);
				finishedQueue.put(t);

			}
		} catch (InterruptedException e) {
			System.out.println("涂果酱 interrupted");
		}
		System.out.println("涂果酱 off");
	}

}

// ////使用烤面包 consume the toast
class Eater implements Runnable {
	private ToastQueue finishedQueue;
	private int counter = 0;

	public Eater(ToastQueue finished) {
		finishedQueue = finished;
	}

	@Override
	public void run() {
		try {

			while (!Thread.interrupted()) {
				Toast t = finishedQueue.take();
				// verify that the toast is coming in order 确认面包来了
				// and that all pieces are getting jammed ,所有碎片越来越挤
				if (t.getId() != counter++
						|| t.getStatus() != Toast.Status.JAMMED) {
					System.out.println("===>>>>error" + t);
					System.exit(1);

				} else {
					System.out.println("吃!" + t);
				}

			}
		} catch (InterruptedException e) {
			System.out.println("食者 interrupted");
		}
		System.out.println(" 食者 off");
	}
}

/**
 * main
 *
 * @author lenovo
 *
 */
public class ToastOMatic {

	public static void main(String[] args) throws InterruptedException {
		ToastQueue dryQueue = new ToastQueue();
		ToastQueue butteredQueue = new ToastQueue();
		ToastQueue finishedQueue = new ToastQueue();

		ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new Toaster(dryQueue));//烤面包
		exec.execute(new Butterer(dryQueue, butteredQueue));//涂黄油
		exec.execute(new Jammer(butteredQueue, finishedQueue));//上果酱
		exec.execute(new Eater(finishedQueue));//吃
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();

	}
}
/**output:
 Toast 0:DRY
Toast 0:BUTTERED
Toast 0:JAMMED
吃!Toast 0:JAMMED
Toast 1:DRY
Toast 1:BUTTERED
Toast 1:JAMMED
吃!Toast 1:JAMMED
Toast 2:DRY
Toast 2:BUTTERED
Toast 2:JAMMED
吃!Toast 2:JAMMED
...
...
Toast 10:DRY
Toast 10:BUTTERED
Toast 10:JAMMED
吃!Toast 10:JAMMED
Toast 11:DRY
Toast 11:BUTTERED
Toast 11:JAMMED
吃!Toast 11:JAMMED
Toast 12:DRY
Toast 12:BUTTERED
Toast 12:JAMMED
吃!Toast 12:JAMMED
Toast 13:DRY
Toast 13:BUTTERED
Toast 13:JAMMED
吃!Toast 13:JAMMED
Toast 14:DRY
Toast 14:BUTTERED
Toast 14:JAMMED
吃!Toast 14:JAMMED
食者 interrupted
Toaster interrupted
 食者 off
涂果酱 interrupted
涂果酱 off
涂黄油 interrupted
涂黄油 off
toaster off

 */

package org.rui.thread.block2;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 任务间使用管道进行输入、输出
 *
 * @author lenovo
 *
 */
class Sender implements Runnable {
	private Random rand = new Random(47);
	private PipedWriter out = new PipedWriter();

	public PipedWriter getPipedWriter() {
		return out;
	}

	@Override
	public void run() {
		try {
			while (true) {
				for (char c = 'A'; c <= 'z'; c++) {
					out.write(c);
					TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));

				}
			}
		} catch (IOException e) {
			System.out.println(e + " sender write Exception");
		} catch (InterruptedException e) {
			System.out.println(e + " sender sleep interrupted");
		}

	}

}

class Receiver implements Runnable {

	private PipedReader in;

	public Receiver(Sender sender) throws IOException {
		in = new PipedReader(sender.getPipedWriter());
	}

	@Override
	public void run() {
		try {
			while (true) {
				// blocks until characters are there
				System.out.println("Read:" + (char) in.read() + ",");

			}
		} catch (IOException e) {
			System.out.println(e+"receiver read execption");
		}

	}

}

public class PipedIO {
	// 接收器 Receiver
	public static void main(String[] args) throws IOException, InterruptedException {
		Sender sender = new Sender();
		Receiver receiver = new Receiver(sender);

		ExecutorService exec=Executors.newCachedThreadPool();
		exec.execute(sender);
		exec.execute(receiver);

		TimeUnit.SECONDS.sleep(4);
		exec.shutdownNow();

	}
}

/**outpt:
Read:A,
Read:B,
Read:C,
Read:D,
Read:E,
Read:F,
Read:G,
Read:H,
Read:I,
Read:J,
Read:K,
Read:L,
Read:M,
Read:N,
Read:O,
Read:P,
java.lang.InterruptedException: sleep interrupted sender sleep interrupted
Read:Q,
java.io.IOException: Write end deadreceiver read execption

 */

时间: 2024-12-15 10:53:10

java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4的相关文章

TIJ -- 任务间使用管道进行输入/输出

1. 通过输入/输出在线程间进行通信通常很有用.提供线程功能的类库以"管道"的形式对线程间的输入/输出提供了支持.它们在Java输入/输出类库中的对应物就是PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道中读取).这个模型可以看成是"生产者 - 消费者"问题的变体,这里的管道就是一个封装好的解决方案.管道基本上是一个阻塞队列,存在于多个引入BlockingQueue之前的Java版本中. 2. 下面是一个简单例子,两个

JAVA线程-生产者消费者模式

package cn.tt; public class ProducerConsumer { public static void main(String[] args) throws InterruptedException { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); consumer c = new consumer(ss); new Thread(p).start(); Thread.sleep(2000

Java实现生产者消费者问题与读者写者问题

摘要: Java实现生产者消费者问题与读者写者问题 1.生产者消费者问题 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品.解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步:(2)在生产者和消费者之间建立一个管道.第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式.第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强. 同步问题核心在

Java模拟生产者消费者问题

一.Syncronized方法详解 解决生产者消费这问题前,先来了解一下Java中的syncronized关键字. synchronized关键字用于保护共享数据.请大家注意"共享数据",你一定要分清哪些数据是共享数据,如下面程序中synchronized关键字保护的不是共享数据(其实在这个程序中synchronized关键字没有起到任何作用,此程序的运行结果是不可预先确定的).这个程序中的t1,t2是 两个对象(pp1,pp2)的线程.JAVA是面向对象的程序设计语言,不同的对象的数

java:生产者消费者问题

记得第一次做java的题的时候,看到"写出生产者消费者问题",还以为是和工厂模式有关系.现在想想也是被雷倒了. java的生产者消费者问题其实是讲多线程并发操作同一资源缓冲区,当资源缓冲区满的时候,线程继续添加数据,则应该使其等待,有空间时再发消息通知:当资源缓冲区没有资源,线程继续取数据时,应该使其等待,有资源是再发消息通知: 先看一下运行截图: 以下是代码: Main.java:(主类) package com.vrinux.setandget; public class Main

LabVIEW之生产者/消费者模式--队列操作 彭会锋

LabVIEW之生产者/消费者模式--队列操作 彭会锋 本文章主要是对学习LabVIEW之生产者/消费者模式的学习笔记,其中涉及到同步控制技术-队列.事件.状态机.生产者-消费者模式,这几种技术在在本章中都会有侧重点的进行介绍和总结! 队列同步技术-操作函数 同步控制技术可以实现在多个VI之间或者同一VI 不同县城之间同步任务和交换数据:在LabVIEW中提供了‘同步’函数选板,包括通知器.队列.信号量.集合点.事件.首次调用函数,本文主要关注同步控制技术之队列技术: 队列操作函数: 1 “获取

java实现生产者消费者问题(转)

引言 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况: 生产者消费者图 存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品.互相等待,从而发生死锁. JAVA解决线程模型的三种方式 1.wait()和notify() import java.util.LinkedList; public class P

java实现生产者消费者问题

引言 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况: 生产者消费者图 存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品.互相等待,从而发生死锁. JAVA解决线程模型的三种方式   1.wait()和notify() import java.util.LinkedList; public class

[转载] Java实现生产者消费者问题

转载自http://www.cnblogs.com/happyPawpaw/archive/2013/01/18/2865957.html 引言 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况: 生产者消费者图 存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品.互相等待,从而发生死锁. JAVA解决线