Java多线程编程模式实战指南(三):Two-phase Termination模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination。转载请注明作者: 黄文海 出处:http://viscent.iteye.com

停止线程是一个目标简单而实现却不那么简单的任务。首先,Java没有提供直接的API用于停止线程。此外,停止线程时还有一些额外的细节需要考虑,如待停止的线程处于阻塞(等待锁)或者等待状态(等待其它线程)、尚有未处理完的任务等。本文介绍的Two-phase Termination模式提供了一种通用的用于优雅地停止线程的方法。

Two-phase Termination模式简介

Java并没有提供直接的API用于停止线程。Two-phase Termination模式通过将停止线程这个动作分解为准备阶段和执行阶段这两个阶段,以应对停止线程过程中可能存在的问题。

准备阶段。该阶段主要动作是“通知”目标线程(欲停止的线程)准备进行停止。这一步会设置一个标志变量用于指示目标线程可以准备停止了。但是,由于目标线程可能正处于阻塞状态(等待锁的获得)、等待状态(如调用Object.wait)或者I/O(如InputStream.read)等待等状态,即便设置了这个标志,目标线程也无法立即“看到”这个标志而做出相应动作。因此,这一阶段还需要通过调用目标线程的interrupt方法,以期望目标线程能够通过捕获相关的异常侦测到该方法调用,从而中断其阻塞状态、等待状态。对于能够对interrupt方法调用作出响应的方法(参见表1),目标线程代码可以通过捕获这些方法抛出的InterruptedException来侦测线程停止信号。但也有一些方法(如InputStream.read)并不对interrupt调用作出响应,此时需要我们手工处理,如同步的Socket I/O操作中通过关闭socket,使处于I/O等待的socket抛出java.net.SocketException。

表 1. 能够对Thread.interrupt作出响应的一些方法


方法


响应interrupt调用抛出的异常


Object.wait() 、 Object.wait(long timeout) 、Object.wait(long timeout, int nanos)


InterruptedException


Thread.sleep(long millis) 、Thread.sleep(long millis, int nanos)


InterruptedException


Thread.join()、Thread.join(long millis) 、Thread.join(long millis, int nanos)


InterruptedException


java.util.concurrent.BlockingQueue.take()


InterruptedException


java.util.concurrent.locks.Lock.lockInterruptibly()


InterruptedException


java.nio.channels.InterruptibleChannel


java.nio.channels.ClosedByInterruptException

执行阶段。该阶段的主要动作是检查准备阶段所设置的线程停止标志和信号,在此基础上决定线程停止的时机,并进行适当的“清理”操作。

Two-phase Termination模式的架构

Two-phase Termination模式的主要参与者有以下几种。其类图如图1所示。

图 1. Two-phase Termination模式的类图

  • ThreadOwner:目标线程的拥有者。Java语言中,并没有线程的拥有者的概念,但是线程的背后是其要处理的任务或者其所提供的服务,因此我们不能在不清楚某个线程具体是做什么的情况下贸然将其停止。一般地,我们可以将目标线程的创建者视为该线程的拥有者,并假定其“知道”目标线程的工作内容,可以安全地停止目标线程。
  • TerminatableThread:可停止的线程。其主要方法及职责如下:
    • terminate:设置线程停止标志,并发送停止“信号”给目标线程。
    • doTerminate留给子类实现线程停止时所需的一些额外操作,如目标线程代码中包含Socket I/O,子类可以在该方法中关闭Socket以达到快速停止线程,而不会使目标线程等待I/O完成才能侦测到线程停止标记。
    • doRun留给子类实现线程的处理逻辑。相当于Thread.run,只不过该方法中无需关心停止线程的逻辑,因为这个逻辑已经被封装在TerminatableThread的run方法中了。
    • doCleanup留给子类实现线程停止后可能需要的一些清理动作。
  • TerminationToken线程停止标志。toShutdown用于指示目标线程可以停止了。reservations可用于反映目标线程还有多少数量未完成的任务,以支持等目标线程处理完其任务后再行停止。

