最近用到了AsyncTask,这玩意每个写android程序的都会用,可是不见得每个人都能用的好。如果想要用好,那么首先势必对基本原理有个大概了解。其实网上对这类问题的说明已经很多很多了,这里我就用自己的思维整理一下。
AsyncTask概述
AsyncTask是google公司封装的一个轻量级的异步任务类。实际上它内部也是通过Thread + handler实现的。如果没有AsyncTask类,我们完全可以用thread+handler来处理。这个时候就很可能自己回去封装一下thread+handler了。正是因为这类需求很多,google就帮我们封装了一下。其实我们也可以自己封装,但是我相信99%程序员自己封装的东西比不上google的。所以还是有必要学习一下AsyncTask。
AsyncTask相关的其他类
首先我们看一下AsyncTask用到了哪些主要的框架?注意,每个版本的sdk可能对AsyncTask的实现有所不同,我这里用的是Android-23.
下面是从sdk里面copy出来的AsyncTask一部分内容。
public abstract class AsyncTask<Params, Progress, Result> { private static final String LOG_TAG = "AsyncTask"; private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); /** * An {@link Executor} that can be used to execute tasks in parallel. */ public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); /** * An {@link Executor} that executes tasks one at a time in serial * order. This serialization is global to a particular process. */ public static final Executor SERIAL_EXECUTOR = new SerialExecutor(); private static final int MESSAGE_POST_RESULT = 0x1; private static final int MESSAGE_POST_PROGRESS = 0x2; private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR; private static InternalHandler sHandler; private final WorkerRunnable<Params, Result> mWorker; private final FutureTask<Result> mFuture; private volatile Status mStatus = Status.PENDING; ... };
AsyncTask的数据成员主要有:
1. static Executor THREAD_POOL_EXECUTOR
2. static Executor SERIAL_EXECUTOR;
3. static Executor sDefaultExecutor; 这玩意默认指向了SERIAL_EXECUTOR。看源代码就知道了:
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
4. static InternalHandler sHandler;
5. WorkerRunnable<Params, Result> mWorker;
6. FutureTask<Result> mFuture;
7. Status mStatus;
这7个是AsyncTask主要的数据成员,大部分功能都跟这些成员有关。主要上述7个数据成员中,前面4个都是静态的。
THREAD_POOL_EXECUTOR & SERIAL_EXECUTOR
这两个静态实例的类分别是:ThreadPoolExecutor和SerialExecutor.
类结构看起来像:
其中SerialExecutor很简单,全部代码也就:
private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }
SerialExecutor是AsyncTask的一个内嵌类。超简单,里面就2个数据成员:mTasks和mActive。每次caller调用那个execute,就创建一个Runnable匿名内嵌类对象,这个对象存入mTasks,在匿名内嵌类的run函数里面调用传入参数r.run()。然后通过一个scheduleNext函数把mTasks里面的所有对象通过THREAD_POOL_EXECUTOR.execute(mActive)执行一遍。说穿了,也就是说SerialExecutor类会把所有的任务丢入一个容器,之后把容器里面的所有对象一个一个的排队执行THREAD_POOL_EXECUTOR.execute(mActive);
ThreadPoolExecutor就相对复杂一点了。它有几个比较重要的数据成员:核心线程数,最大线程数,缓存队列。以下上AsyncTask的一部分代码。
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<runnable> sPoolWorkQueue = new LinkedBlockingQueue<runnable>(128); public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);</runnable></runnable>
当创建ThreadPoolExecutor对象的时候,构造函数里面第一个参数就是核心线程数量,这里取得是CPU个数 + 1, 第二个参数是最大线程数量,这里是CPU个数 * 2 + 1,第五个参数是缓冲区的队列,这里是个LinkedBlockingQueue,这个队列的最大容量是128.
实际上ThreadPoolExecutor内部有个线程池概念。它的大概工作原理如下:
1. 如果正在运行的线程数量小于核心线程数量(由调用者设置,像AsyncTask就设置了cpu个数+1),那么就新创建要给线程,来执行任务(execute的传入参数)
2. 如果正在运行的线程数量大于等于核心线程数量,这个时候就分两种情况:
a. 可以把任务丢进缓冲区,那就丢进去,等待空闲线程来执行。
b. 如果缓冲区满了,那就看最大线程数 - 运行线程数是不是>0。如果 > 0, 就创建线程来运行新的任务。如果=0,那就丢出异常,也就是ThreadPoolExecutor不接受这个任务了。(所以使用ThreadPoolExecutor的时候需要注意异常,因为它有可能不接受任务)以下是ThreadPoolExecutor的execute代码。
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
addWork也是蛮关键的一个函数,实现如下:大概就是可以的情况下,创建线程来执行任务。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
那么针对AsyncTask的情况,AsyncTask的任务是通过SerialExecutor来调用ThreadPoolExecutor的execute函数的,而SerialExecute已经通过要给容器控制任务一个一个执行了,所以这种情况下ThreadPoolExecutor只会有两种情况;
1. 没有任何线程在运行
2. 只有一个线程在运行,执行完一个任务就继续执行SerialExecutor的容器里面的下一个任务,如果有,就在当前线程里面继续执行,如果没有线程结束或者空闲。当serialExecutor有新任务来的时候,就再启动一个线程(或者用某个空闲线程)来执行任务。
总体来讲,针对AsyncTask,它有个静态数据成员SerialExecutor,
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
也就是说所有的AsyncTask对象,不管有多少个,都共享同一个SerialExecutor对象(因为它是个静态成员)。
mWorker & mFuture
其实,这两个家伙只是对Runnable和callback的一个封装。结构图:
具体细节我们不见得要去关心。当AsyncTask构造函数调用的时候,mWorker和mFuture会被创建,同时mWoker会被传入到mFuture对象里面去
public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; }
这两个家伙的定义如下;
WorkerRunnable<Params, Result> mWorker;
FutureTask<Result> mFuture;
WorkerRunnable超简单:
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; }
就放了个数据成员:Params[] mParams。
那么整个流程大概是什么样子的呢?
1. 首先AsyncTask的构造函数会创建mWorker和mFuture。
2. 调用AsyncTask的execute过程如:
public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } public final AsyncTask<params result="" progress=""> executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; exec.execute(mFuture); return this; } </params>
execute的函数params被丢到了mWorker里面去,然后exec.execute(mFuture)执行任务,这里exec是sDefaultExecutor,而sDefaultExecutor就是SerialExecutor,
(private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;)
有关WorkerRunnable和FutureTask的具体封装技巧,这里不做过多的描述,有兴趣可以自己去看,没兴趣就跳过。
调用例子
一个典型的调用如下:
AsyncTask<Void, Void, String> asyncTask = new AsyncTask<Void, Void, String>(){ @Override protected String doInBackground(Void... param) { Log.v("AsyncTask", "doInBackground"); return "hello asyncTask"; } @Override public void onPostExecute(String response) { // callback.onSendRequestFinished(JsonUtil.jsonToBean(response, mBeanType)); Toast.makeText(MainActivity.this, "result: " + response, Toast.LENGTH_LONG).show(); } }; asyncTask.execute();
比如我们可以在onCreate里面调用这段代码。如果我们在doInBackground里面下断点,就会看到如下调用堆栈:
最终的doInBackground是在AsyncTask的构造函数里面创建的匿名内嵌类里面被调用的。
public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; }
mWorker是WorkerRunnable的子类(匿名内嵌子类)对象,mWorker被传给了mFuture,FutureTask的 callable就是mWorker。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
mFuture被传到了AsyncTask的execute里面,mFuture实际上就是Runnable的一个子类,mFuture被被SerialExecute传给ThreadPoolExecute来执行。大概流程:
1. ThreadPoolExecutor里面的一个线程执行任务(mFuture)
2. FutureTask的run()会在线程里面被执行
3. Future的run()里面会尝试获得callable,然后调用callable的call()函数。callable就是mWorker,也就是WorkRunnable,而WorkRunnable实现了接口Callable,Callable里面有个方法就是call()。
public void run() { if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
4. 这样,mWorker是WorkerRunnable的子类对象,而且刚好实现了call函数,而call函数通过接口Callable在FutureTask的run()里面被调用了。所以AsyncTask的构造函数里面的匿名内嵌类里面的call实现被ThreadPoolExecutor的线程调用了。也就是这段代码:
mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } };
就这样,AsyncTask的doInBackgroun被一个线程给调用了。
整个AsyncTask的大致流程就是这样,当然还有其他一些内容,如cancel, onPreExecute, onPostExecute, onCancelled等等,还有ThreadPoolExecutor执行完任务后,怎么通知主线程的等等问题。
画了个总体类图:可能不是很准确,但是可以看出各个类之间的关系,作为参考。