Callable与Runnable的区别及其在JDK源码中的应用

最近在学习java多线程方面的东西,在此希望把自己学到的东西做做总结,要想搞清楚实现原理,源码是最好的老师,因此这篇我打算从实践+源码角度来进行分析以下几个问题:

(1):Callable与Runnable的区别;

(2):Callable与Runnable的使用,并且通过Future对象获取Callable的返回值;

(3):JDK源码中对于Callable与Runnable是怎么使用的呢?

首先我们来看看源码中是怎么解释Callable和Runnable区别的:

Callable与Runnable的区别:

 * <p>The <tt>Callable</tt> interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * <tt>Runnable</tt>, however, does not return a result and cannot
 * throw a checked exception.

从注释中可以看出Callable与Runnable的区别在于:

(1):Callcble是可以有返回值的,具体的返回值就是在Callable的接口方法call返回的,并且这个返回值具体是通过实现Future接口的对象的get方法获取的,这个方法是会造成线程阻塞的;而Runnable是没有返回值的,因为Runnable接口中的run方法是没有返回值的;

(2):Callable里面的call方法是可以抛出异常的,我们可以捕获异常进行处理;但是Runnable里面的run方法是不可以抛出异常的,异常要在run方法内部必须得到处理,不能向外界抛出;

Callable与Runnable的使用实例,同时使用Future对象获取Callable中call方法的返回值:

对于Callable的使用,有两种方式可以实现:

方式1:通过单线程的方式我们自己实现,首先创建一个FutureTask对象,在创建FutureTask对象的时候,需要传入一个实现了Callable接口的对象,接着以该FutureTask作为Thread的参数,调用Thread的start方法开启线程执行任务,最后使用FutureTask的get方法获取到任务的返回值就可以了;那么为什么FutureTask对象可以作为Thread的参数呢?原因就在于FutureTask类实现了RunnableFuture接口,而RunnableFuture接口实现了Runnable和Future接口,那么当然可以作为Thread的参数了,我们来看看这种实现方式;

