实现Futrue接口
public class MsgFuture<V> implements java.util.concurrent.Future<V> { ... ... }
Future的主要特性为Future.get()、
get()
get(long timeout, TimeUnit unit) 主要思路如下:构造MsgFuture时,设置开始时间,这里是sendTime;设置timeout,默认get()方法的超时时间,我们的程序不可能会无限等待默认的get()对应的值域是result,默认为一个NULL对象,标识没有返回数据 result的值需要其他线程在做完任务后将值写到Future对象中,这里暴露了一个方法setResult(object)
/** * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)} * @param result */ public synchronized void setResult(Object result) { reentrantLock.lock(); try { this.result = result; condition.signalAll(); }finally { reentrantLock.unlock(); } }
使用ReentrantLock来进行数据可见性控制
condition.signalAll()可以唤醒condition.await的阻塞wait
至于其他线程如何调用到setResult(object)方法,可以使用ConcurrentHashMap,key为msgId,值为MsgFuture对象,设置成一个全局的,或两个线程都可访问,其他线程根据msgId获取到MsgFuture,然后调用setResult(object)方法
/** * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException * @param timeout * @param unit * @return * @throws InterruptedException * @throws TimeoutException */ @SuppressWarnings("all") public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long left = getLeftTime(timeout, unit); //根据timeout配置获取剩余的世界 if(left < 0){ //已经没有剩余时间 if(isDone()){ //如果已经完成,直接放回结果 return (V)this.result; }else{ //timeout throw new TimeoutException("返回超时,后续的响应将会被丢弃abort"); } }else{ reentrantLock.lock(); //同步 try { //获取锁后先判断是否已经完成,防止无意义的await if(isDone()){ //先判断是否已经完成 return (V)this.result; //直接返回 } logger.debug("await "+left+" ms"); condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS); //没有返回,阻塞等待,如果condition被唤醒,也会提前退出 }finally { reentrantLock.unlock(); } if(isDone()){ //被唤醒或超时时间已到,尝试判断是否完成 return (V)this.result; //返回 } throw new TimeoutException("未获取到结果"); //超时 } }
public boolean isDone() { return this.result != NULL; }
全部代码
public class MsgFuture<V> implements java.util.concurrent.Future<V> { private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class); /** * 全局的空对象,如果Future获取到值了,那么一定不是NULL */ private final static Object NULL = new Object(); /** * 主锁 */ private final ReentrantLock reentrantLock = new ReentrantLock(); /** * 条件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法来实现阻塞、唤醒 */ private final Condition condition = reentrantLock.newCondition(); private int timeout; private volatile Object result = NULL; private long sendTime; public MsgFuture(int timeout, long sendTime) { this.timeout = timeout; this.sendTime = sendTime; } public boolean cancel(boolean mayInterruptIfRunning) { return false; } public boolean isCancelled() { return false; } public boolean isDone() { return this.result != NULL; } /** * 获取future结果 * @return * @throws InterruptedException */ public V get() throws InterruptedException { logger.debug("sendTime:{}",sendTime); try { return get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { logger.error("获取future结果异常", e); } return null; } /** * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException * @param timeout * @param unit * @return * @throws InterruptedException * @throws TimeoutException */ @SuppressWarnings("all") public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long left = getLeftTime(timeout, unit); if(left < 0){ //已经没有剩余时间 if(isDone()){ return (V)this.result; }else{ //timeout throw new TimeoutException("返回超时,后续的响应将会被丢弃abort"); } }else{ reentrantLock.lock(); try { //获取锁后先判断是否已经完成,防止无意义的await if(isDone()){ return (V)this.result; } logger.debug("await "+left+" ms"); condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS); }finally { reentrantLock.unlock(); } if(isDone()){ return (V)this.result; } throw new TimeoutException("未获取到结果"); } } /** * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)} * @param result */ public synchronized void setResult(Object result) { reentrantLock.lock(); try { this.result = result; condition.signalAll(); }finally { reentrantLock.unlock(); } } /** * 计算剩余时间 * @param timeout * @param unit * @return */ private long getLeftTime(long timeout, TimeUnit unit){ long now = System.currentTimeMillis(); timeout = unit.toMillis(timeout); // 转为毫秒 return timeout - (now - sendTime); } /*public static void main(String[] args) { MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis()); //测试先唤醒、后get是否正常 msgFuture.setResult("yoxi"); try { System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { logger.error("Interrupt异常", e); } catch (TimeoutException e) { logger.error("测试先唤醒,后get出错", e); } }*/ }
时间: 2024-11-29 09:36:40