生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:
1. wait()/notify()
2. lock & condition
3. BlockingQueue
下面来逐一分析。
1. wait()/notify()
第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。
1 public class WaitNotifyBroker<T> implements Broker<T> {
2
3 private final Object[] items;
4
5 private int takeIndex;
6 private int putIndex;
7 private int count;
8
9 public WaitNotifyBroker(int capacity) {
10 this.items = new Object[capacity];
11 }
12
13 @SuppressWarnings("unchecked")
14 @Override
15 public T take() {
16 T tmpObj = null;
17 try {
18 synchronized (items) {
19 while (0 == count) {
20 items.wait();
21 }
22 tmpObj = (T) items[takeIndex];
23 if (++takeIndex == items.length) {
24 takeIndex = 0;
25 }
26 count--;
27 items.notify();
28 }
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32
33 return tmpObj;
34 }
35
36 @Override
37 public void put(T obj) {
38 try {
39 synchronized (items) {
40 while (items.length == count) {
41 items.wait();
42 }
43
44 items[putIndex] = obj;
45 if (++putIndex == items.length) {
46 putIndex = 0;
47 }
48 count++;
49 items.notify();
50 }
51 } catch (InterruptedException e) {
52 e.printStackTrace();
53 }
54
55 }
56
57 }
这里利用Array构造一个Buffer去存取数据,并利用count,
putIndex和takeIndex来保证First-In-First-Out。
如果利用LinkedList来代替Array,相对来说会稍微简单些。
LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。
2. lock & condition
lock &
condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。
在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:
1 public class LockConditionBroker<T> implements Broker<T> {
2
3 private final ReentrantLock lock;
4 private final Condition notFull;
5 private final Condition notEmpty;
6 private final int capacity;
7 private LinkedList<T> items;
8
9 public LockConditionBroker(int capacity) {
10 this.lock = new ReentrantLock();
11 this.notFull = lock.newCondition();
12 this.notEmpty = lock.newCondition();
13 this.capacity = capacity;
14
15 items = new LinkedList<T>();
16 }
17
18 @Override
19 public T take() {
20 T tmpObj = null;
21 lock.lock();
22 try {
23 while (items.size() == 0) {
24 notEmpty.await();
25 }
26
27 tmpObj = items.poll();
28 notFull.signalAll();
29
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 } finally {
33 lock.unlock();
34 }
35 return tmpObj;
36 }
37
38 @Override
39 public void put(T obj) {
40 lock.lock();
41 try {
42 while (items.size() == capacity) {
43 notFull.await();
44 }
45
46 items.offer(obj);
47 notEmpty.signalAll();
48
49 } catch (InterruptedException e) {
50 e.printStackTrace();
51 } finally {
52 lock.unlock();
53 }
54
55 }
56 }
3. BlockingQueue
最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。
实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。
1 public class BlockingQueueBroker<T> implements Broker<T> {
2
3 private final BlockingQueue<T> queue;
4
5 public BlockingQueueBroker() {
6 this.queue = new LinkedBlockingQueue<T>();
7 }
8
9 @Override
10 public T take() {
11 try {
12 return queue.take();
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16
17 return null;
18 }
19
20 @Override
21 public void put(T obj) {
22 try {
23 queue.put(obj);
24 } catch (InterruptedException e) {
25 e.printStackTrace();
26 }
27 }
28
29 }
我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。
接下来,就是一个1P2C的例子:
1 public interface Broker<T> {
2
3 T take();
4
5 void put(T obj);
6
7 }
8
9
10 public class Producer implements Runnable {
11
12 private final Broker<Integer> broker;
13 private final String name;
14
15 public Producer(Broker<Integer> broker, String name) {
16 this.broker = broker;
17 this.name = name;
18 }
19
20 @Override
21 public void run() {
22 try {
23 for (int i = 0; i < 5; i++) {
24 broker.put(i);
25 System.out.format("%s produced: %s%n", name, i);
26 Thread.sleep(1000);
27 }
28 broker.put(-1);
29 System.out.println("produced termination signal");
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 return;
33 }
34
35 }
36
37 }
38
39
40 public class Consumer implements Runnable {
41
42 private final Broker<Integer> broker;
43 private final String name;
44
45 public Consumer(Broker<Integer> broker, String name) {
46 this.broker = broker;
47 this.name = name;
48 }
49
50 @Override
51 public void run() {
52 try {
53 for (Integer message = broker.take(); message != -1; message = broker.take()) {
54 System.out.format("%s consumed: %s%n", name, message);
55 Thread.sleep(1000);
56 }
57 System.out.println("received termination signal");
58 } catch (InterruptedException e) {
59 e.printStackTrace();
60 return;
61 }
62
63 }
64
65 }
66
67
68 public class Main {
69
70 public static void main(String[] args) {
71 Broker<Integer> broker = new WaitNotifyBroker<Integer>(5);
72 // Broker<Integer> broker = new LockConditionBroker<Integer>(5);
73 // Broker<Integer> broker = new BlockingQueueBroker<Integer>();
74
75 new Thread(new Producer(broker, "prod 1")).start();
76 new Thread(new Consumer(broker, "cons 1")).start();
77 new Thread(new Consumer(broker, "cons 2")).start();
78
79 }
80
81 }
除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等
本文完。
参考:
《Java 7 Concurrency Cookbook》
Java里的生产者-消费者模型(Producer and Consumer Pattern in Java),布布扣,bubuko.com