The previous article analyzed the Handler message mechanism from the Java-layer source code; this article analyzes Android's message mechanism from the native layer.
In a message-driven system, what matters most is the message queue and how messages are retrieved and processed. As the previous article showed, the Handler mechanism relies on MessageQueue to queue messages and on Looper to run the message loop; the polling inside Looper's loop() method ultimately depends on MessageQueue's next() method to fetch each message. In other words, MessageQueue is the most important class in this message-driven mechanism. Before Android 2.3, only the Java layer could add messages to the MessageQueue to keep the mechanism running; since 2.3, the core of MessageQueue has moved into the native layer, and MessageQueue now straddles both worlds to keep messages flowing.
In MessageQueue's constructor:
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
The constructor calls nativeInit, which is implemented in the native layer; the real implementation is the android_os_MessageQueue_nativeInit function in android_os_MessageQueue.cpp.
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (! nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return;
    }
    android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}
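The helper android_os_MessageQueue_setNativeMessageQueue is what ties the two worlds together: it stores the native pointer into the Java object's mPtr field. (In the 2.3-era version quoted here, nativeInit is void and sets the field itself; in later versions nativeInit instead returns the pointer, which is what the `mPtr = nativeInit();` shown above reflects.) A sketch of the helper, modeled on the 2.3-era sources, where gMessageQueueClassInfo caches the mPtr field ID; details may differ by version:

// Sketch modeled on 2.3-era android_os_MessageQueue.cpp (may differ by
// version): the NativeMessageQueue pointer is written into the Java
// MessageQueue's int field mPtr, so later native calls can recover it.
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
        NativeMessageQueue* nativeMessageQueue) {
    env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
            reinterpret_cast<jint>(nativeMessageQueue));
}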
NativeMessageQueue::NativeMessageQueue() {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
The android_os_MessageQueue_nativeInit function creates a NativeMessageQueue, the native counterpart of the Java-layer MessageQueue. The NativeMessageQueue constructor fetches the Looper for the current thread; if the thread does not yet have one, it instantiates a Looper and binds it to the thread.
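Looper::getForThread and Looper::setForThread are backed by thread-local storage, which is what makes "one Looper per thread" work on the native side, much as ThreadLocal does on the Java side. A simplified sketch of the idea using pthread TLS (the real Looper.cpp of this era works the same way, plus sp<> reference counting and a key destructor):

// Simplified sketch (not the actual framework code) of per-thread Looper
// storage using pthread TLS.
#include <pthread.h>

class Looper;  // full definition lives in Looper.h

static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey;

static void initTLSKey() {
    // The real code also installs a destructor that releases the Looper.
    pthread_key_create(&gTLSKey, NULL);
}

Looper* getForThread() {
    pthread_once(&gTLSOnce, initTLSKey);
    return static_cast<Looper*>(pthread_getspecific(gTLSKey));  // NULL if unset
}

void setForThread(Looper* looper) {
    pthread_once(&gTLSOnce, initTLSKey);
    pthread_setspecific(gTLSKey, looper);  // bind this looper to the calling thread
}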
As the previous article mentioned, once the objects involved in the message mechanism have been initialized, the loop operation starts; and looping is really just calling MessageQueue's next() method over and over.
Message next() {
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        // We can assume mPtr != 0 because the loop is obviously still running.
        // The looper will not call this method after the loop quits.
        nativePollOnce(mPtr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (false) Log.v("MessageQueue", "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf("MessageQueue", "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}
Once nativePollOnce returns, next() can try to pull a message from mMessages. Note that when the queue is empty, nextPollTimeoutMillis is set to -1, so nativePollOnce will not return until something wakes it up; in other words, with no messages pending, the call blocks rather than spinning.
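To make this block-until-woken contract concrete, here is a standalone sketch (plain POSIX code, not framework code) in which one thread blocks on the read end of a pipe with an infinite timeout, just as the looper thread does inside nativePollOnce, while a second thread wakes it by writing a single byte:

// Standalone sketch: thread A blocks in poll() on the read end of a pipe
// with an infinite timeout; thread B wakes it by writing one byte, which is
// essentially what nativeWake() does via Looper::wake().
#include <poll.h>
#include <unistd.h>
#include <cstdio>
#include <thread>

int main() {
    int fds[2];
    if (pipe(fds) != 0) {
        perror("pipe");
        return 1;
    }

    std::thread waker([&] {
        sleep(1);                      // simulate a message arriving later
        write(fds[1], "W", 1);         // the "wake" signal
    });

    struct pollfd pfd = { fds[0], POLLIN, 0 };
    printf("blocking with timeout -1...\n");
    int n = poll(&pfd, 1, -1);         // -1: block until an event occurs
    printf("poll returned %d\n", n);   // returns 1 once the byte is written

    char buf;
    read(fds[0], &buf, 1);             // drain the pipe, like Looper::awoken()
    waker.join();
    close(fds[0]);
    close(fds[1]);
    return 0;
}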
In MessageQueue's enqueueMessage method:
boolean enqueueMessage(Message msg, long when) {
    if (msg.isInUse()) {
        throw new AndroidRuntimeException(msg + " This message is already in use.");
    }
    if (msg.target == null) {
        throw new AndroidRuntimeException("Message must have a target.");
    }

    synchronized (this) {
        if (mQuitting) {
            RuntimeException e = new RuntimeException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w("MessageQueue", e.getMessage(), e);
            return false;
        }

        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
After the message has been inserted, enqueueMessage calls the native nativeWake method (when needWake is true); presumably this is what makes the nativePollOnce call mentioned above return, so that the newly added message can be dispatched.
In android_os_MessageQueue.cpp:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
    mLooper->wake();
}
In Looper.cpp:
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ wake", this);
#endif

#ifdef LOOPER_STATISTICS
    // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
    if (mPendingWakeCount++ == 0) {
        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    }
#endif

    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            LOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}
Surprisingly, wake() merely writes a single "W" into a pipe. Can that alone make nativePollOnce return? Does that mean nativePollOnce holds the read end of this pipe, and that somewhere during its execution it must be monitoring that pipe? These are only guesses so far, so let's analyze the concrete implementation of nativePollOnce next.
nativePollOnce is implemented in android_os_MessageQueue.cpp:
void NativeMessageQueue::pollOnce(int timeoutMillis) {
    mLooper->pollOnce(timeoutMillis);
}
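For context, the JNI entry point that dispatches the Java-side nativePollOnce call into this C++ method looks roughly like the following (a sketch modeled on the same era's android_os_MessageQueue.cpp; the exact signature may vary between Android versions):

// Sketch of the JNI bridge (modeled on 2.3-era sources; may differ by
// version): the Java layer passes mPtr back down, and it is cast back into
// the NativeMessageQueue that nativeInit() created earlier.
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jint ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(timeoutMillis);
}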
In Looper.cpp:
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            if (! response.request.callback) {
#if DEBUG_POLL_AND_WAKE
                LOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this,
                        response.request.ident, response.request.fd,
                        response.events, response.request.data);
#endif
                if (outFd != NULL) *outFd = response.request.fd;
                if (outEvents != NULL) *outEvents = response.events;
                if (outData != NULL) *outData = response.request.data;
                return response.request.ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            LOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = NULL;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
In Looper.cpp you will notice #if/#ifdef blocks everywhere. The LOOPER_USES_EPOLL compile-time option controls whether the looper uses the epoll mechanism for I/O multiplexing (the #if DEBUG_POLL_AND_WAKE blocks are merely debug logging). In Linux network programming, select was for a long time the standard way to wait for events; newer kernels offer epoll as its replacement. epoll's biggest advantage over select is that its efficiency does not degrade as the number of watched file descriptors grows: select works by linearly scanning its fd set, so the more fds it watches, the slower it gets. The epoll interface is very simple, just three functions (a self-contained example follows the list):
- int epoll_create(int size); creates an epoll handle. Once created, the handle appears as an fd under /proc/<pid>/fd/.
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); registers, modifies, or removes the events to watch on a given fd.
- int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); waits for events. The timeout is in milliseconds: 0 returns immediately, -1 blocks indefinitely (possibly forever). The return value is the number of events to handle; 0 means the wait timed out.
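The following standalone sketch (plain Linux code, not framework code) strings the three calls together, and at the same time answers the question raised above about Looper::wake(): registering the read end of a pipe with epoll and writing a single byte into the write end is indeed enough to make epoll_wait return.

// Standalone sketch: register a pipe's read end with epoll, write one byte
// into the write end, and observe epoll_wait wake up. This mirrors how
// Looper::wake() wakes the epoll_wait inside pollInner().
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>

int main() {
    int fds[2];
    if (pipe(fds) != 0) {
        perror("pipe");
        return 1;
    }

    int epollFd = epoll_create(1);           // size hint, ignored on modern kernels
    struct epoll_event item = {};
    item.events = EPOLLIN;                   // interested in "readable" events
    item.data.fd = fds[0];
    epoll_ctl(epollFd, EPOLL_CTL_ADD, fds[0], &item);  // watch the read end

    write(fds[1], "W", 1);                   // the "wake" signal, as in Looper::wake()

    struct epoll_event events[8];
    int n = epoll_wait(epollFd, events, 8, -1);  // would block forever without the write
    printf("epoll_wait returned %d event(s) on fd %d\n", n, events[0].data.fd);

    char buf;
    read(fds[0], &buf, 1);                   // drain the byte, like Looper::awoken()
    close(epollFd);
    close(fds[0]);
    close(fds[1]);
    return 0;
}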
Back in Looper::pollOnce: every iteration of its for loop calls pollInner, so let's look at that next.
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

#ifdef LOOPER_STATISTICS
    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif

#ifdef LOOPER_USES_EPOLL
    // This branch uses epoll for I/O multiplexing.
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // Call epoll_wait to wait for events.
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    bool acquiredLock = false;
#else
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif

    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        LOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        LOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            if (! acquiredLock) {
                mLock.lock();
                acquiredLock = true;
            }

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;
#else
    for (size_t i = 0; i < requestedCount; i++) {
        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
        short pollEvents = requestedFd.revents;
        if (pollEvents) {
            if (requestedFd.fd == mWakeReadPipeFd) {
                if (pollEvents & POLLIN) {
                    // An event on the pipe's read end: drain the pipe's data directly.
                    awoken();
                } else {
                    LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
                }
            } else {
                int events = 0;
                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
                pushResponse(events, mRequests.itemAt(i));
            }
            if (--eventCount == 0) {
                break;
            }
        }
    }

Done:
    // Set mPolling to false and wake up the wakeAndLock() waiters.
    mLock.lock();
    mPolling = false;
    if (mWaiters != 0) {
        mAwake.broadcast();
    }
    mLock.unlock();
#endif

#ifdef LOOPER_STATISTICS
    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    mSampledPolls += 1;
    if (timeoutMillis == 0) {
        mSampledZeroPollCount += 1;
        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
        mSampledTimeoutPollCount += 1;
        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
                - milliseconds_to_nanoseconds(timeoutMillis);
    }
    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
                0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
                0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
        mSampledPolls = 0;
        mSampledZeroPollCount = 0;
        mSampledZeroPollLatencySum = 0;
        mSampledTimeoutPollCount = 0;
        mSampledTimeoutPollLatencySum = 0;
    }
#endif

    for (size_t i = 0; i < mResponses.size(); i++) {
        const Response& response = mResponses.itemAt(i);
        if (response.request.callback) {
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
                    response.request.fd, response.events, response.request.data);
#endif
            int callbackResult = response.request.callback(
                    response.request.fd, response.events, response.request.data);
            if (callbackResult == 0) {
                removeFd(response.request.fd);
            }
            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}
The epoll_wait call above waits for events, and timeoutMillis is the value handed down from the Java layer. In MessageQueue's next() method, when there are no messages, nextPollTimeoutMillis is set to -1, so timeoutMillis is -1 and epoll_wait may block indefinitely until some event occurs. When an event does occur on the read end of the wake pipe, pollInner calls awoken() to read the pipe's data directly; that data is exactly the byte that Looper::wake(), analyzed earlier, wrote into the pipe.
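The article does not quote awoken(), but its job follows from the above: drain whatever bytes are sitting in the wake pipe so the fd stops reporting readable. A sketch of what it plausibly looks like, modeled on the same era's Looper.cpp (debug and statistics code omitted; details may differ by version):

// Sketch of Looper::awoken(), modeled on 2.3-era Looper.cpp: read and
// discard everything in the wake pipe so that epoll/poll no longer reports
// the read end as ready.
void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}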