多线程编程-设计模式之保护性暂挂(Guarded Suspesion)模式

Guarded Suspension模式的架构

核心是一个受保护方法(Guarded Method).该方法需要执行其所要真正执行的操作时需要满足特定的条件(Predicate,以下称之为保护条件)。当该条件不满足时,执行受保护方法的线程会被挂起进入等待状态,直到该条件满足时该线程才会继续运行。此时,受保护方法才会正在执行其所要执行的操作(目标操作)。
GuardedObject:包含受保护方法的对象。主要方法和职责如下:
-guardedMethod:受保护方法。
-stateChanged:改变GuardedObject实例状态的方法,该方法负责在保护条件成立时唤醒受保护方法的执行线程。
GuardedAction:抽象了目标动作,并关联了目标动作所需的保护条件。主要方法如下
-call:用于表示目标动作的方法。
ConcreteGuardedAction:应用程序所实现的具体目标动作及其关联的保护条件。
Predicate:抽象了保护条件
-evaluate:用于表示保护条件的方法。
ConcretePredicate:应用程序所实现的具体保护条件。
Blocker:负责对执行guardedMethod方法的线程进行挂起和唤醒,并执行ConcreteGuardedAction所实现的目标操作。主要方法如下:
callWithGuarded:负责执行目标操作和暂挂当前线程。
singleAfter:负责执行其参数指定的动作和唤醒改方法所属Blocker实例所暂挂的线程中的一个线程。
signal:负责唤醒由该方法所属Blocker实例所暂挂的线程中的一个线程。
broadcastAfter:负责执行其参数指定的动作和唤醒由该方法所属Blocker实例所暂挂的所有线程。
broadcast:负责唤醒由该方法所属Blocker实例暂挂的所有线程。
ConditionVarBlocker:基于java条件变量Condition实现的Blocker。


调用过程:
1.client调用受保护方法guardedMethod。
2.guardedMethod方法创建GuardedAction实例guardedAction.
3.guardedMethod方法以guardedAction为参数调用Blocker实例的callWithGuarded方法。
4-5:callWithGuarded方法调用guardedAction的getGuarded方法获取保护条件Predicate。
6-8这几个步骤是个循环。该循环判断守护条件是否成立。若保护条件成立,则该循环退出。否则,该循环会将当前线程暂挂使其处于等待状态。当其他线程唤醒该被暂挂的线程后,该循环任然继续监测保护条件并重复上述逻辑。
9-10:callWithGuarded方法调用guardedAction的call方法来执行目标操作,并记录call方法的返回值actionReturnValue。
11.callWithGuarded方法将actionReturnValue作为其返回值返回给调用方。
12.guaredMethod方法返回。

案例分析:系统告警功能模块
主要功能是将其接收到的告警信息发送给告警服务器。类AlarmAgent负责与告警服务器进行对接。sendAlarm方法负责通过网络连接将告警信息发送到告警服务器。AlarmAgent创建一个专门的线程用于其与告警服务器建立网络连接。因此,sendAlarm方法被调用的时候,连接线程可能还没有完成网络连接的建立。此时,snedAlarm方法应该等待连接线程建立好网络连接。另外,即便连接线程建立好了网络连接,中途也可能由于某些原因出现与告警服务器断连的情况。此时,sendAlarm方法需要等待心跳任务重新建立好连接才能上报告警信息。也就是说,snedAlarm方法必须在AlarmAgent与告警服务器的网络连接建立成功的情况下才能执行所需要执行的操作。若AlarmAgent与告警服务器的连接未建立(或者连接中断),sendAlarm方法的执行线程应该暂挂直到连接建立完毕(或者恢复)。