准备阶段的序列图如图2所示:

图 2. 准备阶段的序列图

1、客户端代码调用线程拥有者的shutdown方法。

2、shutdown方法调用目标线程的terminate方法。

3~4、terminate方法将terminationToken的toShutdown标志设置为true。

5、terminate方法调用由TerminatableThread子类实现的doTerminate方法,使得子类可以为停止目标线程做一些其它必要的操作。

6、若terminationToken的reservations属性值为0,则表示目标线程没有未处理完的任务或者ThreadOwner在停止线程时不关心其是否有未处理的任务。此时,terminate方法会调用目标线程的interrupt方法。

7、terminate方法调用结束。

8、shutdown调用返回,此时目标线程可能还仍然在运行。

执行阶段由目标线程的代码去检查terminationToken的toShutdown属性、reservations属性的值,并捕获由interrupt方法调用抛出的相关异常以决定是否停止线程。在线程停止前由TerminatableThread子类实现的doCleanup方法会被调用。

Two-phase Termination模式实战案例

某系统需要对接告警系统以实现告警功能。告警系统是一个C/S结构的系统,它提供了一套客户端API(AlarmAgent)用于与其对接的系统给其发送告警。该系统将告警功能封装在一个名为AlarmMgr的单件类(Singleton)中,系统中其它代码需要发送告警的只需要调用该类的sendAlarm方法。该方法将告警信息缓存入队列,由专门的告警发送线程负责调用AlarmAgent的相关方法将告警信息发送至告警服务器。

告警发送线程是一个用户线程(User Thread),因此在系统的停止过程中,该线程若未停止则会阻止JVM正常关闭。所以,在系统停止过程中我们必须主动去停止告警发送线程,而非依赖JVM。为了能够尽可能快的以优雅的方式将告警发送线程停止,我们需要处理以下两个问题:

  1. 当告警缓存队列非空时,需要将队列中已有的告警信息发送至告警服务器。
  2. 由于缓存告警信息的队列是一个阻塞队列(LinkedBlockingQueue),在该队列为空的情况下,告警发送线程会一直处于等待状态。这会导致其无法响应我们的关闭线程的请求。

上述问题可以通过使用Two-phase Termination模式来解决。

AlarmMgr相当于图1中的ThreadOwner参与者实例,它是告警发送线程的拥有者。系统停止过程中调用其shutdown方法(AlarmMgr.getInstance().shutdown())即可请求告警发送线程停止。其代码如清单1所示:

清单 1. AlarmMgr源码

public class AlarmMgr {
	private final BlockingQueue<AlarmInfo> alarms = new LinkedBlockingQueue<AlarmInfo>();
	//告警系统客户端API
	private final AlarmAgent alarmAgent = new AlarmAgent();
	//告警发送线程
	private final AbstractTerminatableThread alarmSendingThread;

	private boolean shutdownRequested = false;

	private static final AlarmMgr INSTANCE = new AlarmMgr();

