多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者、消费者模式是多线程中的经典问题。通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节。

对于比较常见的单生产者、多消费者的情况,主要有以下两种策略:

  1. 通过volatile boolean producerDone =false 来标示是否完成。生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出。 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑。
  2. 比较经典的“毒丸”策略,生产者结束后,把一个特别的对象:“毒丸”对象放入队列。消费者从队列中拿到对象后,判断是否是毒丸对象。如果是普通非毒丸对象,则正常消费。如果是毒丸对象,则放回队列(杀死其他消费者),然后结束自己。这种方式不会对结束状态产生争用,是比较好的方式。

由于“毒丸”策略是在单生产者多消费者情况下的。对于多生产者的情况,需要对之进行一些修改。我的想法是这样的。用Countdownlatch作为生产者计数器。所有生产者结束后,由协调者放入毒丸对象,消费者退出过程是一样的。上代码:

Coordinator: 启动生产者消费者,提供队列、计数器。生产者全部结束后,放入毒丸。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class Coordinator {
	public static final Object POISON_PILL = new Object();//special object to kill consumers
	private int productCount = 3;
	private int consumerCount = 5;

	public void startAll() throws Exception{
		BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(20);
		CountDownLatch noMoreToProduce = new CountDownLatch(productCount);
		//start consumers;
		for(int i = 0; i < consumerCount; i++){
			new Thread(new Consumer("consumer " + i, queue)).start();
		}
		//start producers;
		for(int i = 0; i < productCount; i++){
			new Thread(new Producer("producer " + i, queue, noMoreToProduce)).start();
		}
		//wait until all producer down
		noMoreToProduce.await();
		System.out.println("All producer finished, putting POISON_PILL to the queue to stop consumers!");
		//put poison pill
		queue.put(POISON_PILL);
	}

	public static void main(String[] args) throws Exception{
		new Coordinator().startAll();
	}
}

Producer: 随机生产和结束,结束前使countdownlatch + 1

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class Producer implements Runnable {
	private String name;
	private CountDownLatch noMoreToProduce;
	private BlockingQueue<Object> queue;
	private Random random = new Random();

	public Producer(String name, BlockingQueue<Object> queue, CountDownLatch noMoreToProduce){
		this.name = name;
		this.queue = queue;
		this.noMoreToProduce = noMoreToProduce;
	}

	@Override
	public void run() {
		System.out.println(name + " started.");
		try {
			while (true) {
				Object item = randomProduce();
				if (item == null) {
					break; //break if no more item
				}
				queue.put(item);
				System.out.println(name + " produced one.");
			}
		} catch (InterruptedException e) {
			//log
		} finally{
			System.out.println(name + " finished.");
			noMoreToProduce.countDown();//count down to signal "I finished."
		}
	}

	private Object randomProduce() {
		if (random.nextBoolean()) {
			return new Object();
		}
		return null;
	}
}

Consumer: 判断毒丸对象。如果是毒丸,放回队列(杀死其他消费者),然后自己退出。

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
	private String name;
	private BlockingQueue<Object> queue;

	public Consumer(String name, BlockingQueue<Object> queue){
		this.name = name;
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			System.out.println(name + " started.");
			while (true) {
				Object item = queue.take();
				//poison pill processing
				if (item == Coordinator.POISON_PILL) {
					queue.put(item);//put back to kill others
					System.out.println(name + " finished");
					break;
				}
				item = null;//pretend to consume the item;
				System.out.println(name + " consumed one");
			}
		} catch (InterruptedException e) {

		} 
	}
}

执行结果:

consumer 0 started.

consumer 4 started.

consumer 3 started.

consumer 2 started.

consumer 1 started.

producer 0 started.

producer 1 started.

producer 0 finished.

producer 1 produced one.

producer 2 started.

producer 1 produced one.

producer 1 finished.

consumer 3 consumed one

consumer 4 consumed one

consumer 0 consumed one

producer 2 produced one.

producer 2 produced one.

producer 2 produced one.

consumer 1 consumed one

consumer 2 consumed one

producer 2 finished.