上述问题可以采用Guarded Suspension模式来解决。
import lombok.extern.slf4j.Slf4j;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
/**
* 负责连接告警服务器,并发送告警信息至告警服务器
*
*/
@Slf4j
public class AlarmAgent {
// 用于记录AlarmAgent是否连接上告警服务器
private volatile boolean connectedToServer = false;
public static void main(String[] args) {
AlarmAgent alarmAgent = new AlarmAgent();
alarmAgent.init();
}
// 模式角色:GuardedSuspension.Predicate
private final Predicate agentConnected = new Predicate() {
@Override
public boolean evaluate() {
return connectedToServer;
}
};
// 模式角色:GuardedSuspension.Blocker
private final Blocker blocker = new ConditionVarBlocker();
// 心跳定时器
private final Timer heartbeatTimer = new Timer(true);
// 省略其它代码
/**
* 发送告警信息
* @param alarm 告警信息
* @throws Exception
*/
public void sendAlarm(final AlarmInfo alarm) throws Exception {
// 可能需要等待,直到AlarmAgent连接上告警服务器(或者连接中断后重新连连上服务器)
// 模式角色:GuardedSuspension.GuardedAction
GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) {
public Void call() throws Exception {
doSendAlarm(alarm);
return null;
}
};
blocker.callWithGuard(guardedAction);
}
// 通过网络连接将告警信息发送给告警服务器
private void doSendAlarm(AlarmInfo alarm) {
// 省略其它代码
log.info("sending alarm " + alarm);
// 模拟发送告警至服务器的耗时
try {
Thread.sleep(50);
} catch (Exception e) {
}
}
public void init() {
// 省略其它代码
// 告警连接线程
Thread connectingThread = new Thread(new ConnectingTask());
connectingThread.start();
heartbeatTimer.schedule(new HeartbeatTask(), 60000, 2000);
}
public void disconnect() {
// 省略其它代码
log.info("disconnected from alarm server.");
connectedToServer = false;
}
protected void onConnected() {
try {
blocker.signalAfter(new Callable<Boolean>() {
@Override
public Boolean call() {
connectedToServer = true;
log.info("connected to server");
return Boolean.TRUE;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
protected void onDisconnected() {
connectedToServer = false;
}
// 负责与告警服务器建立网络连接
private class ConnectingTask implements Runnable {
@Override
public void run() {
// 省略其它代码
// 模拟连接操作耗时
try {
Thread.sleep(100);
} catch (InterruptedException e) {
;
}
onConnected();
}
}
/**
* 心跳定时任务:定时检查与告警服务器的连接是否正常,发现连接异常后自动重新连接
*/
private class HeartbeatTask extends TimerTask {
// 省略其它代码
@Override
public void run() {
// 省略其它代码
if (!testConnection()) {
onDisconnected();
reconnect();
}
}
private boolean testConnection() {
// 省略其它代码
return true;
}
private void reconnect() {
ConnectingTask connectingThread = new ConnectingTask();
// 直接在心跳定时器线程中执行
connectingThread.run();
}
}
}
Blocker接口:
import java.util.concurrent.Callable;

/**
* @author
* @create 2017 -09-30 下午1:15
*/
public interface Blocker {
/**
* 在保护条件成立时执行目标动作,否则阻塞当前线程,直到保护条件成立。
* @param guardedAction 带保护条件的目标动作
* @return
* @throws Exception
*/
<V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception;
/**
* 执行stateOperation所指定的操作后,决定是否唤醒本Blocker
* 所暂挂的所有线程中的一个线程。
*
* @param stateOperation
* 更改状态的操作,其call方法的返回值为true时,该方法才会唤醒被暂挂的线程
*/
void signalAfter(Callable<Boolean> stateOperation) throws Exception;

void signal() throws InterruptedException;

/**
* 执行stateOperation所指定的操作后,决定是否唤醒本Blocker
* 所暂挂的所有线程。
*
* @param stateOperation
* 更改状态的操作,其call方法的返回值为true时,该方法才会唤醒被暂挂的线程
*/
void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}

ConditionVarBlocker
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author
* @create 2017 -09-30 下午1:18
*/
@Slf4j
public class ConditionVarBlocker implements Blocker {
private final Lock lock;

private final Condition condition;

private final boolean allowAccess2Lock;

public ConditionVarBlocker(Lock lock) {
this(lock, true);
}

private ConditionVarBlocker(Lock lock, boolean allowAccess2Lock) {
this.lock = lock;
this.allowAccess2Lock = allowAccess2Lock;
this.condition = lock.newCondition();
}

public ConditionVarBlocker() {
this(false);
}

public ConditionVarBlocker(boolean allowAccess2Lock) {
this(new ReentrantLock(), allowAccess2Lock);
}

public Lock getLock() {
if (allowAccess2Lock) {
return this.lock;
}
throw new IllegalStateException("Access to the lock disallowed.");
}

public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
//获得锁
lock.lockInterruptibly();
V result;
try {
//临界区代码
//获得保护条件
final Predicate guard = guardedAction.guard;
while (!guard.evaluate()) {
//保护条件不成立 线程挂起 暂挂ConditionVarBlocker实例
log.info("waiting...");
condition.await();
}
//执行目标动作
result = guardedAction.call();
return result;
} finally {
//释放锁 保证锁总是被释放的
lock.unlock();
}
}

public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()) {
condition.signal();
}
} finally {
lock.unlock();
}

}

public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()) {
condition.signalAll();
}
} finally {
lock.unlock();
}

}

