在多线程环境下,通过 BlockingQueue,实现生产者-消费者场景。
Toast 被生产和消费的对象。
ToastQueue 继承了 LinkedblockingQueue ,用于中间存储 Toast 。
Producer 生产 Toast ,并将生产出来的 Toast 放进队列 initialToastQ 中。
Processor 加工 Toast,从 initialToastQ 中获得生产出来的 Toast,将其加工并放进队列 finishedToast 中。
Consumer 消费 Toast,从 finishedToastQ 中获得加工完成的 Toast。
ThreadHelper 工具类,用于输出线程相关信息。
ProducerConsumerDemo 演示这个场景
代码实现:
Toast 实现
public class Toast { private int id; public Toast(int id){ this.id = id; } public String toString(){ return " toast#" + id; } }
ToastQueue 实现
import java.util.concurrent.LinkedBlockingQueue; public class ToastQueue extends LinkedBlockingQueue<Toast> { private static final long serialVersionUID = 1L; }
Producer 循环生产 Toast
import java.util.concurrent.TimeUnit; public class Producer implements Runnable { private ToastQueue toastQueue; private int count; public Producer(ToastQueue toastQueue){ this.toastQueue = toastQueue; this.count = 0; } @Override public void run() { try { while (true){ TimeUnit.MILLISECONDS.sleep(100); Toast toast = new Toast(count); count++; toastQueue.put(toast); ThreadHelper.print(" produced " + toast); } }catch (InterruptedException e) { e.printStackTrace(); } } }
Processor 从 intialToastQ 获得 Toast ,对其加工,并放进 finishedToastQ 中。
import java.util.concurrent.TimeUnit; public class Processor implements Runnable { private ToastQueue initialToastQ; private ToastQueue finishedToastQ; public Processor(ToastQueue initialToastQ, ToastQueue finishedToastQ){ this.initialToastQ = initialToastQ; this.finishedToastQ = finishedToastQ; } @Override public void run() { try { while (true){ Toast toast = initialToastQ.take(); ThreadHelper.print(" processing " + toast); TimeUnit.MILLISECONDS.sleep(180); finishedToastQ.put(toast); } }catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer 消耗 Toast
public class Consumer implements Runnable { private ToastQueue finishedToastQ; public Consumer(ToastQueue finishedToastQ){ this.finishedToastQ = finishedToastQ; } @Override public void run() { try { while (true){ Toast toast = finishedToastQ.take(); ThreadHelper.print(" consumed " + toast); } }catch (InterruptedException e) { e.printStackTrace(); } } }
ThreadHelper 线程帮助类
public class ThreadHelper { public static void print(String msg){ System.out.println("[" + Thread.currentThread().getName() + " ] " + msg); } }
演示烤面包的生产、加工、消费的场景
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ProducerConsumerDemo { public static void main() throws InterruptedException{ ToastQueue initialToastQ = new ToastQueue(); ToastQueue finishedToastQ = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Producer(initialToastQ)); exec.execute(new Processor(initialToastQ, finishedToastQ)); exec.execute(new Consumer(finishedToastQ)); TimeUnit.SECONDS.sleep(2); exec.shutdownNow(); } }
输出结果:
[pool-1-thread-2 ] processing toast#0 [pool-1-thread-1 ] produced toast#0 [pool-1-thread-1 ] produced toast#1 [pool-1-thread-2 ] processing toast#1 [pool-1-thread-3 ] consumed toast#0 [pool-1-thread-1 ] produced toast#2 [pool-1-thread-1 ] produced toast#3 [pool-1-thread-2 ] processing toast#2 [pool-1-thread-3 ] consumed toast#1 [pool-1-thread-1 ] produced toast#4 [pool-1-thread-1 ] produced toast#5 [pool-1-thread-2 ] processing toast#3 [pool-1-thread-3 ] consumed toast#2 [pool-1-thread-1 ] produced toast#6 [pool-1-thread-1 ] produced toast#7 [pool-1-thread-2 ] processing toast#4 [pool-1-thread-3 ] consumed toast#3 [pool-1-thread-1 ] produced toast#8 [pool-1-thread-2 ] processing toast#5 [pool-1-thread-3 ] consumed toast#4 [pool-1-thread-1 ] produced toast#9 [pool-1-thread-1 ] produced toast#10 [pool-1-thread-2 ] processing toast#6 [pool-1-thread-3 ] consumed toast#5 [pool-1-thread-1 ] produced toast#11 [pool-1-thread-1 ] produced toast#12 [pool-1-thread-2 ] processing toast#7 [pool-1-thread-3 ] consumed toast#6 [pool-1-thread-1 ] produced toast#13 [pool-1-thread-1 ] produced toast#14 [pool-1-thread-2 ] processing toast#8 [pool-1-thread-3 ] consumed toast#7 [pool-1-thread-1 ] produced toast#15 [pool-1-thread-1 ] produced toast#16 [pool-1-thread-2 ] processing toast#9 [pool-1-thread-3 ] consumed toast#8 [pool-1-thread-1 ] produced toast#17 [pool-1-thread-2 ] processing toast#10 [pool-1-thread-3 ] consumed toast#9 [pool-1-thread-1 ] produced toast#18 java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at concurrencyProducerConsumer.Consumer.run(Consumer.java:15) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at concurrencyProducerConsumer.Producer.run(Producer.java:19) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at concurrencyProducerConsumer.Processor.run(Processor.java:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
参考资料
Page 868, Produer-consumers and queue, Thinking in Java
时间: 2024-10-01 05:18:46