	private AlarmMgr() {
		alarmSendingThread = new AbstractTerminatableThread() {
			@Override
			protected void doRun() throws Exception {
				if (alarmAgent.waitUntilConnected()) {
					AlarmInfo alarm;
					alarm = alarms.take();
					terminationToken.reservations.decrementAndGet();
					try {
						alarmAgent.sendAlarm(alarm);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}

			@Override
			protected void doCleanup(Exception exp) {
				if (null != exp) {
					exp.printStackTrace();
				}
				alarmAgent.disconnect();
			}

		};

		alarmAgent.init();
	}

	public static AlarmMgr getInstance() {
		return INSTANCE;
	}

	public void sendAlarm(AlarmType type, String id, String extraInfo) {
		final TerminationToken terminationToken = alarmSendingThread.terminationToken;
		if (terminationToken.isToShutdown()) {
			// log the alarm
			System.err.println("rejected alarm:" + id + "," + extraInfo);
			return;

		}
		try {
			AlarmInfo alarm = new AlarmInfo(id, type);
			alarm.setExtraInfo(extraInfo);
			terminationToken.reservations.incrementAndGet();
			alarms.add(alarm);
		} catch (Throwable t) {
			t.printStackTrace();
		}
	}

	public void init() {
		alarmSendingThread.start();
	}

	public synchronized void shutdown() {
		if (shutdownRequested) {
			throw new IllegalStateException("shutdown already requested!");
		}

		alarmSendingThread.terminate();
		shutdownRequested = true;
	}

	public int pendingAlarms() {
		return alarmSendingThread.terminationToken.reservations.get();
	}

}

class AlarmAgent {
	// 省略其它代码
	private volatile boolean connectedToServer = false;

	public void sendAlarm(AlarmInfo alarm) throws Exception {
		// 省略其它代码
		System.out.println("Sending " + alarm);
		try {
			Thread.sleep(50);
		} catch (Exception e) {

		}
	}

	public void init() {
		// 省略其它代码
		connectedToServer = true;
	}

	public void disconnect() {
		// 省略其它代码
		System.out.println("disconnected from alarm server.");
	}

	public boolean waitUntilConnected() {
		// 省略其它代码
		return connectedToServer;
	}
}

从上面的代码可以看出,AlarmMgr每接受一个告警信息放入缓存队列便将terminationToken的reservations值增加1,而告警发送线程每发送一个告警到告警服务器则将terminationToken的reservations值减少1。这为我们可以在停止告警发送线程前确保队列中现有的告警信息会被处理完毕提供了线索:AbstractTerminatableThread的run方法会根据terminationToken的reservations是否为0来判断待停止的线程已无未处理的任务,或者无需关心其是否有待处理的任务。

AbstractTerminatableThread的源码见清单2:

清单 2. AbstractTerminatableThread源码

public abstract class AbstractTerminatableThread extends Thread
     implements Terminatable {
	public final TerminationToken terminationToken;

	public AbstractTerminatableThread() {
		super();
		this.terminationToken = new TerminationToken();
	}

      /**
	 *
	 * @param terminationToken 线程间共享的线程终止标志实例
	*/
	public AbstractTerminatableThread(TerminationToken terminationToken) {
		super();
		this.terminationToken = terminationToken;
	}

	protected abstract void doRun() throws Exception;

	protected void doCleanup(Exception cause) {}

	protected void doTerminiate() {}

	@Override
	public void run() {
		Exception ex = null;
		try {
			while (true) {
                                    /*
				 * 在执行线程的处理逻辑前先判断线程停止的标志。
				 */
				if (terminationToken.isToShutdown()
				    && terminationToken.reservations.get() <= 0) {
					break;
				}
				doRun();
			}

		} catch (Exception e) {
			// Allow the thread to terminate in response of a interrupt invocation
			ex = e;
		} finally {
			doCleanup(ex);
		}
	}

	@Override
	public void interrupt() {
		terminate();
	}

	@Override
	public void terminate() {
		terminationToken.setToShutdown(true);
		try {
			doTerminiate();
		} finally {
			// 若无待处理的任务,则试图强制终止线程
			if (terminationToken.reservations.get() <= 0) {
				super.interrupt();
			}
		}
	}

}

AbstractTerminatableThread是一个可复用的TerminatableThread参与者实例。其terminate方法完成了线程停止的准备阶段。该方法首先将terminationToken的toShutdown变量设置为true,指示目标线程可以准备停止了。但是,此时目标线程可能处于一些阻塞(Blocking)方法的调用,如调用Object.sleep、InputStream.read等,无法侦测到该变量。调用目标线程的interrupt方法可以使一些阻塞方法(参见表1)通过抛出异常从而使目标线程停止。但也有些阻塞方法如InputStream.read并不对interrupt方法调用作出响应,此时需要由TerminatableThread的子类实现doTerminiate方法,在该方法中实现一些关闭目标线程所需的额外操作。例如,在Socket同步I/O中通过关闭socket使得使用该socket的线程若处于I/O等待会抛出SocketException。因此,terminate方法下一步调用doTerminate方法。接着,若terminationToken.reservations的值为非正数(表示目标线程无待处理任务、或者我们不关心其是否有待处理任务),则terminate方法会调用目标线程的interrupt方法,强制目标线程的阻塞方法中断,从而强制终止目标线程。

执行阶段在AbstractTerminatableThread的run方法中完成。该方法通过对TerminationToken的toShutdown属性和reservations属性的判断或者通过捕获由interrupt方法调用而抛出的异常来终止线程。并在线程终止前调用由TerminatableThread子类实现的doCleanup方法用于执行一些清理动作。

在执行阶段,由于AbstractTerminatableThread.run方法每次执行线程处理逻辑(通过调用doRun方法实现)前都先判断下toShutdown属性和reservations属性的值,在目标线程处理完其待处理的任务后(此时reservations属性的值为非正数)目标线程run方法也就退出了while循环。因此,线程的处理逻辑代码(doRun方法)将不再被调用,从而使本案例在不使用Two-phase Termination模式的情况下停止目标线程存在的两个问题得以解决(目标线程停止前可以保证其处理完待处理的任务——发送队列中现有的告警信息到服务器)和规避(目标线程发送完队列中现有的告警信息后,doRun方法不再被调用,从而避免了队列为空时BlockingQueue.take调用导致的阻塞)。

从上可知,准备阶段、执行阶段需要通过TerminationToken作为“中介”来协调二者的动作。TerminationToken的源码如清单3所示:

清单 3. TerminationToken源码

public class TerminationToken {
        //使用volatile修饰,以保证无需显示锁的情况下该变量的内存可见性
	protected volatile boolean toShutdown = false;
	public final AtomicInteger reservations = new AtomicInteger(0);

	public boolean isToShutdown() {
		return toShutdown;
	}

	protected void setToShutdown(boolean toShutdown) {
		this.toShutdown = true;
	}

}

Two-phase Termination模式的评价与实现考量

Two-phase Termination模式使得我们可以对各种形式的目标线程进行优雅的停止。如目标线程调用了能够对interrupt方法调用作出响应的阻塞方法、目标线程调用了不能对interrupt方法调用作出响应的阻塞方法、目标线程作为消费者处理其它线程生产的“产品”在其停止前需要处理完现有“产品”等。Two-phase Termination模式实现的线程停止可能出现延迟,即客户端代码调用完ThreadOwner.shutdown后,该线程可能仍在运行。

本文案例展示了一个可复用的Two-phase Termination模式实现代码。读者若要自行实现该模式,可能需要注意以下几个问题。

线程停止标志

本文案例使用了TerminationToken作为目标线程可以准备停止的标志。从清单3的代码我们可以看到,TerminationToken使用了toShutdown这个boolean变量作为主要的停止标志,而非使用Thread.isInterrupted()。这是因为,调用目标线程的interrupt方法无法保证目标线程的isInterrupted()方法返回值为true:目标线程可能调用一些能够捕获InterruptedException而不保留线程中断状态的代码。另外,toShutdown这个变量为了保证内存可见性而又能避免使用显式锁的开销,采用了volatile修饰。这点也很重要,笔者曾经见过一些采用boolean变量作为线程停止标志的代码,只是这些变量没有用volatile修饰,对其访问也没有加锁,这就可能无法停止目标线程。

生产者——消费者问题中的线程停止

在多线程编程中,许多问题和一些多线程编程模式都可以看作生产者——消费者问题。停止处于生产者——消费者问题中的线程,需要考虑更多的问题:需要注意线程的停止顺序,如果消费者线程比生产者线程先停止则会导致生产者生产的新”产品“无法被处理,而如果先停止生产者线程又可能使消费者线程处于空等待(如生产者消费者采用阻塞队列中转”产品“)。并且,停止消费者线程前是否考虑要等待其处理完所有待处理的任务或者将这些任务做个备份也是个问题。本文案例部分地展示生产者——消费者问题中线程停止的处理,其核心就是通过使用TerminationToken的reservations变量:生产者每”生产“一个产品,Two-phase Termination模式的调用方代码要使reservations变量值增加1(terminationToken.reservations.incrementAndGet());消费者线程每处理一个产品,Two-phase Termination模式的调用方代码要使reservations变量值减少1(terminationToken.reservations.decrementAndGet())。当然,在停止消费者线程时如果我们不关心其待处理的任务,Two-phase Termination模式的调用方代码可以忽略对reservations变量的操作。清单4展示了一个完整的停止生产者——消费者问题中的线程的例子:

清单 4. 停止生产者——消费者问题中的线程的例子

public class ProducerConsumerStop {
	class SampleConsumer<P> {
		private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();

		private AbstractTerminatableThread workThread
				= new AbstractTerminatableThread() {
			@Override
			protected void doRun() throws Exception {
				terminationToken.reservations.decrementAndGet();
				P product = queue.take();
				// ...
				System.out.println(product);
			}

		};

		public void placeProduct(P product) {
			if (workThread.terminationToken.isToShutdown()) {
				throw new IllegalStateException("Thread shutdown");
			}
			try {
				queue.put(product);
				workThread.terminationToken.reservations.incrementAndGet();
			} catch (InterruptedException e) {

			}
		}

		public void shutdown() {
			workThread.terminate();
		}

		public void start() {
			workThread.start();
		}
	}

	public void test() {
		final SampleConsumer<String> aConsumer = new SampleConsumer<String>();

		AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {
			private int i = 0;

			@Override
			protected void doRun() throws Exception {
				aConsumer.placeProduct(String.valueOf(i));
			}

			@Override
			protected void doCleanup(Exception cause) {
				// 生产者线程停止完毕后再请求停止消费者线程
				aConsumer.shutdown();
			}

		};

		aProducer.start();
		aConsumer.start();
	}
}

隐藏而非暴露可停止的线程

为了保证可停止的线程不被其它代码误停止,一般我们将可停止线程隐藏在线程拥有者背后,而使系统中其它代码无法直接访问该线程,正如本案例代码(见清单1)所展示:AlarmMgr定义了一个private字段alarmSendingThread用于引用告警发送线程(可停止的线程),系统中的其它代码只能通过调用AlarmMgr的shutdown方法来请求该线程停止,而非通过引用该线程对象自身来停止它。

总结

本文介绍了Two-phase Termination模式的意图及架构。并结合笔者工作经历提供了一个实际的案例用于展示一个可复用的Two-phase Termination模式实现代码,在此基础上对该模式进行了评价并分享在实际运用该模式时需要注意的事项。

参考资源

  • 本文的源代码在线阅读:https://github.com/Viscent/JavaConcurrencyPattern/
  • Brian Göetz et al.,Java Concurrency In Practice
  • Mark Grand,Patterns in Java,Volume 1, 2nd Edition
时间: 2024-09-30 11:43:44

Java多线程编程模式实战指南(三):Two-phase Termination模式--转载的相关文章

Java多线程编程模式实战指南(一):Active Object模式--转载

本文由黄文海首次发布在infoq中文站上:http://www.infoq.com/cn/articles/Java-multithreaded-programming-mode-active-object-part1 .转载请注明作者: 黄文海 出处:http://viscent.iteye.com. Active Object模式简介 Active Object模式是一种异步编程模式.它通过对方法的调用与方法的执行进行解耦来提高并发性.若以任务的概念来说,Active Object模式的核心

Java多线程编程模式实战指南一:Active Object模式(上)

Active Object模式简介 Active Object模式是一种异步编程模式.它通过对方法的调用与方法的执行进行解耦来提高并发性.若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离.这有点类似于System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc()方法调用返回后的某段时间才开始执行任务--回收垃圾.我们知

Java多线程编程模式实战指南一:Active Object模式(下)

Active Object模式的评价与实现考量 Active Object模式通过将方法的调用与执行分离,实现了异步编程.有利于提高并发性,从而提高系统的吞吐率. Active Object模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步方法)和任务的执行策略(Execution Policy)分离.任务的执行策略被封装在Scheduler的实现类之内,因此它对外是不"可见"的,一旦需要变动也不会影响其它代码,降低了系统的耦合性.任务的执行策略可以反映以下一些

Java多线程编程核心技术,第三章

1,notify的同步块完了,才会运行wait的同步块 2,interrupt()不是静态方法,用在wait的线程上会有InteruptException,锁也会被释放 3,notify()唤醒的线程是随机的,以前不知道唤醒哪个,了解到notify是object对象有的,哪个用相关的对象锁了,就会解锁哪个 4,object.wait(5000)期间没有notify,自己也会自动唤醒 原文地址:https://www.cnblogs.com/vhyc/p/9180796.html

Java多线程编程模式实战指南(二):Immutable Object模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-immutable-object.转载请注明作者: 黄文海 出处:http://viscent.iteye.com. 多线程共享变量的情况下,为了保证数据一致性,往往需要对这些变量的访问进行加锁.而锁本身又会带来一些问题和开销.Immutable Object模式使得我们可以在不使用锁的情况下,既保证共享变量访问的线程安

汪大神Java多线程编程实战

课程目录:├─1│  ├─Java并发编程.png│  ├─源码+ppt.rar│  ├─高并发编程第一阶段01讲.课程大纲及主要内容介绍.wmv│  ├─高并发编程第一阶段02讲.简单介绍什么是线程.wmv│  ├─高并发编程第一阶段03讲.创建并启动线程.mp4│  ├─高并发编程第一阶段04讲.线程生命周期以及start方法源码剖析.mp4│  ├─高并发编程第一阶段05讲.采用多线程方式模拟银行排队叫号.mp4│  ├─高并发编程第一阶段06讲.用Runnable接口将线程的逻辑执行单元

Java多线程编程核心技术(三)多线程通信

线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体.线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督.在本章中需要着重掌握的技术点如下: 使用wait/notify实现线程间的通信 生产者/消费者模式的实现 方法join的使用 ThreadLocal类的使用 1.等待 / 通知机制 通过本节可以学习到,线程与线程之间不是独立的个体,它们彼此

Java多线程编程中Future模式的详解&lt;转&gt;

Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker模式.Guarded Suspeionsion模式.不变模式和生产者-消费者模式等.这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:关于其他多线程设计模式的地址如下:关于Master-Worker模式的详解: Java多线程编程中Master-Worker模式的详解关于Guarded Suspeionsion模式的详解: Java多线程编程中Guarded Suspeionsion模式

Java多线程编程详解

线程的同步 由于同一进程的多个线程共享同一片存储空间,在带来方便的同时,也带来了访问冲突这个严重的问题.Java语言提供了专门机制以解决这种冲突,有效避免了同一个数据对象被多个线程同时访问. 由于我们可以通过 private 关键字来保证数据对象只能被方法访问,所以我们只需针对方法提出一套机制,这套机制就是 synchronized 关键字,它包括两种用法:synchronized 方法和 synchronized 块. 1. synchronized 方法:通过在方法声明中加入 synch

JAVA读书推荐----《深入分析Java Web技术内幕》--《java多线程编程核心技术》--《大型网站技术架构 核心原理与案例分析》-《Effective Java中文版》

(1)  首先推荐的不是一本书,而是一个博客,也是我们博客园另外一位博友java_my_life. 目前市面上讲解设计模式的书很多,虽然我前面讲了看书是最好的,但是对设计模式感兴趣的朋友们,我推荐的是这个博客.这位博友的设计模式讲得非常非常好,我认为90%的内容都是没有问题且很值得学习的,其讲解设计模式的大体路线是: 1.随便开篇点明该设计模式的定义 2.图文并茂讲解该设计模式中的结构 3.以详细的代码形式写一下该种设计模式的实现 4.补充内容 5.讲解该设计模式的优缺点 对于一个设计模式我们关