生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。
一、架构模式图:
类图:
生产者:提交用户请求,提取用户任务,并装入内存缓冲区;
消费者:在内存缓冲区中提取并处理任务;
内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;
任务:生产者向内存缓冲区提交的数据结构;
Main:使用生产者和消费者的客户端。
二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:
(1)Producer生产者线程:
<span style="font-size:18px;">package ProducerConsumer; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable{ //Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。 //而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。 //这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。 private volatile boolean isRunning= true; //内存缓冲区 private BlockingQueue<PCData> queue; //总数,原子操作 private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME=1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data=null; Random r = new Random(); System.out.println("start producer id = "+ Thread .currentThread().getId()); try{ while(isRunning){ Thread.sleep(r.nextInt(SLEEPTIME)); //构造任务数据 data= new PCData(count.incrementAndGet()); System.out.println("data is put into queue "); //提交数据到缓冲区 if(!queue.offer(data,2,TimeUnit.SECONDS)){ System.out.println("faile to put data: "+ data); } } }catch (InterruptedException e){ e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop(){ isRunning=false; } } </span>
(2)Consumer消费者线程:
<span style="font-size:18px;">package ProducerConsumer; import java.text.MessageFormat; import java.util.Random; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { //缓冲区 private BlockingQueue<PCData> queue; private static final int SLEEPTIME=1000; public Consumer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { System.out.println("start Consumer id= "+ Thread .currentThread().getId()); Random r = new Random(); try { //提取任务 while(true){ PCData data= queue.take(); if(null!= data){ //计算平方 int re= data.getData()*data.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re )); Thread.sleep(r.nextInt(SLEEPTIME)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } </span>
(3)PCData共享数据模型:
<span style="font-size:18px;">package ProducerConsumer; public final class PCData { private final int intData; public PCData(int d) { intData=d; } public PCData(String d) { intData=Integer.valueOf(d); } public int getData(){ return intData; } @Override public String toString(){ return "data:"+ intData ; } } </span>
(4)Main函数:
<span style="font-size:18px;">package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class Main { /** * @param args */ public static void main(String[] args) throws InterruptedException{ //建立缓冲区 BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10); //建立生产者 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); //建立消费者 Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); //建立线程池 ExecutorService service = Executors.newCachedThreadPool(); //运行生产者 service.execute(producer1); service.execute(producer2); service.execute(producer3); //运行消费者 service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(10*1000); //停止生产者 producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); service.shutdown(); } } </span>
三、注意:
volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。
由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。
时间: 2024-11-10 16:51:59