Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:

1. wait()/notify()

2. lock & condition

3. BlockingQueue

下面来逐一分析。

1. wait()/notify()

第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。


 1 public class WaitNotifyBroker<T> implements Broker<T> {
2
3 private final Object[] items;
4
5 private int takeIndex;
6 private int putIndex;
7 private int count;
8
9 public WaitNotifyBroker(int capacity) {
10 this.items = new Object[capacity];
11 }
12
13 @SuppressWarnings("unchecked")
14 @Override
15 public T take() {
16 T tmpObj = null;
17 try {
18 synchronized (items) {
19 while (0 == count) {
20 items.wait();
21 }
22 tmpObj = (T) items[takeIndex];
23 if (++takeIndex == items.length) {
24 takeIndex = 0;
25 }
26 count--;
27 items.notify();
28 }
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32
33 return tmpObj;
34 }
35
36 @Override
37 public void put(T obj) {
38 try {
39 synchronized (items) {
40 while (items.length == count) {
41 items.wait();
42 }
43
44 items[putIndex] = obj;
45 if (++putIndex == items.length) {
46 putIndex = 0;
47 }
48 count++;
49 items.notify();
50 }
51 } catch (InterruptedException e) {
52 e.printStackTrace();
53 }
54
55 }
56
57 }

这里利用Array构造一个Buffer去存取数据,并利用count,
putIndex和takeIndex来保证First-In-First-Out。

如果利用LinkedList来代替Array,相对来说会稍微简单些。

LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。

2. lock & condition

lock &
condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。

在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:


 1 public class LockConditionBroker<T> implements Broker<T> {
2
3 private final ReentrantLock lock;
4 private final Condition notFull;
5 private final Condition notEmpty;
6 private final int capacity;
7 private LinkedList<T> items;
8
9 public LockConditionBroker(int capacity) {
10 this.lock = new ReentrantLock();
11 this.notFull = lock.newCondition();
12 this.notEmpty = lock.newCondition();
13 this.capacity = capacity;
14
15 items = new LinkedList<T>();
16 }
17
18 @Override
19 public T take() {
20 T tmpObj = null;
21 lock.lock();
22 try {
23 while (items.size() == 0) {
24 notEmpty.await();
25 }
26
27 tmpObj = items.poll();
28 notFull.signalAll();
29
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 } finally {
33 lock.unlock();
34 }
35 return tmpObj;
36 }
37
38 @Override
39 public void put(T obj) {
40 lock.lock();
41 try {
42 while (items.size() == capacity) {
43 notFull.await();
44 }
45
46 items.offer(obj);
47 notEmpty.signalAll();
48
49 } catch (InterruptedException e) {
50 e.printStackTrace();
51 } finally {
52 lock.unlock();
53 }
54
55 }
56 }

3. BlockingQueue

最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。

实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。


 1 public class BlockingQueueBroker<T> implements Broker<T> {
2
3 private final BlockingQueue<T> queue;
4
5 public BlockingQueueBroker() {
6 this.queue = new LinkedBlockingQueue<T>();
7 }
8
9 @Override
10 public T take() {
11 try {
12 return queue.take();
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16
17 return null;
18 }
19
20 @Override
21 public void put(T obj) {
22 try {
23 queue.put(obj);
24 } catch (InterruptedException e) {
25 e.printStackTrace();
26 }
27 }
28
29 }

我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。

接下来,就是一个1P2C的例子:


 1 public interface Broker<T> {
2
3 T take();
4
5 void put(T obj);
6
7 }
8
9
10 public class Producer implements Runnable {
11
12 private final Broker<Integer> broker;
13 private final String name;
14
15 public Producer(Broker<Integer> broker, String name) {
16 this.broker = broker;
17 this.name = name;
18 }
19
20 @Override
21 public void run() {
22 try {
23 for (int i = 0; i < 5; i++) {
24 broker.put(i);
25 System.out.format("%s produced: %s%n", name, i);
26 Thread.sleep(1000);
27 }
28 broker.put(-1);
29 System.out.println("produced termination signal");
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 return;
33 }
34
35 }
36
37 }
38
39
40 public class Consumer implements Runnable {
41
42 private final Broker<Integer> broker;
43 private final String name;
44
45 public Consumer(Broker<Integer> broker, String name) {
46 this.broker = broker;
47 this.name = name;
48 }
49
50 @Override
51 public void run() {
52 try {
53 for (Integer message = broker.take(); message != -1; message = broker.take()) {
54 System.out.format("%s consumed: %s%n", name, message);
55 Thread.sleep(1000);
56 }
57 System.out.println("received termination signal");
58 } catch (InterruptedException e) {
59 e.printStackTrace();
60 return;
61 }
62
63 }
64
65 }
66
67
68 public class Main {
69
70 public static void main(String[] args) {
71 Broker<Integer> broker = new WaitNotifyBroker<Integer>(5);
72 // Broker<Integer> broker = new LockConditionBroker<Integer>(5);
73 // Broker<Integer> broker = new BlockingQueueBroker<Integer>();
74
75 new Thread(new Producer(broker, "prod 1")).start();
76 new Thread(new Consumer(broker, "cons 1")).start();
77 new Thread(new Consumer(broker, "cons 2")).start();
78
79 }
80
81 }

除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等

本文完。

参考:

《Java 7 Concurrency Cookbook》

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java),布布扣,bubuko.com

时间: 2024-10-12 08:37:49

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)的相关文章

