多线程:多线程设计模式(四):生产者-消费模式

生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。

一、架构模式图:

类图:

生产者:提交用户请求,提取用户任务,并装入内存缓冲区;

消费者:在内存缓冲区中提取并处理任务;

内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;

任务:生产者向内存缓冲区提交的数据结构

Main:使用生产者和消费者的客户端。

二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

(1)Producer生产者线程:

[java] view plain copy

  1. <span style="font-size:18px;">package ProducerConsumer;
  2. import java.util.Random;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. public class Producer  implements Runnable{
  7. //Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。
  8. //而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。
  9. //这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
  10. private volatile  boolean isRunning= true;
  11. //内存缓冲区
  12. private BlockingQueue<PCData> queue;
  13. //总数,原子操作
  14. private static AtomicInteger count = new AtomicInteger();
  15. private static final int SLEEPTIME=1000;
  16. public Producer(BlockingQueue<PCData> queue) {
  17. this.queue = queue;
  18. }
  19. @Override
  20. public void run() {
  21. PCData data=null;
  22. Random r  = new Random();
  23. System.out.println("start producer id = "+ Thread .currentThread().getId());
  24. try{
  25. while(isRunning){
  26. Thread.sleep(r.nextInt(SLEEPTIME));
  27. //构造任务数据
  28. data= new PCData(count.incrementAndGet());
  29. System.out.println("data is put into queue ");
  30. //提交数据到缓冲区
  31. if(!queue.offer(data,2,TimeUnit.SECONDS)){
  32. System.out.println("faile to  put data:  "+ data);
  33. }
  34. }
  35. }catch (InterruptedException e){
  36. e.printStackTrace();
  37. Thread.currentThread().interrupt();
  38. }
  39. }
  40. public void stop(){
  41. isRunning=false;
  42. }
  43. }
  44. </span>

(2)Consumer消费者线程:

[java] view plain copy

  1. <span style="font-size:18px;">package ProducerConsumer;
  2. import java.text.MessageFormat;
  3. import java.util.Random;
  4. import java.util.concurrent.BlockingQueue;
  5. public class Consumer implements Runnable {
  6. //缓冲区
  7. private BlockingQueue<PCData> queue;
  8. private static final int SLEEPTIME=1000;
  9. public Consumer(BlockingQueue<PCData> queue) {
  10. this.queue = queue;
  11. }
  12. @Override
  13. public void run() {
  14. System.out.println("start Consumer id= "+ Thread .currentThread().getId());
  15. Random r = new Random();
  16. try {
  17. //提取任务
  18. while(true){
  19. PCData data= queue.take();
  20. if(null!= data){
  21. //计算平方
  22. int re= data.getData()*data.getData();
  23. System.out.println(MessageFormat.format("{0}*{1}={2}",
  24. data.getData(),data.getData(),re
  25. ));
  26. Thread.sleep(r.nextInt(SLEEPTIME));
  27. }
  28. }
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. Thread.currentThread().interrupt();
  32. }
  33. }
  34. }
  35. </span>

(3)PCData共享数据模型:

[java] view plain copy

  1. <span style="font-size:18px;">package ProducerConsumer;
  2. public  final class PCData {
  3. private final int intData;
  4. public PCData(int d) {
  5. intData=d;
  6. }
  7. public PCData(String  d) {
  8. intData=Integer.valueOf(d);
  9. }
  10. public int getData(){
  11. return intData;
  12. }
  13. @Override
  14. public String toString(){
  15. return "data:"+ intData ;
  16. }
  17. }
  18. </span>

(4)Main函数:

[java] view plain copy

  1. <span style="font-size:18px;">package ProducerConsumer;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.Executor;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.LinkedBlockingDeque;
  7. public class Main {
  8. /**
  9. * @param args
  10. */
  11. public static void main(String[] args)  throws InterruptedException{
  12. //建立缓冲区
  13. BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);
  14. //建立生产者
  15. Producer producer1 = new Producer(queue);
  16. Producer producer2 = new Producer(queue);
  17. Producer producer3 = new Producer(queue);
  18. //建立消费者
  19. Consumer consumer1 = new Consumer(queue);
  20. Consumer consumer2 = new Consumer(queue);
  21. Consumer consumer3 = new Consumer(queue);
  22. //建立线程池
  23. ExecutorService service = Executors.newCachedThreadPool();
  24. //运行生产者
  25. service.execute(producer1);
  26. service.execute(producer2);
  27. service.execute(producer3);
  28. //运行消费者
  29. service.execute(consumer1);
  30. service.execute(consumer2);
  31. service.execute(consumer3);
  32. Thread.sleep(10*1000);
  33. //停止生产者
  34. producer1.stop();
  35. producer2.stop();
  36. producer3.stop();
  37. Thread.sleep(3000);
  38. service.shutdown();
  39. }
  40. }
  41. </span>

三、注意:

volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。

生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。

由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。

转:http://blog.csdn.net/lmdcszh/article/details/39699261

时间: 2024-10-13 08:10:50

多线程:多线程设计模式(四):生产者-消费模式的相关文章

