最近在学习java多线程方面的东西,在此希望把自己学到的东西做做总结,要想搞清楚实现原理,源码是最好的老师,因此这篇我打算从实践+源码角度来进行分析以下几个问题:
(1):Callable与Runnable的区别;
(2):Callable与Runnable的使用,并且通过Future对象获取Callable的返回值;
(3):JDK源码中对于Callable与Runnable是怎么使用的呢?
首先我们来看看源码中是怎么解释Callable和Runnable区别的:
Callable与Runnable的区别:
* <p>The <tt>Callable</tt> interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * <tt>Runnable</tt>, however, does not return a result and cannot * throw a checked exception.
从注释中可以看出Callable与Runnable的区别在于:
(1):Callcble是可以有返回值的,具体的返回值就是在Callable的接口方法call返回的,并且这个返回值具体是通过实现Future接口的对象的get方法获取的,这个方法是会造成线程阻塞的;而Runnable是没有返回值的,因为Runnable接口中的run方法是没有返回值的;
(2):Callable里面的call方法是可以抛出异常的,我们可以捕获异常进行处理;但是Runnable里面的run方法是不可以抛出异常的,异常要在run方法内部必须得到处理,不能向外界抛出;
Callable与Runnable的使用实例,同时使用Future对象获取Callable中call方法的返回值:
对于Callable的使用,有两种方式可以实现:
方式1:通过单线程的方式我们自己实现,首先创建一个FutureTask对象,在创建FutureTask对象的时候,需要传入一个实现了Callable接口的对象,接着以该FutureTask作为Thread的参数,调用Thread的start方法开启线程执行任务,最后使用FutureTask的get方法获取到任务的返回值就可以了;那么为什么FutureTask对象可以作为Thread的参数呢?原因就在于FutureTask类实现了RunnableFuture接口,而RunnableFuture接口实现了Runnable和Future接口,那么当然可以作为Thread的参数了,我们来看看这种实现方式;
public class CallableTest { public static void main(String[] args) { //创建实现了Callable接口的对象 MyCallable callable = new MyCallable(); //将实现Callable接口的对象作为参数创建一个FutureTask对象 FutureTask<String> task = new FutureTask<>(callable); //创建线程处理当前callable任务 Thread thread = new Thread(task); //开启线程 System.out.println("开始执行任务的时间: "+getNowTime()); thread.start(); //获取到call方法的返回值 try { String result = task.get(); System.out.println("得到返回值: "+result); System.out.println("结束执行get的时间: "+getNowTime()); } catch (Exception e) { e.printStackTrace(); } } public static String getNowTime() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(new Date()); } } class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(3000); return "call method result"; } }
查看输出:
开始执行任务的时间: 2016-08-31 12:21:43 得到返回值: call method result 结束执行get的时间: 2016-08-31 12:21:46
可以看到确实得到了call方法的返回值,但是在调用get方法的时候却造成了主线程的阻塞,因为我们在call方法里面让子线程暂停了3秒,这时候如果不阻塞主线程的话,输出语句中的第三行时间不应该是12:21:46的,应该是12:21:43,因此验证了上面我们给出的结论,但是使用Runnable接口是不会造成主线程阻塞的,具体实例马上给出;
方式2:采用线程池的方式,我们可以使用线程池的方式来将当前实现Callable任务通过ThreadPoolExecutor的submit方法添加到线程池,submit方法会返回一个FutureTask对象,而FutureTask是实现了Future接口的,我们可以使用submit返回的FutureTask对象的get方法获取到任务的返回值,其实等会在源码分析的过程中你会发现线程池实现方式只是对我们自己采用线程方式实现的一种封装而已,没什么特别的啦;
public class CallableTest { public static void main(String[] args) { //创建实现了Callable接口的对象 MyCallable callable = new MyCallable(); //创建用于处理任务的线程池 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); //将任务添加到线程池中并且获得返回的FutureTask对象 System.out.println("提交任务的时间: "+getNowTime()); FutureTask<String> task = (FutureTask<String>) threadPool.submit(callable); //获取到call方法的返回值 try { String result = task.get(); System.out.println("得到返回值: "+result); System.out.println("结束执行get的时间: "+getNowTime()); } catch (Exception e) { e.printStackTrace(); } } public static String getNowTime() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(new Date()); } } class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(3000); return "call method result"; } }
查看输出:
提交任务的时间: 2016-08-31 14:44:09 得到返回值: call method result 结束执行get的时间: 2016-08-31 14:44:12
可以看到,使用线程池方式和我们自己定义线程实现效果是一样的,这就是Callable使用的两种方式啦;如果你对线程池的实现原理不是很清楚的话,可以查看我的另一篇博客:我眼中的java线程池实现原理;
使用Runnable处理任务的情况:
public class RunnableTest { public static void main(String[] args) { MyRunnable runnable = new MyRunnable(); Thread thread = new Thread(runnable); System.out.println("开始执行任务时间: "+getNowTime()); thread.start(); System.out.println("启动任务之后时间: "+getNowTime()); } public static String getNowTime() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(new Date()); } } class MyRunnable implements Runnable { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }
查看输出:
开始执行任务时间: 2016-08-31 14:36:01 启动任务之后时间: 2016-08-31 14:36:01
可以看到,两次时间是一致的,说明执行的runnable任务并没有影响主线程任务的执行;
至此,我们有三个疑问需要解答,(1):使用Callable的过程中是怎样通过Future对象来获取返回值的,虽然从方法上讲调用Future的get方法就可以了,但是这个get方法里面的值是怎么来的;(2):为什么Future的get方法会带来阻塞问题;(3):采用线程池的方式处理Callable任务,JDK为我们封装了什么?下面从源码角度一一解答:
首先来看第一和第二个问题:
我们以自己定义线程使用Callable为例,在调用了线程的start方法之后会使得该线程处于就绪状态,有了竞争CPU时间片的权限,当分配到时间片之后,就会执行创建他的参数的run方法,也就是FutureTask的run方法,查看FutureTask的run方法如下:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
第12行执行了c的call方法,而c就是我们在创建FutureTask对象的时候传递进来的实现了Callable接口的对象,也就是执行了我们任务的逻辑操作了,将返回的结果赋值给了result,并且在不发生异常的情况下会执行第20行的set方法,来看set方法:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
set方法做的工作比较简单,就是将结果赋给了outcome而已,outcome是Object类型的全局变量,同时将state状态设置为NORMAL,接着会执行finishCompletion,这个方法其实就是用来唤醒我们上面想要获得数据的get方法的,我们先来看看get方法里面的实现:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
可以看到首先是先去查看当前state状态是否小于等于COMPLETING,你查看FutureTask源码的话,会发现有这么几种状态:
private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
state状态小于等于COMPLETING就表示我们刚刚的Callable是还没有处理完成的,那么就会调用awaitDone方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
awaitDone的业务逻辑还是挺复杂的,我大致来分析下,第7行判断如果当前线程被中断的话,则抛出异常,第12行获取当前任务状态state,第13行判断如果任务状态大于COMPLETING的话,则直接返回state状态,当然从上面源码中的状态列表中可以发现大于COMPLETING的状态有5种,可能是有正常返回值的,也可能是抛出异常的,具体怎么处理等会在回答get方法的时候是会介绍的,第18行如果任务是正在执行的话,则让出一段CPU时间继续运行,接着第20和22行的判断其实就是判断等待结点和等待队列为空的话创建一个出来而已,接着如果我们在调用awaitDone的时候,设置的timed参数是true的话,则会执行26行处的if语句块,会设置线程等待我们设置的时间,等待时间到了会唤醒此线程,我们平常使用的get方法默认timed值是false的,因此会执行到第34行,将当前线程锁起来,你如果仔细点的话会发现第6到34行是个死循环,也就是当34行处的锁定在其他地方(其实就是set方法里面了)解开的话,仍然会继续执行第6行,那么因为此时state状态已经发生了改变,此时执行已经和之前执行流程不同啦,一般来讲的话,会执行到第13行进行判断,最后执行第16行返回状态码就可以了;
那么get调用了LockSupport.park将自己锁住了,那么由谁来解锁呢?答案就是在set方法里面了,在刚刚的set方法的最后会执行finishCompletion方法,这个方法其实就是来解锁的啦:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
可以看到第9行执行了LockSupport.unpark,相当于解锁了get方法中的LockSupport.park操作,这样的话awaitDone方法就会返回了,回答get方法里面:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
在通过awaitDone获得返回的state状态值之后,就会调用report方法,查看report方法:
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
其实很简单了,就是根据state状态来进行相应的操作了,如果state只等于NORMAL的话,会直接返回值了,也就是我们的get方法返回值了,这就是我们自己通过FutureTask获得Callable返回值的源码过程以及为什么get方法会阻塞;
那么最后一个问题就是使用线程池方式获得Callable返回值的话,JDK为我们封装了什么?
首选当然应该从submit方法看起了,这个方法是在AbstractExecutorService类里面的:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
可以看到使用线程池的方式实际上内部是通过为我们创建一个RunnableFuture对象并且返回这个对象的,这里的RunnableFuture对象实际上是FutureTask类型的,因为我们查看newTaskFor的实现可以发现:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
实际上就是将Callable对象封装成了FutureTask对象为我们返回而已了;
到此,使用线程池来获得Callable任务返回值的神秘面纱被揭开了,即使我们自己没有显式创建FutureTask对象,但是JDK为我们创建了一个出来,之后的执行过程就和我们自己创建线程实现一致啦;
至此,我对于Callable和Runnable的总结结束啦,谢谢大家访问!