public class CallableTest {
	public static void main(String[] args) {
		//创建实现了Callable接口的对象
		MyCallable callable = new MyCallable();
		//将实现Callable接口的对象作为参数创建一个FutureTask对象
		FutureTask<String> task = new FutureTask<>(callable);
		//创建线程处理当前callable任务
		Thread thread = new Thread(task);
		//开启线程
		System.out.println("开始执行任务的时间: "+getNowTime());
		thread.start();
		//获取到call方法的返回值
		try {
			String result = task.get();
			System.out.println("得到返回值: "+result);
			System.out.println("结束执行get的时间: "+getNowTime());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static String getNowTime()
	{
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		return format.format(new Date());
	}
}
class MyCallable implements Callable<String>
{
	@Override
	public String call() throws Exception {
		Thread.sleep(3000);
		return "call method result";
	}
}

查看输出:

开始执行任务的时间: 2016-08-31 12:21:43
得到返回值: call method result
结束执行get的时间: 2016-08-31 12:21:46

可以看到确实得到了call方法的返回值,但是在调用get方法的时候却造成了主线程的阻塞,因为我们在call方法里面让子线程暂停了3秒,这时候如果不阻塞主线程的话,输出语句中的第三行时间不应该是12:21:46的,应该是12:21:43,因此验证了上面我们给出的结论,但是使用Runnable接口是不会造成主线程阻塞的,具体实例马上给出;

方式2:采用线程池的方式,我们可以使用线程池的方式来将当前实现Callable任务通过ThreadPoolExecutor的submit方法添加到线程池,submit方法会返回一个FutureTask对象,而FutureTask是实现了Future接口的,我们可以使用submit返回的FutureTask对象的get方法获取到任务的返回值,其实等会在源码分析的过程中你会发现线程池实现方式只是对我们自己采用线程方式实现的一种封装而已,没什么特别的啦;

public class CallableTest {
	public static void main(String[] args) {
		//创建实现了Callable接口的对象
		MyCallable callable = new MyCallable();
		//创建用于处理任务的线程池
		ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
		//将任务添加到线程池中并且获得返回的FutureTask对象
		System.out.println("提交任务的时间: "+getNowTime());
		FutureTask<String> task = (FutureTask<String>) threadPool.submit(callable);
		//获取到call方法的返回值
		try {
			String result = task.get();
			System.out.println("得到返回值: "+result);
			System.out.println("结束执行get的时间: "+getNowTime());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static String getNowTime()
	{
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		return format.format(new Date());
	}
}
class MyCallable implements Callable<String>
{
	@Override
	public String call() throws Exception {
		Thread.sleep(3000);
		return "call method result";
	}
}

查看输出:

提交任务的时间: 2016-08-31 14:44:09
得到返回值: call method result
结束执行get的时间: 2016-08-31 14:44:12

可以看到,使用线程池方式和我们自己定义线程实现效果是一样的,这就是Callable使用的两种方式啦;如果你对线程池的实现原理不是很清楚的话,可以查看我的另一篇博客:我眼中的java线程池实现原理

使用Runnable处理任务的情况:

public class RunnableTest {
	public static void main(String[] args) {
		MyRunnable runnable = new MyRunnable();
		Thread thread = new Thread(runnable);
		System.out.println("开始执行任务时间:  "+getNowTime());
		thread.start();
		System.out.println("启动任务之后时间:  "+getNowTime());
	}

	public static String getNowTime()
	{
		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		return format.format(new Date());
	}
}
class MyRunnable implements Runnable
{
	@Override
	public void run() {
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

查看输出:

开始执行任务时间:  2016-08-31 14:36:01
启动任务之后时间:  2016-08-31 14:36:01

可以看到,两次时间是一致的,说明执行的runnable任务并没有影响主线程任务的执行;

至此,我们有三个疑问需要解答,(1):使用Callable的过程中是怎样通过Future对象来获取返回值的,虽然从方法上讲调用Future的get方法就可以了,但是这个get方法里面的值是怎么来的;(2):为什么Future的get方法会带来阻塞问题;(3):采用线程池的方式处理Callable任务,JDK为我们封装了什么?下面从源码角度一一解答:

首先来看第一和第二个问题:

我们以自己定义线程使用Callable为例,在调用了线程的start方法之后会使得该线程处于就绪状态,有了竞争CPU时间片的权限,当分配到时间片之后,就会执行创建他的参数的run方法,也就是FutureTask的run方法,查看FutureTask的run方法如下:

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

第12行执行了c的call方法,而c就是我们在创建FutureTask对象的时候传递进来的实现了Callable接口的对象,也就是执行了我们任务的逻辑操作了,将返回的结果赋值给了result,并且在不发生异常的情况下会执行第20行的set方法,来看set方法:

 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

set方法做的工作比较简单,就是将结果赋给了outcome而已,outcome是Object类型的全局变量,同时将state状态设置为NORMAL,接着会执行finishCompletion,这个方法其实就是用来唤醒我们上面想要获得数据的get方法的,我们先来看看get方法里面的实现:

   public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

可以看到首先是先去查看当前state状态是否小于等于COMPLETING,你查看FutureTask源码的话,会发现有这么几种状态:

    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

state状态小于等于COMPLETING就表示我们刚刚的Callable是还没有处理完成的,那么就会调用awaitDone方法:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

awaitDone的业务逻辑还是挺复杂的,我大致来分析下,第7行判断如果当前线程被中断的话,则抛出异常,第12行获取当前任务状态state,第13行判断如果任务状态大于COMPLETING的话,则直接返回state状态,当然从上面源码中的状态列表中可以发现大于COMPLETING的状态有5种,可能是有正常返回值的,也可能是抛出异常的,具体怎么处理等会在回答get方法的时候是会介绍的,第18行如果任务是正在执行的话,则让出一段CPU时间继续运行,接着第20和22行的判断其实就是判断等待结点和等待队列为空的话创建一个出来而已,接着如果我们在调用awaitDone的时候,设置的timed参数是true的话,则会执行26行处的if语句块,会设置线程等待我们设置的时间,等待时间到了会唤醒此线程,我们平常使用的get方法默认timed值是false的,因此会执行到第34行,将当前线程锁起来,你如果仔细点的话会发现第6到34行是个死循环,也就是当34行处的锁定在其他地方(其实就是set方法里面了)解开的话,仍然会继续执行第6行,那么因为此时state状态已经发生了改变,此时执行已经和之前执行流程不同啦,一般来讲的话,会执行到第13行进行判断,最后执行第16行返回状态码就可以了;

那么get调用了LockSupport.park将自己锁住了,那么由谁来解锁呢?答案就是在set方法里面了,在刚刚的set方法的最后会执行finishCompletion方法,这个方法其实就是来解锁的啦:

 private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

可以看到第9行执行了LockSupport.unpark,相当于解锁了get方法中的LockSupport.park操作,这样的话awaitDone方法就会返回了,回答get方法里面:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

在通过awaitDone获得返回的state状态值之后,就会调用report方法,查看report方法:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

其实很简单了,就是根据state状态来进行相应的操作了,如果state只等于NORMAL的话,会直接返回值了,也就是我们的get方法返回值了,这就是我们自己通过FutureTask获得Callable返回值的源码过程以及为什么get方法会阻塞;

那么最后一个问题就是使用线程池方式获得Callable返回值的话,JDK为我们封装了什么?

首选当然应该从submit方法看起了,这个方法是在AbstractExecutorService类里面的:

  public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

可以看到使用线程池的方式实际上内部是通过为我们创建一个RunnableFuture对象并且返回这个对象的,这里的RunnableFuture对象实际上是FutureTask类型的,因为我们查看newTaskFor的实现可以发现:

   protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

实际上就是将Callable对象封装成了FutureTask对象为我们返回而已了;

到此,使用线程池来获得Callable任务返回值的神秘面纱被揭开了,即使我们自己没有显式创建FutureTask对象,但是JDK为我们创建了一个出来,之后的执行过程就和我们自己创建线程实现一致啦;

至此,我对于Callable和Runnable的总结结束啦,谢谢大家访问!

时间: 2024-10-11 16:08:50

Callable与Runnable的区别及其在JDK源码中的应用的相关文章

JDK源码分析之concurrent包(二) -- 线程池ThreadPoolExecutor

上一篇我们简单描述了Executor框架的结构,本篇正式开始并发包中部分源码的解读. 我们知道,目前主流的商用虚拟机在线程的实现上可能会有所差别.但不管如何实现,在开启和关闭线程时一定会耗费很多CPU资源,甚至在线程的挂起和恢复JDK1.6都做了自旋锁的优化.所以,使用线程池来管理和执行多线程任务会大大提高程序执行效率.关于使用线程池的优点这里不做过多说明,我们直接进入Java5并发包中ThreadPoolExecutor的实现的源码. 在解读源码前,我们先来看看创建线程池的一般做法和线程池的几

JDK源码分析之String篇

------------------------------String在内存中的存储情况(一下内容摘自参考资料1)----------------------------------- 前提:先了解下什么是声明,什么时候才算是产生了对象实例 其中x并未看到内存分配,变量在使用前必须先声明,再赋值,然后才可以使用.java基础数据类型会用对应的默认值进行初始化 一.首先看看Java虚拟机JVM的内存块及其变量.对象内存空间是怎么存储分配的 1.栈:存放基本数据类型及对象变量的引用,对象本身不存放

结合JDK源码看设计模式——简单工厂、工厂方法、抽象工厂

三种工厂模式的详解: 简单工厂模式: 适用场景:工厂类负责创建的对象较少,客户端只关心传入工厂类的参数,对于如何创建对象的逻辑不关心 缺点:如果要新加产品,就需要修改工厂类的判断逻辑,违背软件设计中的开闭原则,且产品类多的话,就会使得简单工厂类比较复杂 在jdk源码中的具体实例(注意看代码中的中文注释) private static Calendar createCalendar(TimeZone zone,Locale aLocale) { CalendarProvider provider

从JDK源码角度看java并发的原子性如何保证

JDK源码中,在研究AQS框架时,会发现很多地方都使用了CAS操作,在并发实现中CAS操作必须具备原子性,而且是硬件级别的原子性,java被隔离在硬件之上,明显力不从心,这时为了能直接操作操作系统层面,肯定要通过用C++编写的native本地方法来扩展实现.JDK提供了一个类来满足CAS的要求,sun.misc.Unsafe,从名字上可以大概知道它用于执行低级别.不安全的操作,AQS就是使用此类完成硬件级别的原子操作. Unsafe是一个很强大的类,它可以分配内存.释放内存.可以定位对象某字段的

【图解JDK源码】HashMap的基本原理与它的线程安全性

1. 前言 能用图说清楚的,就坚决不用代码.能用代码撸清楚的,就坚决不写解释(不是不写注释哦). 以下所有仅针对JDK 1.7及之前中的HashMap. 2. 数据结构 HashMap内部通过维护一个Entry<K, V>数组(变量为table),来实现其基本功能,而Entry<K, V>是HashMap的内部类,其主要作用便是存储键值对,其数据结构大致如下图所示. 从Entry的数据结构可以看出,多个Entry是可以形成一个单向链表的,HashMap中维护的Entry<K,

【图解JDK源码】HashMap的容量大小增长原理(JDK1.6/1.7/1.8)

1. 前言 HashMap的容量大小会根据其存储数据的数量多少而自动扩充,即当HashMap存储数据的数量到达一个阈值(threshold)时,再往里面增加数据,便可能会扩充HashMap的容量. 可能? 事实上,由于JDK版本的不同,其阈值(threshold)的默认大小也变得不同(主要是计算公式的改变),甚至连判断条件也变得不一样,所以如果说threshold = capacity * loadFactor(容量 * 负载因子)将不再绝对正确,甚至说超过阈值容量就会增长也不再绝对正确,下面就

【ThreadLocal】深入JDK源码之ThreadLocal类

学习JDK中的类,首先看下JDK API对此类的描述,描述如下: 该类提供了线程局部 (thread-local) 变量.这些变量不同于它们的普通对应物,因为访问某个变量(通过其 get 或 set 方法)的每个线程都有自己的局部变量,它独立于变量的初始化副本.ThreadLocal其实就是一个工具类,用来操作线程局部变量,ThreadLocal 实例通常是类中的 private static 字段.它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联. 例如,以下类生成对每个线程

JDK源码分析之concurrent包(三) -- Future方式的实现

上一篇我们基于JDK的源码对线程池ThreadPoolExecutor的实现做了分析,本篇来对Executor框架中另一种典型用法Future方式做源码解读.我们知道Future方式实现了带有返回值的程序的异步调用,关于异步调用的场景大家可以自行脑补Ajax的应用(获取返回结果的方式不同,Future是主动询问获取,Ajax是回调函数),这里不做过多说明. 在进入源码前,首先来看下Future方式相关的API: 接口Callable:有返回结果并且可能抛出异常的任务: 接口Future:表示异步

阅读JDK源码有感

最近加班不是很严重,爱上了查看JDK源码,每天回来,准备一杯咖啡,开始阅读,受益良多.从上周开始阅读,觉得还是写下感想和学习心得比较好.以后每天阅读,每天记下收获.总体来说,我觉得JDK源码写得十分漂亮,无论是从代码风格还是从重用性来说,都是相当出色的.之前阅读过Thinking in java,感觉很多东西都不能深入理解,太过于理论化,现在结合JDK看来,又别有一番感悟.以前每次有面试,都会从网上收集一些面试题,死记硬背一些知识,HashMap与HashTable的区别啊之类的,但是看了JDK