Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列

阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法。如果队列已经满了,那么put方法将阻塞直到有空间可以用;如果队列为空,那么take方法将一直阻塞直到有元素可用。队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞。一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式。

意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用该数据的过程解耦开来以简化工作负载的管理。对于I/O密集型和 CPU密集型的生产者和消费者,可以带来许多性能优势

阻塞队列简化了消费者程序的编码,因为take操作会一直阻塞直到有可用的数据。在某些情况下,这种方式是非常合适的(例如:服务器应用程序中,没有客户端请求时便一直等待),在网络爬虫等有无穷工作需要完成时,实现更高的资源利用率。

类库中包含了 BlockingQueue 的多种实现,其中,LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO队列,二者分别与 LinkedList 和 ArrayList 类似,但比同步 List 拥有更好的并发性能。PriorityBlockingQueue 是一个按优先级排序的队列,要求实现 Comparable 或者使用 Comparator。

最后一个实现是 SynchronousQueue ,实际上他不是一个真正的队列,它不会为队列中的元素维护存储空间。采用直接交付的机制,put 和take 会一直阻塞,除非put之后出现了take 或者 take之后出现了put操作。

生产者-消费者模式

下面我们利用生产者消费者模式建立一个类似于 Windows 索引服务。

package org.bupt.xiaoye.chapter5_8;

import java.io.File;
import java.util.concurrent.BlockingQueue;

public class Crawler implements Runnable {
	private final BlockingQueue<File> b;
	private final File root;

	@Override
	public void run() {
		System.out.println("Crawler begins to run!");
		if (root == null|| !root.exists())
			return;
		crawl(root);
		System.out.println("Crawler is shutdown!");

	}

	public void crawl(File root) {
		try {
			if (root.isFile()) {
				System.out.println("Crawling " + root);
				b.put(root);
			} else {
				for (File f : root.listFiles()) {
					crawl(f);
				}
			}
		} catch (InterruptedException e) {
			System.out.println(e);
		}
	}

	public Crawler(BlockingQueue<File> b, File root) {
		this.b = b;
		this.root = root;
	}

}
package org.bupt.xiaoye.chapter5_8;

import java.io.File;
import java.util.concurrent.BlockingQueue;

public class Indexer implements Runnable {
	private final BlockingQueue<File> blockingQueue;
	@Override
	public void run() {
		System.out.println("Indexer begins to run!");

		try{
		while(true){
			indexer(blockingQueue.take());
		}
		}
		catch (InterruptedException e) {
			System.out.println(e);
		}
		System.out.println("Indexer is shutdown!");
	}

	private void indexer(File file ){
		System.out.println("Indexing "+file.getAbsolutePath());
	}

	public Indexer(BlockingQueue<File> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

}
package org.bupt.xiaoye.chapter5_8;

import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class Starter {
	public static final int BOUND = 1;
	public static void main(String[] args) throws InterruptedException{
		BlockingQueue<File> blockingQueue = new SynchronousQueue<File>();
		File file = new File("E:\\BaiduYunDownload");
		Crawler c1 = new Crawler(blockingQueue,file);
		Indexer indexer = new Indexer(blockingQueue);
		Thread t1 = new Thread(c1);
		Thread t2 = new Thread(indexer);
		t1.start();
		t2.start();
		t1.join();
		t2.join();
	}

}

双端队列-工作密取

Java6 增加了两种容器类型,Deque 和 BlockingDeque,他们分别对 Queue 和 BlockingQueue 进行了拓展。Deque 是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括 ArrayDeque 和 LinkedBlockingDeque。双端队列同样适用于另一种相关模式,即工作密取(Working stealing)。在生产者-消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己的双端队列中的全部工作,那么它可以从其它消费者双端队列末尾秘密地获取工作。密取工作模式比传统的生产者-消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减小了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争程度。

工作密取非常适用于既是消费者也是生产者问题—当执行某个工作时可能导致出现更多的工作。例如,在网页爬虫程序在处理一个页面时,通常会发现有更多的页面需要处理。

阻塞和中断

线程可能会阻塞或者暂停执行,原因有很多种:等待I/O操作结束,等待获得一个锁,等待从 Thread.sleep 中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIMED_WAITING)。阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受它控制的时间发生后才能继续执行。

BlockingQueue 的 put 和take 方法会抛出受检查异常(Checked Exception) InterruptedException,这与类库中其他方法的做法相同,例如 Thread.sleep .当某方法抛出 InterruptedException时,表示该方法是一个阻塞方法。当方法抛出InterruptedException 时,有两种基本选择:

        传递 InterruptedException 避开这个异常通常是最明智的策略-只需要将 InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常或者捕获后再次抛出这个异常。(通常如果在自定义的方法中调用了阻塞方法,那么捕获到阻塞方法的中断异常后应该将其重新抛出)

        恢复中断 有时候不能抛出InterruptedException,例如代码是Runnable的一部分(因为run方法被定义为不抛出任何异常)。在这种情况下你,必须捕获InterruptedException,并通过调用当前线程的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。

时间: 2024-12-26 06:10:50

Java 并发编程(四)阻塞队列和生产者-消费者模式的相关文章

Java并发编程:阻塞队列

在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者

12、Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

[转]Java并发编程:阻塞队列

在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动被唤醒

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

Java并发编程:阻塞队列(转载)

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

Java并发编程:阻塞队列 &lt;转&gt;

在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者

(转)Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

【转】Java并发编程:阻塞队列

在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用了非阻塞队列的时候有一个很大的问题就是:它不会对当前线程产生阻塞,那么在面对类似消

并发编程—— 阻塞队列和生产者-消费者模式

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 概述 第1部分 为什么要使用生产者和消费者模式 第2部分 什么是生产者消费者模式 第3部分 代码示例 第1部分 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取