java如何实现一个Future

实现Futrue接口

public class MsgFuture<V> implements java.util.concurrent.Future<V> {
    ...
    ...
}

  

Future的主要特性为Future.get()、

  get()
 get(long timeout, TimeUnit unit)

主要思路如下:构造MsgFuture时,设置开始时间,这里是sendTime;设置timeout,默认get()方法的超时时间,我们的程序不可能会无限等待默认的get()对应的值域是result,默认为一个NULL对象,标识没有返回数据

result的值需要其他线程在做完任务后将值写到Future对象中,这里暴露了一个方法setResult(object)
    /**
     * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)}
     * @param result
     */
    public synchronized void setResult(Object result) {
        reentrantLock.lock();
        try {
            this.result = result;
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }

    }

  使用ReentrantLock来进行数据可见性控制

condition.signalAll()可以唤醒condition.await的阻塞wait

至于其他线程如何调用到setResult(object)方法,可以使用ConcurrentHashMap,key为msgId,值为MsgFuture对象,设置成一个全局的,或两个线程都可访问,其他线程根据msgId获取到MsgFuture,然后调用setResult(object)方法

    /**
     * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @SuppressWarnings("all")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long left = getLeftTime(timeout, unit);                      //根据timeout配置获取剩余的世界
        if(left < 0){
            //已经没有剩余时间
            if(isDone()){                                            //如果已经完成,直接放回结果
                return (V)this.result;
            }else{
                //timeout
                throw new TimeoutException("返回超时,后续的响应将会被丢弃abort");
            }
        }else{

            reentrantLock.lock();                                    //同步
            try {
                //获取锁后先判断是否已经完成,防止无意义的await
                if(isDone()){                                        //先判断是否已经完成
                    return (V)this.result;                           //直接返回
                }
                logger.debug("await "+left+" ms");
                condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);       //没有返回,阻塞等待,如果condition被唤醒,也会提前退出
            }finally {
                reentrantLock.unlock();
            }
            if(isDone()){                                            //被唤醒或超时时间已到,尝试判断是否完成
                return (V)this.result;                               //返回
            }

            throw new TimeoutException("未获取到结果");                //超时
        }
    }

  

    public boolean isDone() {
        return this.result != NULL;
    }

  


全部代码
public class MsgFuture<V> implements java.util.concurrent.Future<V> {

    private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class);

    /**
     * 全局的空对象,如果Future获取到值了,那么一定不是NULL
     */
    private final static Object NULL = new Object();
    /**
     * 主锁
     */
    private final ReentrantLock reentrantLock = new ReentrantLock();

    /**
     * 条件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法来实现阻塞、唤醒
     */
    private final Condition condition = reentrantLock.newCondition();

    private int timeout;

    private volatile Object result = NULL;

    private long sendTime;

    public MsgFuture(int timeout, long sendTime) {
        this.timeout = timeout;
        this.sendTime = sendTime;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        return this.result != NULL;
    }

    /**
     * 获取future结果
     * @return
     * @throws InterruptedException
     */
    public V get() throws InterruptedException {
        logger.debug("sendTime:{}",sendTime);
        try {
            return get(timeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            logger.error("获取future结果异常", e);
        }
        return null;
    }

    /**
     * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @SuppressWarnings("all")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long left = getLeftTime(timeout, unit);
        if(left < 0){
            //已经没有剩余时间
            if(isDone()){
                return (V)this.result;
            }else{
                //timeout
                throw new TimeoutException("返回超时,后续的响应将会被丢弃abort");
            }
        }else{

            reentrantLock.lock();
            try {
                //获取锁后先判断是否已经完成,防止无意义的await
                if(isDone()){
                    return (V)this.result;
                }
                logger.debug("await "+left+" ms");
                condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);
            }finally {
                reentrantLock.unlock();
            }
            if(isDone()){
                return (V)this.result;
            }

            throw new TimeoutException("未获取到结果");
        }
    }

    /**
     * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)}
     * @param result
     */
    public synchronized void setResult(Object result) {
        reentrantLock.lock();
        try {
            this.result = result;
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }

    }

    /**
     * 计算剩余时间
     * @param timeout
     * @param unit
     * @return
     */
    private long getLeftTime(long timeout, TimeUnit unit){
        long now = System.currentTimeMillis();
        timeout = unit.toMillis(timeout); // 转为毫秒
        return timeout - (now - sendTime);
    }

    /*public static void main(String[] args) {
        MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis());

        //测试先唤醒、后get是否正常
        msgFuture.setResult("yoxi");

        try {
            System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            logger.error("Interrupt异常", e);
        } catch (TimeoutException e) {
            logger.error("测试先唤醒,后get出错", e);
        }
    }*/
}

  

				
时间: 2024-11-29 09:36:40