public void signal() throws InterruptedException {
lock.lockInterruptibly();
try {
condition.signal();

} finally {
lock.unlock();
}

}
}
package com.credithc.finance.com.design.guardedmethod;

import java.util.concurrent.Callable;

/**
* @author
* @create 2017 -09-30 下午1:16
*/

public abstract class GuardedAction <V>implements Callable<V> {

protected final Predicate guard;

public GuardedAction(Predicate guard) {
this.guard = guard;
}
}
package com.credithc.finance.com.design.guardedmethod;

/**
* 保护条件判断
*
* @author
* @create 2017 -09-30 下午1:17
*/
public interface Predicate {
boolean evaluate();
}
线程Timer-0一直等待线程Thread-0所持有的锁(helper) 而线程Thread-0持有的helper一直未释放
可复用代码:Predicate,GuardedAction,Blocker,ConditionVarBlocker
需要自己实现的代码:GuaredObject ConcretePredicate ConcreteGuardedAction 这几个参与者实例

需要关注的问题

内存可见性和锁泄露
线程被过早唤醒
嵌套监视器锁死
可能增加JVM垃圾回收的负担

相关连接:http://blog.csdn.net/huzhiqiangCSDN/article/details/55045110

时间: 2024-08-26 14:41:33

多线程编程-设计模式之保护性暂挂(Guarded Suspesion)模式的相关文章

多线程编程-设计模式之不可变对象模式

Immutable Object设计模式适用场景:1.被建模对象的状态变化不频繁:设置一个专门的线程用于被建模对象状态发生变化时创建新的不可变对象.而其他线程只是读取不可变对象的状态.此场景下一个小技巧就是Manipulator对不可变对象的引用使用volatile关键字进行修饰,既可以避免使用显示锁比如synchronize,又可以保证多线程间的内存可见性.2.同时对一组相关的数据进行写操作,因此需要保证原子性此场景下为了保证操作的原子性,通常的做法是使用显示锁,但若采用Immutable O

Java多线程编程实战指南(设计模式篇,黄文海)-之管道线模式

不得不说,本人工作上很少有使用多线程编程技术的地方.由于本人工作上经常使用的是类似SSH等框架搭建MVC架构,所以更加习惯于编写一些优秀程序员所唾弃的样板式的代码.最近看了文海的多线程编程实战指南,瞬间眼前一亮.觉得有很多自己可以学习的,事实上,我已经在最近的项目中使用上了那本书介绍的两相终止模式.串行封闭模式.生产者消费者模式以及线程池等技术,确实在许多方面改进了我们服务端的吞吐量.说到这里本人吐槽一下,由于是毕业后转行,目前也才工作一年还不满2个月.所以原谅我的得瑟,但我相信我以后会做的更好