All producer finished, putting POISON_PILL to the queue to stop consumers!

consumer 3 finished

consumer 4 finished

consumer 0 finished

consumer 2 finished

consumer 1 finished

时间: 2024-10-26 08:45:08

多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。的相关文章

(转)不停止Nginx服务的情况下平滑变更Nginx配置

在不停止Nginx服务的情况下平滑变更Nginx配置 1.修改/usr/local/webserver/nginx/conf/nginx.conf配置文件后,请执行以下命令检查配置文件是否正确: /usr/local/webserver/nginx/sbin/nginx -t 如果屏幕显示以下两行信息,说明配置文件正确: the configuration file /usr/local/webserver/nginx/conf/nginx.conf syntax is ok the confi

生产者消费者模式中条件判断是使用while而不是if

永远在循环(loop)里调用 wait 和 notify,不是在 If 语句现在你知道wait应该永远在被synchronized的背景下和那个被多线程共享的对象上调用,下一个一定要记住的问题就是,你应该永远在while循环,而不是if语句中调用wait.因为线程是在某些条件下等待的--在我们的例子里,即"如果缓冲区队列是满的话,那么生产者线程应该等待",你可能直觉就会写一个if语句.但if语句存在一些微妙的小问题,导致即使条件没被满足,你的线程你也有可能被错误地唤醒.所以如果你不在线

virtualBox中有线和无线两种情况下centos虚拟机和本地机互ping的方案

之前写微信点餐系统的时候,刚开始是无线连接,然后每次进去虚拟机ip和本地ip都会改变,所以每次都需要配置一下nginx,还有本地的路径.之后换有线连接,就研究了一下桥接模式有线情况下虚拟机静态ip设置,最近到学校来了,今天用手机热点配置服务的时候调试了很久,下面给出有线和无线情况下虚拟机和主机互ping的策略.(本人才疏学浅,原理不识,有问题留言我能帮会帮的) 参考博客:https://www.cnblogs.com/chenhaoqiang/p/9491902.html#4191178 1.W

java中几种Map在什么情况下使用,并简单介绍原因及原理

一.Map用于保存具有映射关系的数据,Map里保存着两组数据:key和value,它们都可以使任何引用类型的数据,但key不能重复.所以通过指定的key就可以取出对应的value.Map接口定义了如下常用的方法:1.void clear():删除Map中所以键值对.2.boolean containsKey(Object key):查询Map中是否包含指定key,如果包含则返回true.3.boolean containsValue(Object value):查询Map中是否包含指定value

在Node.js中在保持目录结构的情况下压缩指定目录

最近在做一个文件升级的功能,需要从下载服务器中指定目录下的文件.在学习了zlib后发现这个模块达不到这个功能 在查找资料后发现后发现 archiver 模块很好用,不过我也发现大部分中文资料没有如果查询压缩进度,所以在此分享一下: archiver的github地址: https://github.com/archiverjs/node-archiver API文档地址: https://archiverjs.com/docs/ 压缩等级说明: var archive = archiver('z

python中不引入第三方变量的情况下交换a\b的值

a = 1b = 2a=a+b #a是3b=a-b # b是1a=a-b # a是2print('a:',a)print('b:',b)

不停止kfaka服务的情况下修改kafka某个topic的数据保存时间

./kafka-topics.sh --zookeeper zk1:2181 -topic topicname --alter --config retention.ms=86400 原文地址:http://blog.51cto.com/haoyonghui/2119142

多线程之生产者消费者模式

最近在项目中需要使用使用多线程实现一种功能,和生产者消费者模式类似,因此,学习了下生产者消费者模式的多线程实现.在生产者消费者模式中,通常有两类线程, 即若干个生产者线程和若干个消费者线程.生产者线程负责提交用户请求,消费者线程则负责处理生产者提交的任务.生产者和消费者之间则通过共享内存缓冲区进行通信. 在这里我们选择BlockingQueue做为共享内存缓冲区. 首先,我们构建生产者生产的,和消费者需要处理的数据PCData,即相关任务数据. public class PCData { pri

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生