生产者、消费者模式是多线程中的经典问题。通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节。
对于比较常见的单生产者、多消费者的情况,主要有以下两种策略:
- 通过volatile boolean producerDone =false 来标示是否完成。生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出。 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑。
- 比较经典的“毒丸”策略,生产者结束后,把一个特别的对象:“毒丸”对象放入队列。消费者从队列中拿到对象后,判断是否是毒丸对象。如果是普通非毒丸对象,则正常消费。如果是毒丸对象,则放回队列(杀死其他消费者),然后结束自己。这种方式不会对结束状态产生争用,是比较好的方式。
由于“毒丸”策略是在单生产者多消费者情况下的。对于多生产者的情况,需要对之进行一些修改。我的想法是这样的。用Countdownlatch作为生产者计数器。所有生产者结束后,由协调者放入毒丸对象,消费者退出过程是一样的。上代码:
Coordinator: 启动生产者消费者,提供队列、计数器。生产者全部结束后,放入毒丸。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; public class Coordinator { public static final Object POISON_PILL = new Object();//special object to kill consumers private int productCount = 3; private int consumerCount = 5; public void startAll() throws Exception{ BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(20); CountDownLatch noMoreToProduce = new CountDownLatch(productCount); //start consumers; for(int i = 0; i < consumerCount; i++){ new Thread(new Consumer("consumer " + i, queue)).start(); } //start producers; for(int i = 0; i < productCount; i++){ new Thread(new Producer("producer " + i, queue, noMoreToProduce)).start(); } //wait until all producer down noMoreToProduce.await(); System.out.println("All producer finished, putting POISON_PILL to the queue to stop consumers!"); //put poison pill queue.put(POISON_PILL); } public static void main(String[] args) throws Exception{ new Coordinator().startAll(); } }
Producer: 随机生产和结束,结束前使countdownlatch + 1
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; public class Producer implements Runnable { private String name; private CountDownLatch noMoreToProduce; private BlockingQueue<Object> queue; private Random random = new Random(); public Producer(String name, BlockingQueue<Object> queue, CountDownLatch noMoreToProduce){ this.name = name; this.queue = queue; this.noMoreToProduce = noMoreToProduce; } @Override public void run() { System.out.println(name + " started."); try { while (true) { Object item = randomProduce(); if (item == null) { break; //break if no more item } queue.put(item); System.out.println(name + " produced one."); } } catch (InterruptedException e) { //log } finally{ System.out.println(name + " finished."); noMoreToProduce.countDown();//count down to signal "I finished." } } private Object randomProduce() { if (random.nextBoolean()) { return new Object(); } return null; } }
Consumer: 判断毒丸对象。如果是毒丸,放回队列(杀死其他消费者),然后自己退出。
import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private String name; private BlockingQueue<Object> queue; public Consumer(String name, BlockingQueue<Object> queue){ this.name = name; this.queue = queue; } @Override public void run() { try { System.out.println(name + " started."); while (true) { Object item = queue.take(); //poison pill processing if (item == Coordinator.POISON_PILL) { queue.put(item);//put back to kill others System.out.println(name + " finished"); break; } item = null;//pretend to consume the item; System.out.println(name + " consumed one"); } } catch (InterruptedException e) { } } }
执行结果:
consumer 0 started.
consumer 4 started.
consumer 3 started.
consumer 2 started.
consumer 1 started.
producer 0 started.
producer 1 started.
producer 0 finished.
producer 1 produced one.
producer 2 started.
producer 1 produced one.
producer 1 finished.
consumer 3 consumed one
consumer 4 consumed one
consumer 0 consumed one
producer 2 produced one.
producer 2 produced one.
producer 2 produced one.
consumer 1 consumed one
consumer 2 consumed one
producer 2 finished.
All producer finished, putting POISON_PILL to the queue to stop consumers!
consumer 3 finished
consumer 4 finished
consumer 0 finished
consumer 2 finished
consumer 1 finished