进程同步概念简介 多线程上篇(四)

进程同步概念 临界资源 一旦有对资源的共享,就必然涉及竞争限制 比如尽管有两个人去水井打水,但是水井却只有一个:合理安排的话刚好错开,但是如果安排不合理,那就会出现冲突,出现冲突怎么办?总有一个先来后到,等下就好了. 这个水井就是一个临界资源 临界资源用来表示一种公共资源或者说是共享数据,可以被多个线程使用. 但是每一次,只能有一个线程使用它,一旦临界资源被占用,其他线程要想使用这个资源,就必须等待. 当多进程访问临界资源时,比如打印机 假设A进程和B进程轮流获得CPU时间片执行,A打印数学,B

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生

Java多线程基础(四)Java传统线程同步通信技术

Java多线程基础(四)Java传统线程同步通信技术 编写代码实现以下功能 子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程又循环100次,如此循环50次. 分析 1)子线程循环10次与主线程循环100次必须是互斥的执行,不能出现交叉,下面代码中通过synchronized关键字实现此要求: 2)子线程与主线程必须交替出现,可以通过线程同步通信技术实现,下面代码中通过bShouldSub变量实现此要求: 其他需要注意的地方 1)其中business变量必须声

iOS多线程编程(四)------ GCD(Grand Central Dispatch)

一.简介 是基于C语言开发的一套多线程开发机制,也是目前苹果官方推荐的多线程开发方法,用起来也最简单,只是它基于C语言开发,并不像NSOperation是面向对象的开发,而是完全面向过程的.如果使用GCD,完全由系统管理线程,我们不需要编写线程代码.只需定义想要执行的任务,然后添加到适当的调度队列(dispatch_queue).GCD会负责创建线程和调度你的任务,系统会直接提供线程管理. 二.任务和队列 GCD中有两个核心概念 (1)任务:执行什么操作 (2)队列:用来存放任务 GCD的使用就

java中多线程通信实例:生产者消费者模式

线程间的通信: 其实就是多个线程再操作同一个资源,但是操作的动作不同   当某个线程进入synchronized块后,共享数据的状态不一定满足该线程的需要,需要其他线程改变共享数据的状态后才能运行,而由于当时线程对共享资源时独占的,它必须解除对共享资源的锁定的状态,通知其他线程可以使用该共享资源. Java中的 wait(),notify(),notifyAll()可以实现线程间的通信. 生产者--消费者问题是典型的线程同步和通信问题 /** * 生产者和消费者问题,生产者生成出产品,消费者去购

联想高级Java研发面经+面试题:Spring+多线程+MySQL+设计模式

上个礼拜,之前的一个同事突然联系我说他去面了联想的JAVA开发工程师,想分享一下面试经历和面试题.我当时就拍板说,好啊! 然后就整理了一下,写了这篇文章:和大家分享一下这次面试经验和面试题. 薪资还可以啊,年薪40W+啊!多少人的梦想啊! 言归正传,和大家分享一下这次联想的面经和面试题: 联想面经: 第一轮:电话初面 第二轮:技术面谈 第三轮:高管复试 第四轮:HR最后确认 No.1:第一轮面试--电话初面 首先确认对联想的意向度(如果异地更会考虑对工作地点(北京)的意向度!联想很看重这个):其

【白话设计模式四】单例模式(Singleton)

转自:https://my.oschina.net/xianggao/blog/616385 0 系列目录 白话设计模式 工厂模式 单例模式 [白话设计模式一]简单工厂模式(Simple Factory) [白话设计模式二]外观模式(Facade) [白话设计模式三]适配器模式(Adapter) [白话设计模式四]单例模式(Singleton) [白话设计模式五]工厂方法模式(Factory Method) [白话设计模式六]抽象工厂模式(Abstract Factory) [白话设计模式七]策

大话设计模式(四)单例模式的优与劣

大话设计模式(四)单例模式的优与劣 前言 首先来明确一个问题,那就是在某些情况下,有些对象,我们只需要一个就可以了,比如,一台计算机上可以连好几个打印机,但是这个计算机上的打印程序只能有一个,这里就可以通过单例模式来避免两个打印作业同时输出到打印机中,即在整个的打印过程中我只有一个打印程序的实例.     简单说来,单例模式(也叫单件模式)的作用就是保证在整个应用程序的生命周期中,任何一个时刻,单例类的实例都只存在一个(当然也可以不存在). 下图是单例模式的结构图. 下面就来看一种情况(这里先假

IOS开发多线程-多线程技术1

一.基本概念 1.什么是进程 进程就是指在系统中正在运行的一个应用程序 每个应用之间是相互独立的 每个进程都运行在其专有的并且受保护的内存空间内. 2.什么是线程 一个进程想要执行程序,就必须需要一个线程, 线程是程序执行的基本单元,应用的所有的任务都在线程中执行的. 当程序启动之后,系统会自动为进程创建一条线程,称之为 “主线程”  “UI线程” 3.线程的串行 一个线程执行任务是串行的,也就是说,在一个线程内,执行任务是从顺序执行的 同一时间,线程只能执行一个任务----> CPU在同一时间