多线程编程基础知识

多线程编程基础知识 http://www.cnblogs.com/cy163/archive/2006/11/02/547428.html 当前流行的Windows操作系统能同时运行几个程序(独立运行的程序又称之为进程),对于同一个程序,它又可以分成若干个独立的执行流,我们称之为线程,线程提供了多任务处理的能力.用进程和线程的观点来研究软件是当今普遍采用的方法,进程和线程的概念的出现,对提高软件的并行性有着重要的意义.现在的大型应用软件无一不是多线程多任务处理,单线程的软件是不可想象的.因此掌握

Windows下多线程编程(一)

前言 熟练掌握Windows下的多线程编程,能够让我们编写出更规范多线程代码,避免不要的异常.Windows下的多线程编程非常复杂,但是了解一些常用的特性,已经能够满足我们普通多线程对性能及其他要求. 进程与线程 1. 进程的概念 进程就是正在运行的程序.主要包括两部分: • 一个是操作系统用来管理进程的内核对象.内核对象也是系统用来存放关于进程的统计信息的地方. • 另一个是地址空间,它包含所有可执行模块或 D L L模块的代码和数据.它还包含动态内 2. 线程的概念 线程就是描述进程的一条执

java多线程12设计模式

1.Single Threaded Execution Pattern(单线程运行模式) 2.Immutable Pattern(一成不变的模式) 3.Guarded Suspension Pattern(国防暂停模式) 4.Balking Pattern(止步模式,阻行模式) 5.Producer-Consumer Pattern(生产者-消费者模式) 6.Read-Write Lock Pattern(读-写锁模式) 7.Thread-Per-Message Pattern(每一个消息一个线

Java多线程编程详解

线程的同步 由于同一进程的多个线程共享同一片存储空间,在带来方便的同时,也带来了访问冲突这个严重的问题.Java语言提供了专门机制以解决这种冲突,有效避免了同一个数据对象被多个线程同时访问. 由于我们可以通过 private 关键字来保证数据对象只能被方法访问,所以我们只需针对方法提出一套机制,这套机制就是 synchronized 关键字,它包括两种用法:synchronized 方法和 synchronized 块. 1. synchronized 方法:通过在方法声明中加入 synch

Java多线程编程中Future模式的详解&lt;转&gt;

Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker模式.Guarded Suspeionsion模式.不变模式和生产者-消费者模式等.这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:关于其他多线程设计模式的地址如下:关于Master-Worker模式的详解: Java多线程编程中Master-Worker模式的详解关于Guarded Suspeionsion模式的详解: Java多线程编程中Guarded Suspeionsion模式

【转】Lua coroutine 不一样的多线程编程思路

Lua coroutine 不一样的多线程编程思路 Sunday, Apr 26th, 2009 by Tim | Tags: coroutine, Lua 上周末开始看<Lua程序设计>第二版,目前体会到其中比较有趣的有两点,一是强大的table数据结构,另外就是coroutine.也许Lua 中的coroutine是一种很好的设计模式,但我初步的体会还是没想到其他语言和场合能非常适合用到coroutine的场景. 一.简介 协同程序与线程差不多,也就是一条执行序列,拥有自己独立的栈,局部变

汪大神Java多线程编程实战

课程目录:├─1│  ├─Java并发编程.png│  ├─源码+ppt.rar│  ├─高并发编程第一阶段01讲.课程大纲及主要内容介绍.wmv│  ├─高并发编程第一阶段02讲.简单介绍什么是线程.wmv│  ├─高并发编程第一阶段03讲.创建并启动线程.mp4│  ├─高并发编程第一阶段04讲.线程生命周期以及start方法源码剖析.mp4│  ├─高并发编程第一阶段05讲.采用多线程方式模拟银行排队叫号.mp4│  ├─高并发编程第一阶段06讲.用Runnable接口将线程的逻辑执行单元