

在一个消息驱动的系统中,最重要的就是消息队列以及消息的获取和处理。从上一篇文章可以看出,Handler的消息机制主要是靠MessageQueue进行消息排队,靠Looper进行消息循环;Looper的loop方法中轮询消息的实际操作还是依靠MessageQueue的next方法来获取消息,也就是说在这个消息驱动机制中最重要的就是MessageQueue这个类了。在Android 2.3之前,只有java层可以往MessageQueue中添加消息,使得消息驱动正常运作;在2.3之后,MessageQueue的核心部分移到了native层,MessageQueue兼顾了java层和native层两个世界,来保证消息的运作。


// Constructor: records whether this queue may be quit and stores the value
// returned by nativeInit() in mPtr.
// NOTE(review): the closing brace is missing from this excerpt.
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();


// JNI function: allocates a NativeMessageQueue and passes it, together with
// obj, to android_os_MessageQueue_setNativeMessageQueue.
// NOTE(review): closing braces (and possibly an early return after the throw)
// are missing from this excerpt.
static void android_os_MessageQueue_nativeInit(JNIEnv* env,     jobject obj) {
    NativeMessageQueue* nativeMessageQueue = new    NativeMessageQueue();
    if (! nativeMessageQueue) {
        // Allocation produced a null pointer: report it as a runtime exception.
        jniThrowRuntimeException(env, "Unable to allocate native queue");
    android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);

// Constructor: takes the Looper returned by Looper::getForThread(), and
// creates one with Looper(false) when that call returns NULL.
// NOTE(review): the remainder of the body and the closing braces are not
// shown in this excerpt.
NativeMessageQueue::NativeMessageQueue() {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);



// Returns the next message that is due, looping until one is available.
// Each iteration calls nativePollOnce() with the current timeout, then
// inspects the queue under the lock. Returns null once mQuitting is set.
// When nothing is due, registered idle handlers are run (first pass only).
// NOTE(review): closing braces and a few statements are missing from this
// excerpt; elided spots are marked below.
Message next() {
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        // NOTE(review): the body of this branch is not shown in this excerpt.
        if (nextPollTimeoutMillis != 0) {

        // We can assume mPtr != 0 because the loop is obviously still running.
        // The looper will not call this method after the loop quits.
        nativePollOnce(mPtr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    // Unlink msg from the list: either after prevMsg or at the head.
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    msg.next = null;
                    if (false) Log.v("MessageQueue", "Returning message: " + msg);
                    return msg;
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                return null;

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                // NOTE(review): the statement that restarts the loop is not
                // shown in this excerpt.
                mBlocked = true;

            // Lazily allocate the scratch array (at least 4 slots), then
            // snapshot the registered idle handlers into it.
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf("MessageQueue", "IdleHandler threw exception", t);

            // NOTE(review): the body of the synchronized block is not shown;
            // presumably it unregisters the handler when keep is false - confirm.
            if (!keep) {
                synchronized (this) {

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;



// Inserts msg into the queue, which is kept ordered by `when`.
// Throws AndroidRuntimeException if msg is already in use or has no target.
// Returns false (after logging a warning) if the queue is quitting,
// otherwise true.
// NOTE(review): closing braces and a few statements are missing from this
// excerpt; elided spots are marked below.
boolean enqueueMessage(Message msg, long when) {
    if (msg.isInUse()) {
        throw new AndroidRuntimeException(msg + " This message is already in use.");
    if (msg.target == null) {
        throw new AndroidRuntimeException("Message must have a target.");

    synchronized (this) {
        if (mQuitting) {
            RuntimeException e = new RuntimeException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w("MessageQueue", e.getMessage(), e);
            return false;

        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                // NOTE(review): the body of this branch (the loop exit) is not
                // shown in this excerpt.
                if (p == null || when < p.when) {
                // An asynchronous message ahead of the insertion point means
                // msg is not the earliest one, so no wake-up is needed.
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;

        // We can assume mPtr != 0 because mQuitting is false.
        // NOTE(review): the wake-up call inside this branch is not shown in
        // this excerpt.
        if (needWake) {
    return true;



// JNI function: reinterprets ptr as a NativeMessageQueue* and calls wake()
// on it.
// NOTE(review): the closing brace is missing from this excerpt.
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();

// Wake entry point on the native queue; called from
// android_os_MessageQueue_nativeWake.
// NOTE(review): the body is not shown in this excerpt; presumably it forwards
// to Looper::wake() - confirm.
void NativeMessageQueue::wake() {


// Signals a wake-up by writing the single byte "W" to mWakeWritePipeFd.
// The write is retried while it fails with EINTR. A failed write is logged
// unless errno is EAGAIN.
// NOTE(review): closing braces and any preprocessor guards around the debug
// and statistics code are missing from this excerpt.
void Looper::wake() {
        LOGD("%p ~ wake", this);

        // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
        // Record the timestamp of the first pending wake.
        if (mPendingWakeCount++ == 0) {
            mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);

        ssize_t nWrite;
        do {
            nWrite = write(mWakeWritePipeFd, "W", 1);
        } while (nWrite == -1 && errno == EINTR);

        if (nWrite != 1) {
            // EAGAIN is deliberately not logged.
            if (errno != EAGAIN) {
                LOGW("Could not write wake signal, errno=%d", errno);



// Poll entry point on the native queue, taking a timeout in milliseconds.
// NOTE(review): the body is not shown in this excerpt; presumably it forwards
// to Looper::pollOnce() - confirm.
void NativeMessageQueue::pollOnce(int timeoutMillis) {


// Loops until there is something to report to the caller:
//  - first drains mResponses; a response whose request has no callback is
//    returned through outFd / outEvents / outData, with its ident as the
//    return value;
//  - otherwise, once pollInner() has produced a non-zero result, that result
//    is returned with the out-parameters cleared.
// Any of the out-parameters may be NULL.
// NOTE(review): closing braces and the #endif lines matching the
// `#if DEBUG_POLL_AND_WAKE` directives are missing from this excerpt.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            if (! response.request.callback) {
            #if DEBUG_POLL_AND_WAKE
                LOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this,
                        response.request.ident, response.request.fd,
                        response.events, response.request.data);
                if (outFd != NULL) *outFd = response.request.fd;
                if (outEvents != NULL) *outEvents = response.events;
                if (outData != NULL) *outData = response.request.data;
                return response.request.ident;

        if (result != 0) {
            #if DEBUG_POLL_AND_WAKE
            LOGD("%p ~ pollOnce - returning result %d", this, result);
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = NULL;
            if (outData != NULL) *outData = NULL;
            return result;

        // Nothing to report yet: wait for events.
        result = pollInner(timeoutMillis);

在Looper::pollOnce方法中你会发现使用了#if 和 #endif,这就代表着looper采用了编译选项来控制是否使用epoll机制来进行I/O复用。在linux的网络编程中,很长的一段时间都在使用select来做事件触发,在linux新的内核中使用了epoll来替换它,相比于select,epoll最大的好处在于它不会随着监听文件描述符数目的增长而效率降低,select机制是采用轮询来处理的,轮询的fd数目越多,效率也就越低。epoll的接口非常简单就只有三个函数:

  1. int epoll_create(int size);创建一个epoll句柄,当这个句柄创建完成之后,在/proc/进程id/fd中可以看到这个fd。
  2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);注册事件函数。
  3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);等待事件的发生,参数timeout是超时时间(毫秒),0表示立即返回,-1表示不设超时,也就是说有可能永久阻塞。该函数返回需要处理的事件数目,如返回0表示已超时。


// Performs one wait for events and converts what it finds into entries
// pushed via pushResponse(), then invokes the callbacks of the collected
// responses.
// The result starts as ALOOPER_POLL_WAKE and becomes ALOOPER_POLL_ERROR on an
// unexpected wait failure, ALOOPER_POLL_TIMEOUT when no event arrived, or
// ALOOPER_POLL_CALLBACK when a callback was invoked.
// NOTE(review): this excerpt interleaves an epoll_wait()-based path and a
// poll()-based path (eventCount is declared twice), and closing braces and
// some statements are missing. Presumably the original selects one path with
// preprocessor conditionals that were dropped - confirm against the source.
int Looper::pollInner(int timeoutMillis) {
    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);

    int result = ALOOPER_POLL_WAKE;
    mResponseIndex = 0;

    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);

    // This path uses epoll for I/O multiplexing.
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // Call epoll_wait to wait for events to occur.
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    bool acquiredLock = false;
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    // NOTE(review): the loop body is not shown in this excerpt.
    while (mWaiters != 0) {
    mPolling = true;

    // poll()-based path: wait on the requested descriptors.
    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);

    if (eventCount < 0) {
        // An interrupted wait (EINTR) is not reported as an error.
        if (errno == EINTR) {
            goto Done;

        LOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;

    if (eventCount == 0) {
        LOGD("%p ~ pollOnce - timeout", this);
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;

    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);

    // epoll path: walk the returned events.
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            // NOTE(review): the handling of a readable wake pipe is not shown
            // in this excerpt.
            if (epollEvents & EPOLLIN) {
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
        } else {
            // NOTE(review): the lock acquisition itself is not shown here.
            if (! acquiredLock) {
                acquiredLock = true;

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                // Translate epoll event bits into ALOOPER_EVENT_* flags.
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
    // NOTE(review): the body of this branch is not shown in this excerpt.
    if (acquiredLock) {
    Done: ;
    // poll path: walk the requested descriptors and inspect their revents.
    for (size_t i = 0; i < requestedCount; i++) {
        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);

        short pollEvents = requestedFd.revents;
        if (pollEvents) {
            if (requestedFd.fd == mWakeReadPipeFd) {
                if (pollEvents & POLLIN) {
                    // The read end of the wake pipe is readable: read the data
                    // out of the pipe directly.
                    // NOTE(review): the call doing so is not shown in this excerpt.
                } else {
                    LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
            } else {
                // Translate poll event bits into ALOOPER_EVENT_* flags.
                int events = 0;
                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
                pushResponse(events, mRequests.itemAt(i));
            // NOTE(review): the body (leaving the loop once every reported
            // event has been consumed, presumably) is not shown - confirm.
            if (--eventCount == 0) {

    // Set mPolling to false and wake up the wakeAndLock() waiters.
    // NOTE(review): the wake-up statement is not shown in this excerpt.
    mPolling = false;
    if (mWaiters != 0) {

    // Latency statistics: accumulate per-poll samples and log the averages
    // every SAMPLED_POLLS_TO_AGGREGATE polls, then reset the counters.
    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    mSampledPolls += 1;
    if (timeoutMillis == 0) {
        mSampledZeroPollCount += 1;
        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
        mSampledTimeoutPollCount += 1;
        // For timed-out polls only the time beyond the requested timeout is counted.
        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
                - milliseconds_to_nanoseconds(timeoutMillis);
    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
                0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
                0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
        mSampledPolls = 0;
        mSampledZeroPollCount = 0;
        mSampledZeroPollLatencySum = 0;
        mSampledTimeoutPollCount = 0;
        mSampledTimeoutPollLatencySum = 0;

    // Invoke the callback of every collected response that has one.
    for (size_t i = 0; i < mResponses.size(); i++) {
        const Response& response = mResponses.itemAt(i);
        if (response.request.callback) {
            LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
                    response.request.fd, response.events, response.request.data);
            int callbackResult = response.request.callback(
                    response.request.fd, response.events, response.request.data);
            // NOTE(review): the body of this branch is not shown in this excerpt.
            if (callbackResult == 0) {

            result = ALOOPER_POLL_CALLBACK;
    return result;

在上述代码中调用epoll_wait方法等待事件的发生,timeoutMillis是我们在java层传递过来的。在MessageQueue的next方法中,如果没有消息,其中的nextPollTimeoutMillis = -1,也就是说timeoutMillis为-1,那么在等待事件发生的时候,就有可能会造成永久阻塞,直到某个事件发生。如果有事件发生并且是管道读端的事件,那么就会直接读取管道中的数据。之前在分析Looper::wake方法时,就往管道中写入了数据。

