并发模式(三)——生产者-消费模式

生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。

一、架构模式图:

类图:

生产者:提交用户请求,提取用户任务,并装入内存缓冲区;

消费者:在内存缓冲区中提取并处理任务;

内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;

任务:生产者向内存缓冲区提交的数据结构;

Main:使用生产者和消费者的客户端。

二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

(1)Producer生产者线程:

<span style="font-size:18px;">package ProducerConsumer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer  implements Runnable{

	//Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。
	//而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。
	//这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
	private volatile  boolean isRunning= true;

	//内存缓冲区
	private BlockingQueue<PCData> queue;

	//总数,原子操作
	private static AtomicInteger count = new AtomicInteger();

	private static final int SLEEPTIME=1000;

	public Producer(BlockingQueue<PCData> queue) {

		this.queue = queue;
	}

	@Override
	public void run() {
		PCData data=null;
		Random r  = new Random();
		System.out.println("start producer id = "+ Thread .currentThread().getId());
		try{
			while(isRunning){
				Thread.sleep(r.nextInt(SLEEPTIME));
				//构造任务数据
				data= new PCData(count.incrementAndGet());
				System.out.println("data is put into queue ");
				//提交数据到缓冲区
				if(!queue.offer(data,2,TimeUnit.SECONDS)){
					System.out.println("faile to  put data:  "+ data);
				}
			}
		}catch (InterruptedException e){
			e.printStackTrace();
			Thread.currentThread().interrupt();

		}

	}

	public void stop(){

		isRunning=false;
	}

}
</span>

(2)Consumer消费者线程:

<span style="font-size:18px;">package ProducerConsumer;

import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
	//缓冲区
	private BlockingQueue<PCData> queue;
	private static final int SLEEPTIME=1000;

	public Consumer(BlockingQueue<PCData> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		System.out.println("start Consumer id= "+ Thread .currentThread().getId());
		Random r = new Random();

			try {
				//提取任务
				while(true){
					PCData data= queue.take();
					if(null!= data){
						//计算平方
						int re= data.getData()*data.getData();
						System.out.println(MessageFormat.format("{0}*{1}={2}",
									data.getData(),data.getData(),re
								));
						Thread.sleep(r.nextInt(SLEEPTIME));

					}
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
				Thread.currentThread().interrupt();
			}

	}

}
</span>

(3)PCData共享数据模型:

<span style="font-size:18px;">package ProducerConsumer;

public  final class PCData {

	private final int intData;

	public PCData(int d) {
		intData=d;
	}

	public PCData(String  d) {
		intData=Integer.valueOf(d);
	}

	public int getData(){

		return intData;

	}
	@Override
	public String toString(){
		return "data:"+ intData ;
	}

}
</span>

(4)Main函数:

<span style="font-size:18px;">package ProducerConsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class Main {

	/**
	 * @param args
	 */
	public static void main(String[] args)  throws InterruptedException{
		//建立缓冲区
		BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);
		//建立生产者
		Producer producer1 = new Producer(queue);
		Producer producer2 = new Producer(queue);
		Producer producer3 = new Producer(queue);

		//建立消费者
		Consumer consumer1 = new Consumer(queue);
		Consumer consumer2 = new Consumer(queue);
		Consumer consumer3 = new Consumer(queue);		

		//建立线程池
		ExecutorService service = Executors.newCachedThreadPool();

		//运行生产者
		service.execute(producer1);
		service.execute(producer2);
		service.execute(producer3);
		//运行消费者
		service.execute(consumer1);
		service.execute(consumer2);
		service.execute(consumer3);

		Thread.sleep(10*1000);

		//停止生产者
		producer1.stop();
		producer2.stop();
		producer3.stop();

		Thread.sleep(3000);
		service.shutdown();
	}

}
</span>

三、注意:

volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。

生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。

由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。

时间: 2024-11-10 16:51:59

并发模式(三)——生产者-消费模式的相关文章

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生

多线程:多线程设计模式(四):生产者-消费模式

生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程.生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务.两者之间通过共享内存缓冲去进行通信. 一.架构模式图: 类图: 生产者:提交用户请求,提取用户任务,并装入内存缓冲区: 消费者:在内存缓冲区中提取并处理任务: 内存缓冲区:缓存生产者提交的任务或数据,供消费者使用: 任务:生产者向内存缓冲区提交的数据结构: Main:使用生产者和消费者的客户端. 二.代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

