并发编程—— 阻塞队列和生产者-消费者模式

Java并发编程实践 目录

并发编程—— ConcurrentHashMap

并发编程—— 阻塞队列和生产者-消费者模式

概述

第1部分 为什么要使用生产者和消费者模式

第2部分 什么是生产者消费者模式

第3部分 代码示例

第1部分 为什么要使用生产者和消费者模式

  在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

第2部分 什么是生产者消费者模式

  生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

  虽然生产者-消费者模式能够将生产者和消费者的代码彼此解耦开来,但它们的行为仍然会通过共享工作队列间接地耦合在一起。开发人员总是会假设消费者处理工作的速率能赶上生产者生成工作项的速率,因此通常不会为工作队列的大小设置边界,但这将导致在之后需要重新设计系统架构。因此,应该尽早地通过阻塞队列在设计中构建资源管理机制——这件事做得越早,就越容易。

  在类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性嫩。PriorityBlockingQueue是一个按优先级排序的队列,当不希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。

第3部分 代码示例

  有一种类型的程序适合被分解为生产者和消费者,例如代理程序,它将扫描本地驱动上的文件并建立索引以便随后进行搜索,类似于某些桌面搜索程序或者Windows索引服务。在下面程序中,FileCrawler 给出一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。而且,在Indexer中给出一个消费者任务,即从队列中取出文件的名称并对它们建立索引。

 1 package com.concurrency.BasicBuildingBlocks_5;
 2
 3 import java.io.File;
 4 import java.util.concurrent.BlockingQueue;
 5
 6
 7 /**
 8  * 5.8 Producer and consumer tasks in a desktop search application
 9  *
10  * @ClassName: ProducerConsumer
11  * @author Xingle
12  * @date 2014-9-3 下午3:43:49
13  */
14 public class ProducerConsumer {
15
16     //生产者任务,在某个文件层次结构中搜索符合索引标准的文件,并将它们放入工作队列
17     static class FileCrawler implements Runnable {
18         private final BlockingQueue<File> fileQueue;
19         private final File root;
20
21         public FileCrawler(BlockingQueue<File> fileQueue,File root) {
22             this.fileQueue = fileQueue;
23
24             this.root = root;
25
26         }
27
28         @Override
29         public void run() {
30             crawl(root);
31         }
32
33         /**
34          * 搜索没有被索引过的文件放入阻塞队列
35          * @author xingle
36          * @data 2014-9-3 下午5:13:52
37          */
38         private void crawl(File root) {
39             File[] entries = root.listFiles();
40             if (entries != null) {
41                 for (File entrie : entries) {
42                     if (entrie.isDirectory()) {
43                         crawl(entrie);
44                     } else {
45                         if (!alreadyIndexed(entrie)){
46                             System.out.println("放入生产者队列文件:"+entrie.getName()+"来自线程:"+Thread.currentThread().getName());
47                             fileQueue.add(entrie);
48                         }
49
50                     }
51                 }
52             }
53         }
54
55         /**
56          * 是否已经被索引
57          * @param entrie
58          * @return
59          * @author xingle
60          * @data 2014-9-3 下午5:26:04
61          */
62         private boolean alreadyIndexed(File entrie) {
63             return false;
64         }
65
66     }
67
68     //消费者
69     static class Indexer implements Runnable {
70         private final BlockingQueue<File> queue;
71
72         public Indexer(BlockingQueue<File> queue){
73             this.queue = queue;
74         }
75         @Override
76         public void run() {
77             while(true){
78                 try {
79                     indexFile(queue.take());
80                 } catch (InterruptedException e) {
81                     Thread.currentThread().interrupt();
82                 }
83             }
84         }
85
86         private void indexFile(File file) {
87             System.out.println("消费者取出文件:"+file.getName()+"来自线程:"+Thread.currentThread().getName());
88         };
89     }
90
91 }

测试程序:

 1 package com.concurrency.BasicBuildingBlocks_5;
 2
 3 import java.io.File;
 4 import java.util.concurrent.BlockingQueue;
 5 import java.util.concurrent.LinkedBlockingQueue;
 6
 7 import com.concurrency.BasicBuildingBlocks_5.ProducerConsumer.FileCrawler;
 8 import com.concurrency.BasicBuildingBlocks_5.ProducerConsumer.Indexer;
 9
10 /**
11  * 测试生成者和消费者
12  * @ClassName: testMain
13  * TODO
14  * @author Xingle
15  * @date 2014-9-3 下午6:03:56
16  */
17 public class testMain {
18
19     public static void main(String[] args) {
20
21         File file = new File("D:\\test1/");
22         File[] roots = file.listFiles();
23         final int BOUND = 10;
24         final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
25         BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
26
27         //消费
28         for (int i = 0; i < N_CONSUMERS; i++) {
29             new Thread(new Indexer(queue)).start();
30         }
31         //生产
32         for (File root : roots) {
33             new Thread(new FileCrawler(queue, root)).start();
34         }
35     }
36
37 }

执行结果:



参考:

1.《Java 并发编程实战》5.3节

2.聊聊并发——生产者消费者模式

时间: 2024-12-21 02:10:06

并发编程—— 阻塞队列和生产者-消费者模式的相关文章

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue,           (基于数组的并发阻塞队列) 6.LinkedBlockingQueue,        (基

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

阻塞队列实现生产者消费者模式

阻塞队列的特点:当队列元素已满的时候,阻塞插入操作: 当队列元素为空的时候,阻塞获取操作: 生产者线程:Producer 1 package test7; 2 3 import java.util.concurrent.BlockingQueue; 4 5 public class Producer implements Runnable{ 6 7 private final BlockingQueue queue; 8 public Producer(BlockingQueue queue){

使用阻塞队列实现生产者-消费者模型

生产者-消费者模问题 /** * 使用阻塞队列实现生产者-消费者模型 * 阻塞队列只允许元素以FIFO的方式来访问 * @author Bingyue * */ public class ProducerCustomerPattern { public static void main(String[] args) { //生产者和消费者共享的存储区域 BlockingQueue<Integer> blockQueue=new LinkedBlockingQueue(); /** * 此处外部

基于阻塞队列的生产者消费者C#并发设计

这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code: 调用code: static void Main(string[] args) { ProcessQueue<int> processQueue = new ProcessQueu

【译】使用阻塞队列解决生产者-消费者问题

如果你想避免使用错综复杂的wait–notify的语句,BlockingQueue非常有用.BlockingQueue可用于解决生产者-消费者问题,如下代码示例.对于每个开发人员来说,生产者消费者问题已经非常熟悉了,这里我将不做详细描述. 为什么BlockingQueue适合解决生产者消费者问题 任何有效的生产者-消费者问题解决方案都是通过控制生产者put()方法(生产资源)和消费者take()方法(消费资源)的调用来实现的,一旦你实现了对方法的阻塞控制,那么你将解决该问题. Java通过Blo

JAVA并发编程6_线程协作/生产者-消费者

前面通过同步锁来同步任务的行为,两个任务在交替访问共享资源的时候,可以通过使用同步锁使得任何时候只有一个任务可以访问该资源,见博客:线程同步之synchronized关键字.下面主要讲的是如何使任务彼此间可以协作,使得多个任务可以一起工作去解决木某个问题,因为有些问题中,某些部分必须在其他部分被解决之前解决,就像在餐厅服务员要端菜就必须有厨师做好了菜.在任务协作时,可以让任务自身挂起,直至某些外部条件发生变化,表示是时候让这个任务向前推动了为止. wait/notify wait方法会在等待外部