Android开发者们应该都知道AsyncTask这个类,它是系统提供的一个异步任务类,可以方便的让我们实现异步操作。在本篇文章中,我将带大家进入源码,简单分析一下AsyncTask的实现。
首先,贴上AsyncTask类的源码:
package android.os; import java.util.ArrayDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public abstract class AsyncTask<Params, Progress, Result> { private static final String LOG_TAG = "AsyncTask"; // 获取可用CPU的数目 private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; // 线程工厂类,供线程池创建新线程时使用 private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); // 创建一个线程池,用来进行我们的任务 public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); public static final Executor SERIAL_EXECUTOR = new SerialExecutor(); private static final int MESSAGE_POST_RESULT = 0x1; private static final int MESSAGE_POST_PROGRESS = 0x2; private static final InternalHandler sHandler = new InternalHandler(); private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR; private final WorkerRunnable<Params, Result> mWorker; // 可以理解为:要执行的任务 private final FutureTask<Result> mFuture; private volatile Status mStatus = Status.PENDING; private final AtomicBoolean mCancelled = new AtomicBoolean(); // 原子布尔值,指示任务是否已经取消 private final AtomicBoolean mTaskInvoked = new AtomicBoolean(); // 指示任务是否被调用执行 // 实现自己的任务执行器,其实作用就是安排任务到线程中执行 // 在下面的具体实现中,我们可以知道,任务是按顺序进行执行 // 的;也就是当一个任务执行完成,才会部署开始下一个任务, // 即:这个AsyncTask是串行处理任务的。 // private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; // 当前正在执行的任务 public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); // 上一个任务完成后,部署进行下一个任务 } } }); if (mActive == null) { // 当前没有正在执行的任务,那就启动下一个任务 scheduleNext(); } } protected synchronized void scheduleNext() { // 从任务队列中抽取出队头任务 if ((mActive = mTasks.poll()) != null) { // 在这里可以看到,任务实际上还是安排到线程池中进行 THREAD_POOL_EXECUTOR.execute(mActive); } } } // 定义任务的三种状态,只有处于预备状态的任务才能够调用execute执行, // 否则将会报错 public enum Status { PENDING, // 预备状态 RUNNING, // 正在执行状态 FINISHED, // 完成 } // 进行消息循环 public static void init() { sHandler.getLooper(); } // 默认执行器 public static void setDefaultExecutor(Executor exec) { sDefaultExecutor = exec; } public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); // 在这里调用了doInBackground,也即是在后台线程中执行我们的任务。 // 执行完doInBackground,调用postResult把执行结果发送出去 return postResult(doInBackground(mParams)); } }; mFuture = new FutureTask<Result>(mWorker) { // 当任务执行完成的时候调用,我会在后面讲到这个方法在何处调用。 @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occured while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; } private void postResultIfNotInvoked(Result result) { final boolean wasTaskInvoked = mTaskInvoked.get(); if (!wasTaskInvoked) { postResult(result); } } // 通过Handler把我们完成的结果发送到主线程的消息队列中,然后由指定的 // sHandler进行处理。注意:这个方法是在子线程中进行操作的 private Result postResult(Result result) { @SuppressWarnings("unchecked") // 任务执行完成,把执行完的结果封装成AsyncTaskResult Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT, new AsyncTaskResult<Result>(this, result)); message.sendToTarget(); return result; } public final Status getStatus() { return mStatus; } /** * 这个方法由子类实现,我们要把程序中想要异步进行的任务放到这个方法中执行 * 注意:这个方法是在子线程中执行的,所以我们不能在这里进行UI相关的操作。 */ protected abstract Result doInBackground(Params... params); // 这个方法在doInBackground之前执行;主要是进行异步任务前的准备操作 protected void onPreExecute() { } // 这个方法在任务执行完成后调用,参数result就是任务执行完成后的结果 @SuppressWarnings({"UnusedDeclaration"}) protected void onPostExecute(Result result) { } // 这个方法能提供任务的执行进度 @SuppressWarnings({"UnusedDeclaration"}) protected void onProgressUpdate(Progress... values) { } // 这个方法在任务取消时调用的 @SuppressWarnings({"UnusedParameters"}) protected void onCancelled(Result result) { onCancelled(); } // 如果我们想要在任务取消时进行一些操作,可以重写这个方法中进行处理 protected void onCancelled() { } // 判断当前任务是否被取消了。返回的值就是前面的原子布尔值中的值 public final boolean isCancelled() { return mCancelled.get(); } // 取消正在执行的任务,主要是通过Future类中的cancel方法来处理的, // 后面我会对这个方法进行分析 public final boolean cancel(boolean mayInterruptIfRunning) { mCancelled.set(true); return mFuture.cancel(mayInterruptIfRunning); } public final Result get() throws InterruptedException, ExecutionException { return mFuture.get(); } public final Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return mFuture.get(timeout, unit); } // 调用这个方法,则开始了我们的异步任务。该方法需要传入异步任务需要的参数 public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } // 前面提到了,AsyncTask是串行的,不过,实际上也是可以改成并行的。下面这个方法可以指定 // 我们自己的执行器来执行操作,因此,只需要传入并行的执行器,则可以使AsyncTask并行起来 public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { // 前面已经提到了,只有在预备状态(PENDING)下,才能够调用execute来执行任务 // 否则将会出现异常,这里就是它抛出异常的源头。 case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; // 任务状态改为执行 onPreExecute(); // 主要是让调用者在任务执行前做一些准备操作。 mWorker.mParams = params; exec.execute(mFuture); return this; } // 使用默认的执行器来执行任务 public static void execute(Runnable runnable) { sDefaultExecutor.execute(runnable); } // 调用这个方法,可以通知调用者任务执行的进度,注意:这个方法在子线程中进行 protected final void publishProgress(Progress... values) { if (!isCancelled()) { // 把任务进度封装成AsyncTaskRusult类 sHandler.obtainMessage(MESSAGE_POST_PROGRESS, new AsyncTaskResult<Progress>(this, values)).sendToTarget(); } } // 任务完成时调用;任务完成有两种情况: // 1. 任务被取消,这时将会回调onCanceled()方法,因此,如果我们在任务取消时, // 想要执行一些处理操作,那么就要重写onCanceled()方法了。 // 2. 任务顺利执行完成。这时会调用onPostExecute方法,同时把执行结果传递给这个 // 方法处理。 private void finish(Result result) { if (isCancelled()) { onCancelled(result); } else { onPostExecute(result); } mStatus = Status.FINISHED; } // 定义一个Handler,处理任务相关的操作 private static class InternalHandler extends Handler { @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult result = (AsyncTaskResult) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // 任务执行完成, result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: // 任务执行进度 result.mTask.onProgressUpdate(result.mData); break; } } } // 这个类封装了任务所需要的参数。它继承了Callable接口,后面我会讲继承这个接口的作用 private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; } @SuppressWarnings({"RawUseOfParameterizedType"}) private static class AsyncTaskResult<Data> { final AsyncTask mTask; final Data[] mData; AsyncTaskResult(AsyncTask task, Data... data) { mTask = task; mData = data; } } }
我在代码上面做了一些注释。基本上我们需要了解的也就是这些注释到的地方。下面就来开始分析了:
首先,先看看SerialExecutor。
private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; // 当前正在执行的任务 public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); // 上一个任务完成后,部署进行下一个任务 } } }); if (mActive == null) { // 当前没有正在执行的任务,那就启动下一个任务 scheduleNext(); } } protected synchronized void scheduleNext() { // 从任务队列中抽取出队头任务 if ((mActive = mTasks.poll()) != null) { // 在这里可以看到,任务实际上还是安排到线程池中进行 THREAD_POOL_EXECUTOR.execute(mActive); } } }
在这个类是AsyncTask默认的执行器,也就是说我们的任务都是在这里面执行的。在execute()方法中,可以知道,使用了一个ArrayDeque来存放我们的任务。它先构建了一个Runnable对象,然后在Runnable中的run方法调用了execute传入的Runable r参数的run方法,然后调用scheduleNext(),这样做的作用是:在完成上一个任务后,接着启动下一个任务。
execute()方法的最后,会判断当前是否有执行的任务,即mActive是否为空,如果不为空,那就调用scheduleNext()来启动下一个任务。现在重点分析scheduleNext()方法。在该方法中,可以看出,任务最终还是被安排到已经创建好的线程池中执行。这里就是实现AsyncTask串行的地方,可以注意一下。
再来分析一下AsyncTask的构造方法:
public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); // 在这里调用了doInBackground,也即是在后台线程中执行我们的任务。 // 执行完doInBackground,调用postResult把执行结果发送出去 return postResult(doInBackground(mParams)); } }; mFuture = new FutureTask<Result>(mWorker) { // 当任务执行完成的时候调用,我会在后面讲到这个方法在何处调用。 @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occured while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; }
可以知道,构造方法中生成了两个对象,mWorker和mFuture。先看看WorkerRunnable<Params, Result>是如何定义的:
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; }
它封装了任务所需要的参数,然后就是继承了Callable接口(Callable中只定义了call()方法)。因此,在mWorker对象的初始化的时候,我们必须要实现call()方法;好,现在回到mWorker中的call方法,它调用了doInBackground(),原来,我们在使用AsyncTask的时候,在doINBackground方法中,实现的操作就是在这里被调用的!!!此时,它已经是在子线程中进行的了。为什么呢?我后面再讲!再看看mFuture对象,它需要一个传入一个Callable类型的参数,而mWorker恰好实现了Callable接口!那我们不妨看一下Future类吧。
public class FutureTask<V> implements RunnableFuture<V> { private Callable<V> callable; ...... public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } ......
在Future的构造方法中,可以知道,它把我们传进来的Callable对象保存在变量callable中了;我们先简单记住这个变量!后面它还会为我们解答一些问题。回到mFuture的实现上,它重写了done()方法,这里我们只需要知道它在任务完成的时候回调用就行了!这样,我们就把AsyncTask的构造方法简单的分析好了。
接下来,分析AsyncTask的execute(Params... params)方法;
public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; // 任务状态改为执行 onPreExecute(); // 主要是让调用者在任务执行前做一些准备操作。 mWorker.mParams = params; exec.execute(mFuture); return this; }
execute(Params... params)方法实际上是调用了executeOnExecutor(Executor exec, Params... params)方法,那我们具体看看这个它里面做了些什么。它调用了onPostExecute(),可以让调用者做一些执行任务前的准备操作、把传入的params参数保存到mWorker中,然后再调用exec的execute(mFuture)方法。其中,exec就是我们传进来的默认执行器sDefaultExecutor!!!sDefaultExecutor的execute()方法我们在前面已经分析了!它是使用线程池来执行我们的任务的。由于TheadPoolExecutor的实现比较复杂,所以这里暂时不做分析。我们只需知道它接下来会调用到mFuture的run()方法即可,那我们先看看这个run()方法中,做了些什么。
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
我们只看try里面的代码。可以看出,如果mFuture中的callable对象不为null,那么就会回调它的call方法了。在前面,我们已经知道了mFuture把我们传进来的mWorker对象保存在了Callable变量里面了,因此,这里调用的call()方法就是mWorker的。而这个时候,call()方法是在子线程中调用的,由此就可以知道,doInBackground()也就是在子线程进行的!如果任务正常完成,那么变量ran将会是true,那么它会调用set(result),看看set()方法的源码:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
它把任务完成的结果保存在outcome变量中,接着调用了finishCompletion()方法,了解下这个方法的实现:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
finishCompletion中,它会调用done()方法,让调用者对完成的任务结果进行处理。
我们接下来分析一下,AsyncTask是如何取消任务的!
找到AsyncTask的cancel方法:
public final boolean cancel(boolean mayInterruptIfRunning) { mCancelled.set(true); return mFuture.cancel(mayInterruptIfRunning); }
它调用 了mFuture的cancel方法。下面是改方法的实现:
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
从代码中可以知道,取消操作实际上是让运行当前要取消的任务的线程中断了,同样,该方法也会调用finishCompletion()方法,然后在finishCompletion方法中调用done()方法,让调用者能够对取消任务后进行相关处理。
至此,AsyncTask执行异步任务的基本流程就简单的分析完了。在文章前面的AsyncTask源码里面,我已经做了比较详细的注释,读者可以结合注释,然后对AsyncTask整个流程执行分析,相信读者能够掌握这些步骤!
欢迎大家交流和转载,转载请标明:http://blog.csdn.net/wuzhipeng1991