两种高效的并发模式(半同步/半异步和领导者/追随者)

一.并发编程与并发模式 并发编程主要是为了让程序同时执行多个任务,并发编程对计算精密型没有优势,反而由于任务的切换使得效率变低.如果程序是IO精密型的,则由于IO操作远没有CPU的计算速度快,所以让程序阻塞于IO操作将浪费大量的CPU时间.如果程序有多个线程,则当前被IO操作阻塞的线程可主动放弃CPU,将执行权转给其它线程.(*IO精密型和cpu精密型可以参考此文:CPU-bound(计算密集型) 和I/O bound(I/O密集型)) 并发编程主要有多线程和多进程,这里我们先讨论并发模式,并发

go---&gt;共享内存和通信两种并发模式原理探究

共享内存和通信两种并发模式原理探究 并发理解 人类发明计算机编程的本质目的是为了什么呢?毫无疑问是为了解决人类社会中的各种负责业务场景问题.ok,有了这个出发点,那么想象一下,比如你既可以一心一意只做一件事,你也可以同时做多件事,比如,你计划今天上午计划就是看足球比赛,ok,你今天的工作就是串行的,单进程的,你只需要完成一件事.但是不巧呢,你妈妈说让你帮她切肉,你妈妈上午要出门有点事,同时不巧呢,你老婆说她上午也要出门,让你帮着打扫家里卫生,这时你今天就要同时做三件事,看比赛,切肉,打扫卫生.这

漫谈并发编程(三):共享受限资源

解决共享资源竞争 一个不正确的访问资源示例 考虑下面的例子,其中一个任务产生偶数,而其他任务消费这些数字.这里,消费者任务的唯一工作就是检查偶数的有效性. 我们先定义一个偶数生成器的抽象父类. public abstract class IntGenerator { private volatile boolean canceled = false; public abstract int next( ); public void cancle( ) { canceled = true; } p

C++设计模式之建造者模式(三)

4.引入钩子方法的建造者模式 建造者模式除了逐步构建一个复杂产品对象外,还可以通过Director类来更加精细地控制产品的创建过程,例如增加一类称之为钩子方法(HookMethod)的特殊方法来控制是否对某个buildPartX()的调用,也就是判断产品中某个部件是否需要被建造.钩子方法的返回类型通常为boolean类型,方法名一般为isXXX(),钩子方法定义在抽象建造者类中.在抽象建造者类中提供钩子方法的默认实现,具体建造者类如果不需要建造某个部件,则该建造者类覆盖抽象建造者类的钩子方法.

Java经典23种设计模式之行为型模式(三)

本文接着介绍11种行为型模式里的备忘录模式.观察者模式.状态模式. 一.备忘录模式 在不破坏封装性的前提下,捕获一个对象的内部状态,并在该对象之外保存这个状态.这样以后就可以将该对象恢复到原先保存的状态.还是比较好理解的. 1.Memento 备忘录存储原发器对象的内部状态,这个类就是要存储的对象的状态.状态需要多少个变量,在Memento里就写多少个变量. public class Memento { private String state; public Meme*to(String st

Java经典23种设计模式之结构型模式(三)------附代理模式、适配器模式、外观模式区别

本文介绍7种结构型模式里的剩下两种:享元模式.代理模式. 一.享元模式FlyWeight 享元模式比较简单且重要,在很多场合都被用到,只不过封装起来了用户看不到.其概念:运用共享内存技术最大限度的支持大量细粒度的对象.这个概念给的有些抽象,说白了就是如果内存中存在某个对象A,如果再次需要使用对象A的时候如果内存中有A这个对象就直接使用它,不要再次new了.如果没有,则重新new一个.基于这个特点,享元模式使用时一般会给待访问对象传递一个Tag,用来标识这个对象,而且要同时使用抽象工厂的方法进行访

windows下多进程加协程并发模式

好久没更新博客了.正好最近要整理一下最近这段时间做过的项目以及学习python的一些心得.如标题所示,今天就来说说windows下多进程加协程并发模式.其实网上还是蛮多在linux下的多进程加协程并发模式,本身linux对python的支持更好吧.但是由于本人的开发环境是windows的,而且网上关于这方面的资料还是少了一点,不过经过一番折腾,也算是弄出来了.废话不多说,先贴代码吧: # coding=utf-8 # windows下多进程加协程并发模式 # 打入gevent的monkey补丁