java如何实现一个Future的相关文章

java多线程之Future和FutureTask

Executor框架使用Runnable 作为其基本的任务表示形式.Runnable是一种有局限性的抽象,然后可以写入日志,或者共享的数据结构,但是他不能返回一个值. 许多任务实际上都是存在延迟计算的:执行数据库查询,从网络上获取资源,或者某个复杂耗时的计算.对于这种任务,Callable是一个更好的抽象,他能返回一个值,并可能抛出一个异常.Future表示一个任务的周期,并提供了相应的方法来判断是否已经完成或者取消,以及获取任务的结果和取消任务. public interface Callab

Java多线程编程中Future模式的详解&lt;转&gt;

Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker模式.Guarded Suspeionsion模式.不变模式和生产者-消费者模式等.这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:关于其他多线程设计模式的地址如下:关于Master-Worker模式的详解: Java多线程编程中Master-Worker模式的详解关于Guarded Suspeionsion模式的详解: Java多线程编程中Guarded Suspeionsion模式

Java并发——Callable和Future

Callable和Future Executor框架将工作单元划分为任务,即任务是逻辑上的工作单元,而线程是任务异步执行的机制.Runnable是任务的一个抽象,并且理想状态下任务是独立的执行,但是Runnable的run( )不能返回一个结果或者抛出一个受检查的异常,这与我们有些实际任务是不相符的.在通过线程或者executor执行Runnable任务中,不仅仅是不能返回任务的执行结果,有时我们希望可以控制某个任务,或取消或终止,但在executor中一旦提交任务,我们将很难单一的控制任务的生

java中,一个简单但出错率又大的‘加法’题,1+1+&#39;1&#39;+1+1+1+1+&quot;1&quot;=?

1+1+'1'+1+1+1+1+"1"=? 结果是多少?很多人看了题之后,可能会说结果是71.  当然有的童鞋可能会说很简单,放工具里运行一下就知道结果了,如果不运行代码,你会得出一个什么样的结果呢? 如果告诉你答案是551,会迷惑么?怎么会得出551? 下面我们来看看怎么算的: 1.我们大家都知道1 .'1'."1"的区别,1 表示一个int类型,’1'是表示一个char类型,"1" 表示一个字符串类型. 2.1+1+'1'+1+1+1+1+&

Android 和Java API的一个坑:SimpleDateFormat

今天上班遇到这么一个意料之外的异常: 出问题的代码是这样的(已去除上下文信息): Log.i(LOG_TAG, new SimpleDateFormat("YYYY-MM-dd HH:mm:ss", Locale.CHINA) .format(System.currentTimeMillis())); 反复检查,感觉没有问题,于是新建一个Java Project,直接输出同样的代码: public class Main{ public static void main(String[]

java 随机生成一个中文、判断某个string是否是中文以及打印出全部的中文

现在网上大多数用于判断中文字符的是 U+4E00..U+9FA5 这个范围是只是"中日韩统一表意文字"这个区间,但这不是全部,如果要全部包含,则还要他们的扩展集.部首.象形字.注间字母等等; 2E80-A4CF: 包含了中日朝部首补充.康熙部首.表意文字描述符.中日朝符号和标点.日文平假名.日文片假名.注音字母.谚文兼容字母.象形字注释标志.注音字母扩展.中日朝笔画.日文片假名语音扩展.带圈中日朝字母和月份.中日朝兼容.中日朝统一表意文字扩展A.易经六十四卦符号.中日韩统一表意文字.彝

java模拟而一个电话本操作

哈哈,大家平时都在使用电话本,下面使用java来模拟而一个简单的电话本吧... 首先给出联系人的抽象类 package net.itaem.po; /** * * 电话人的信息 * */ public class User { private String name; private String phoneNumber; private String companyName; private String email; private String address; private Strin

java实现把一个大文件切割成N个固定大小的文件

//java实现把一个大文件切割成N个固定大小的文件 package com.johnny.test; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; public class FenGeFile { p

Java实现的一个简单的下载器

package com.shawearn.download;import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.EOFException;import java.io.File;http://www.huiyi8.com/jiaoben/import j