转自:java并发编程实战
5.3阻塞队列和生产者-消费者模式
BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到空间可用;如果队列为空,那么take方法将阻塞直到有元素可用。队列可以是有界的也可以是无界的。
如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存。同样,put方法的阻塞特性也极大地简化了生产者的编码。如果使用有界队列,当队列充满时,生产者将阻塞并不能继续生产工作,而消费者就有时间来赶上工作的进度。阻塞队列同样提供了一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败的状态。这样你就能创建更多灵活的策略来处理负荷过载的情况。
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:他们能一直并防止产生过多的工作项,使应用程序在负荷过载的情况下边的更加健壮。
/** * java并发编程实战 * 5.3.1桌面搜索 * 爬虫查找所有文件并放入队列 * Created by mrf on 2016/3/7. */ public class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) { this.fileQueue = fileQueue; this.fileFilter = fileFilter; this.root = root; } @Override public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries!=null){ for (File entry : entries) { if (entry.isDirectory()){ crawl(entry); }else if (!alreadyIndexed(entry)){ fileQueue.put(entry); } } } } private boolean alreadyIndexed(File entry){ //检查是否加入索引 return false; } } /** * 消费者 * 将爬虫结果队列取出并加入索引 */ class Indexer implements Runnable{ private static final int BOUND = 100; private static final int N_CONSUMERS = 2; private final BlockingQueue<File> queue; Indexer(BlockingQueue<File> queue) { this.queue = queue; } @Override public void run() { try { while (true){ indexFile(queue.take()); } }catch (InterruptedException e){ Thread.currentThread().interrupt(); } } private void indexFile(File take) { //将文件加入索引 } public static void startIndexing(File[] roots){ BlockingQueue<File> queue = new LinkedBlockingDeque<>(BOUND); FileFilter fileFilter = new FileFilter() { @Override public boolean accept(File pathname) { return true; } }; for (File root:roots) { new Thread(new FileCrawler(queue,fileFilter,root)).start(); } for (int i = 0; i < N_CONSUMERS; i++) { new Thread(new Indexer(queue)).start(); } } }
时间: 2024-12-22 22:05:49