java并发之生产者消费者模型

生产者和消费者模型是操作系统中经典的同步问题.该问题最早由Dijkstra提出,用以演示它提出的信号量机制. 经典的生产者和消费者模型的描写叙述是:有一群生产者进程在生产产品.并将这些产品提供给消费者进程去消费.为使生产者进程与消费者进程能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池,生产者进程将它所生产的产品放入一个缓冲区中.消费者进程可从一个缓冲区中取走产品去消费.虽然全部的生产者进程和消费者进程都是以异步方式执行的.但它们之间必须保持同步,即不同意消费者进程到一个空缓冲区去取产品,

java多线程解决生产者消费者问题

import java.util.ArrayList; import java.util.List; /** * Created by ccc on 16-4-27. */ public class Test { public static void main(String[] args) { GunClip clip = new GunClip(); Producer p = new Producer(clip); customer c = new customer(clip); p.star

Java多线程之~~~~使用wait和notify实现生产者消费者模型

在多线程开发中,最经典的一个模型就是生产者消费者模型,他们有一个缓冲区,缓冲区有最大限制,当缓冲区满 的时候,生产者是不能将产品放入到缓冲区里面的,当然,当缓冲区是空的时候,消费者也不能从中拿出来产品,这就 涉及到了在多线程中的条件判断,java为了实现这些功能,提供了wait和notify方法,他们可以在线程不满足要求的时候 让线程让出来资源等待,当有资源的时候再notify他们让他们继续工作,下面我们用实际的代码来展示如何使用wait和 notify来实现生产者消费者这个经典的模型. 首先是

java生产者消费者模型

import java.util.Queue;import java.util.concurrent.LinkedBlockingQueue; public class Consumer extends Thread {    private String product;    private Queue<String> storeHouse = new LinkedBlockingQueue<String>();        public Consumer(){      

生产者消费者模型java

马士兵老师的生产者消费者模型,我感觉理解了生产者消费者模型,基本懂了一半多线程. public class ProducerConsumer { public static void main(String[] args) { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); new Thread(p).start(); new Thread(c).start

Java多线程14:生产者/消费者模型

什么是生产者/消费者模型 一种重要的模型,基于等待/通知机制.生产者/消费者模型描述的是有一块缓冲区作为仓库,生产者可将产品放入仓库,消费者可以从仓库中取出产品,生产者/消费者模型关注的是以下几个点: 1.生产者生产的时候消费者不能消费 2.消费者消费的时候生产者不能生产 3.缓冲区空时消费者不能消费 4.缓冲区满时生产者不能生产 生产者/模型作为一种重要的模型,它的优点在于: 1.解耦.因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这一点很容易想到,这样生产者和消费者的代码发生变化,

Java线程:并发协作-生产者消费者模型

对于多线程程序来说,不管任何编程语言,生产者消费者模型都是最经典的. 实际上,准确的说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确以下几点: 生产者仅仅在仓储未满时候生产,仓满则停止生产. 消费者仅仅在仓储有产品时候才能消费,仓空则等待. 当消费者发现仓储没有产品的时候会通知生产者生产. 生产者在生产出可消费产品时候,应该通知消费者去消费. 此模型将要结合java.lang.Object的wait与notify,notify

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也.-<论语> PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~ Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue.Queue 用来临时保存一组等待处理的元素.BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作. BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型. 这里写图

生产者消费者模型Java实现

生产者消费者模型 生产者消费者模型可以描述为: ①生产者持续生产,直到仓库放满产品,则停止生产进入等待状态:仓库不满后继续生产: ②消费者持续消费,直到仓库空,则停止消费进入等待状态:仓库不空后,继续消费: ③生产者可以有多个,消费者也可以有多个: 生产者消费者模型 对应到程序中,仓库对应缓冲区,可以使用队列来作为缓冲区,并且这个队列应该是有界的,即最大容量是固定的:进入等待状态,则表示要阻塞当前线程,直到某一条件满足,再进行唤醒. 常见的实现方式主要有以下几种. ①使用wait()和notif