Android消息处理惩罚机制(Handler、Looper、MessageQueue与Message)

Android是消息驱动的,实现消息驱动有几个要素:

(1)消息的默示:Message

(2)消息队列:MessageQueue

(3)消息轮回,用于轮回取出消息进行处理惩罚:Looper

(4)消息处理惩罚,消息轮回从消息队列中取出消息后要对消息进行处理惩罚:Handler

  日常平凡我们最常应用的就是Message与Handler了,若是应用过HandlerThread或者本身实现类似HandlerThread的器材可能还会接触到Looper,而MessageQueue是Looper内部应用的,对于标准的SDK,我们是无法实例化并应用的(机关函数是包可见性)。

  我们日常平凡接触到的Looper、Message、Handler都是用JAVA实现的,Android做为基于Linux的体系,底层用C、C++实现的,并且还有NDK的存在,消息驱动的模型怎么可能只存在于JAVA层,实际上,在Native层存在与Java层对应的类如Looper、MessageQueue等。

初始化消息队列

  起首来看一下若是一个线程想实现消息轮回应当怎么做,以HandlerThread为例:

publicvoidrun() {

mTid=Process.myTid();

Looper.prepare();

synchronized(this) {

mLooper=Looper.myLooper();

notifyAll();

}

Process.setThreadPriority(mPriority);

onLooperPrepared();

Looper.loop();

mTid= -1;

}

  主如果红色标明的两句,起首调用prepare初始化MessageQueue与Looper,然后调用loop进入消息轮回。先看一下Looper.prepare。

publicstaticvoidprepare() {

prepare(true);

}

privatestaticvoidprepare(booleanquitAllowed) {

if(sThreadLocal.get() !=null) {

thrownewRuntimeException("Only one Looper may be created per thread");

}

sThreadLocal.set(newLooper(quitAllowed));

}

  重载函数,quitAllowed默认为true,从名字可以看出来就是消息轮回是否可以退出,默认是可退出的,Main线程(UI线程)初始化消息轮回时会调用prepareMainLooper,传进去的是false。应用了ThreadLocal,每个线程可以初始化一个Looper。

  再来看一下Looper在初始化时都做了什么:

privateLooper(booleanquitAllowed) {

mQueue=newMessageQueue(quitAllowed);

mRun=true;

mThread=Thread.currentThread();

}

MessageQueue(booleanquitAllowed) {

mQuitAllowed=quitAllowed;

nativeInit();

}

  在Looper初始化时,新建了一个MessageQueue的对象保存了在成员mQueue中。MessageQueue的机关函数是包可见性,所以我们是无法直接应用的,在MessageQueue初始化的时辰调用了nativeInit,这是一个Native办法:

staticvoidandroid_os_MessageQueue_nativeInit(JNIEnv*env, jobject obj) {

NativeMessageQueue* nativeMessageQueue =newNativeMessageQueue();

if(!nativeMessageQueue) {

jniThrowRuntimeException(env,"Unable to allocate native queue");

return;

}

nativeMessageQueue->incStrong(env);

android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);

}

staticvoidandroid_os_MessageQueue_setNativeMessageQueue(JNIEnv*env, jobject messageQueueObj,

NativeMessageQueue*nativeMessageQueue) {

env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,

reinterpret_cast<jint>(nativeMessageQueue));

}

  在nativeInit中,new了一个Native层的MessageQueue的对象,并将其地址保存在了Java层MessageQueue的成员mPtr中,Android中有很多多少如许的实现,一个类在Java层与Native层都有实现,经由过程JNI的GetFieldID与SetIntField把Native层的类的实例地址保存到Java层类的实例的mPtr成员中,比如Parcel。

  再看NativeMessageQueue的实现:

NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {

mLooper=Looper::getForThread();

if(mLooper ==NULL) {

mLooper=newLooper(false);

Looper::setForThread(mLooper);

}

}

  在NativeMessageQueue的机关函数中获得了一个Native层的Looper对象,Native层的Looper也应用了线程本地存储,重视new Looper时传入了参数false。

Looper::Looper(boolallowNonCallbacks) :

mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),

mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {

intwakeFds[2];

int result =pipe(wakeFds);

LOG_ALWAYS_FATAL_IF(result!=0,"Could not create wake pipe.  errno=%d", errno);

mWakeReadPipeFd= wakeFds[0];

mWakeWritePipeFd = wakeFds[1];

result=fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);

LOG_ALWAYS_FATAL_IF(result!=0,"Could not make wake read pipe non-blocking.  errno=%d",

errno);

result=fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);

LOG_ALWAYS_FATAL_IF(result!=0,"Could not make wake write pipe non-blocking.  errno=%d",

errno);

//Allocate the epoll instance and register the wake pipe.

mEpollFd =epoll_create(EPOLL_SIZE_HINT);

LOG_ALWAYS_FATAL_IF(mEpollFd<0,"Could not create epoll instance.  errno=%d", errno);

structepoll_event eventItem;

memset(& eventItem,0,sizeof(epoll_event));//zero out unused members of data field union

eventItem.events =EPOLLIN;

eventItem.data.fd=mWakeReadPipeFd;

result= epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, &eventItem);

LOG_ALWAYS_FATAL_IF(result!=0,"Could not add wake read pipe to epoll instance.  errno=%d",

errno);

}

  Native层的Looper应用了epoll。初始化了一个管道,用mWakeWritePipeFd与mWakeReadPipeFd分别保存了管道的写端与读端,并了读端的EPOLLIN事务。重视下初始化列表的值,mAllowNonCallbacks的值为false。

  mAllowNonCallback是做什么的?应用epoll仅为了mWakeReadPipeFd的事务?其实Native Looper不仅可以这一个描述符,Looper还供给了addFd办法:

intaddFd(intfd,intident,intevents, ALooper_callbackFunc callback,void*data);

intaddFd(intfd,intident,intevents,constsp<LooperCallback>& callback,void* data);

  fd默示要的描述符。ident默示要的事务的标识,值必须>=0或者为ALOOPER_POLL_BACK(-2),event默示要的事务,callback是事务产生时的回调函数,mAllowNonCallbacks的感化就在于此,当mAllowNonCallbacks为true时容许callback为NULL,在pollOnce中ident作为成果返回,不然不容许callback为空,当callback不为NULL时,ident的值会被忽视。还是直接看代码便利懂得:

intLooper::addFd(intfd,intident,intevents,constsp<LooperCallback>& callback,void*data) {

#ifDEBUG_CALLBACKS

ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0 x%x, callback=%p, data=%p",this, fd, ident,

events, callback.get(), data);

#endif

if(!callback.get()) {

if(!mAllowNonCallbacks) {

ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");

return-1;

}

if(ident <0) {

ALOGE("Invalid attempt to set NULL callback with ident < 0.");

return-1;

}

}else{

ident=ALOOPER_POLL_CALLBACK;

}

intepollEvents =0;

if(events & ALOOPER_EVENT_INPUT) epollEvents |=EPOLLIN;

if(events & ALOOPER_EVENT_OUTPUT) epollEvents |=EPOLLOUT;

{//acquire lock

AutoMutex _l(mLock);

Request request;

request.fd=fd;

request.ident=ident;

request.callback=callback;

request.data=data;

structepoll_event eventItem;

memset(& eventItem,0,sizeof(epoll_event));//zero out unused members of data field union

eventItem.events =epollEvents;

eventItem.data.fd=fd;

ssize_t requestIndex=mRequests.indexOfKey(fd);

if(requestIndex <0) {

intepollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem);

if(epollResult <0) {

ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);

return-1;

}

mRequests.add(fd, request);

}else{

intepollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, &eventItem);

if(epollResult <0) {

ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);

return-1;

}

mRequests.replaceValueAt(requestIndex, request);

}

}//release lock

return1;

}

  若是callback为空会搜检mAllowNonCallbacks看是否容许callback为空,若是容许callback为空还会检测ident是否>=0。若是callback不为空会把ident的值赋值为ALOOPER_POLL_CALLBACK,不管传进来的是什么值。

  接下来把传进来的参数值封装到一个Request布局体中,并以描述符为键保存到一个KeyedVector mRequests中,然后经由过程epoll_ctl添加或调换(若是这个描述符之前有调用addFD添加)对这个描述符事务的。

  类图:

  

发送消息

  经由过程Looper.prepare初始化好消息队列后就可以调用Looper.loop进入消息轮回了,然后我们就可以向消息队列发送消息,消息轮回就会取出消息进行处理惩罚,在看消息处理惩罚之前,先看一下消息是怎么被添加到消息队列的。

  在Java层,Message类默示一个消息对象,要发送消息起首就要先获得一个消息对象,Message类的机关函数是public的,然则不建议直接new Message,Message内部保存了一个缓存的消息池,我们可以用obtain从缓存池获得一个消息,Message应用完后体系会调用recycle收受接管,若是本身new很多Message,每次应用完后体系放入缓存池,会占用很多内存的,如下所示:

publicstaticMessage obtain() {

synchronized(sPoolSync) {

if(sPool !=null) {

Message m=sPool;

sPool=m.next;

m.next=null;

sPoolSize--;

returnm;

}

}

returnnewMessage();

}

publicvoidrecycle() {

clearForRecycle();

synchronized(sPoolSync) {

if(sPoolSize <MAX_POOL_SIZE) {

next=sPool;

sPool=this;

sPoolSize++;

}

}

}

  Message内部经由过程next成员实现了一个链表,如许sPool就了为了一个Messages的缓存链表。

  消息对象获取到了怎么发送呢,大师都知道是经由过程Handler的post、sendMessage等办法,其实这些办法终极都是调用的同一个办法sendMessageAtTime:

publicbooleansendMessageAtTime(Message msg,longuptimeMillis) {

MessageQueue queue=mQueue;

if(queue ==null) {

RuntimeException e=newRuntimeException(

this+ " sendMessageAtTime() called with no mQueue");

Log.w("Looper", e.getMessage(), e);

returnfalse;

}

returnenqueueMessage(queue, msg, uptimeMillis);

}

  sendMessageAtTime获取到消息队列然后调用enqueueMessage办法,消息队列mQueue是从与Handler接洽关系的Looper获得的。

privatebooleanenqueueMessage(MessageQueue queue, Message msg,longuptimeMillis) {

msg.target=this;

if (mAsynchronous) {

msg.setAsynchronous(true);

}

returnqueue.enqueueMessage(msg, uptimeMillis);

}

  enqueueMessage将message的target设置为当前的handler,然后调用MessageQueue的enqueueMessage,在调用queue.enqueueMessage之前断定了mAsynchronous,从名字看是异步消息的意思,要熟悉打听Asynchronous的感化,须要先懂得一个概念Barrier。

Barrier与Asynchronous Message

  Barrier是什么意思呢,从名字看是一个阻碍器,在这个阻碍器后面的消息都临时无法履行,直到这个阻碍器被移除了,MessageQueue有一个函数叫enqueueSyncBarier可以添加一个Barrier。

intenqueueSyncBarrier(longwhen) {

//Enqueue a new sync barrier token.

//We don""t need to wake the queue because the purpose of a barrier is to stall it.

synchronized(this) {

finalinttoken = mNextBarrierToken++;

finalMessage msg =Message.obtain();

msg.arg1=token;

Message prev=null;

Message p=mMessages;

if(when != 0) {

while(p !=null&& p.when <=when) {

prev=p;

p=p.next;

}

}

if(prev !=null) {//invariant: p == prev.next

msg.next =p;

prev.next=msg;

}else{

msg.next=p;

mMessages=msg;

}

returntoken;

}

}

  在enqueueSyncBarrier中,obtain了一个Message,并设置msg.arg1=token,token仅是一个每次调用enqueueSyncBarrier时自增的int值,目标是每次调用enqueueSyncBarrier时返回独一的一个token,这个Message同样须要设置履行时候,然后插入到消息队列,特别的是这个Message没有设置target,即msg.target为null。

  进入消息轮回后会不绝地从MessageQueue中作废息履行,调用的是MessageQueue的next函数,此中有这么一段:

Message msg =mMessages;

if(msg !=null&& msg.target ==null) {

//Stalled by a barrier.  Find the next asynchronous message in the queue.

do{

prevMsg=msg;

msg=msg.next;

}while(msg !=null&& !msg.isAsynchronous());

}

  若是队列头部的消息的target为null就默示它是个Barrier,因为只有两种办法往mMessages中添加消息,一种是enqueueMessage,另一种是enqueueBarrier,而enqueueMessage中若是mst.target为null是直接抛异常的,后面会看到。

  所谓的异步消息其实就是如许的,我们可以经由过程enqueueBarrier往消息队列中插入一个Barrier,那么队列中履行时候在这个Barrier今后的同步消息都邑被这个Barrier阻碍住无法履行,直到我们调用removeBarrier移除了这个Barrier,而异步消息则没有影响,消息默认就是同步消息,除非我们调用了Message的setAsynchronous,这个办法是隐蔽的。只有在初始化Handler时经由过程参数指定往这个Handler发送的消息都是异步的,如许在Handler的enqueueMessage中就会调用Message的setAsynchronous设置消息是异步的,从上方Handler.enqueueMessage的代码中可以看到。

  所谓异步消息,其实只有一个感化,就是在设置Barrier时仍可以不受Barrier的影响被正常处理惩罚,若是没有设置Barrier,异步消息就与同步消息没有差别,可以经由过程removeSyncBarrier移除Barrier:

voidremoveSyncBarrier(inttoken) {

//Remove a sync barrier token  the queue.

//If the queue is no longer stalled by a barrier then wake it.

finalbooleanneedWake;

synchronized(this) {

Message prev=null;

Message p=mMessages;

while(p !=null&& (p.target !=null|| p.arg1 !=token)) {

prev=p;

p=p.next;

}

if(p ==null) {

thrownewIllegalStateException("The specified message queue synchronization "

+ " barrier token has not been posted or has already been removed.");

}

if(prev !=null) {

prev.next=p.next;

needWake=false;

}else{

mMessages=p.next;

needWake= mMessages ==null|| mMessages.target !=null;

}

p.recycle();

}

if(needWake) {

nativeWake(mPtr);

}

}

  参数token就是enqueueSyncBarrier的返回值,若是没有调用指定的token不存在是会抛异常的。

enqueueMessage

  接下来看一下是怎么MessageQueue的enqueueMessage。

finalbooleanenqueueMessage(Message msg,longwhen) {

if(msg.isInUse()) {

thrownewAndroidRuntimeException(msg + " This message is already in use.");

}

if (msg.target == null) {

throw new AndroidRuntimeException("Message must have a target.");

}

booleanneedWake;

synchronized(this) {

if(mQuiting) {

RuntimeException e=newRuntimeException(

msg.target+ " sending message to a Handler on a dead thread");

Log.w("MessageQueue", e.getMessage(), e);

returnfalse;

}

msg.when=when;

Message p=mMessages;

if(p ==null|| when == 0 || when <p.when) {

//New head, wake up the event queue if blocked.

msg.next =p;

mMessages=msg;

needWake=mBlocked;

}else{

//Inserted within the middle of the queue.  Usually we don""t have to wake

//up the event queue unless there is a barrier at the head of the queue

//and the message is the earliest asynchronous message in the queue.

needWake = mBlocked && p.target == null &&msg.isAsynchronous();

Message prev;

for(;;) {

prev=p;

p=p.next;

if(p ==null|| when <p.when) {

break;

}

if(needWake &&p.isAsynchronous()) {

needWake=false;

}

}

msg.next= p;//invariant: p == prev.next

prev.next =msg;

}

}

if(needWake) {

nativeWake(mPtr);

}

returntrue;

}

  重视上方代码红色的项目组,当msg.target为null时是直接抛异常的。

  在enqueueMessage中起首断定,若是当前的消息队列为空,或者新添加的消息的履行时候when是0,或者新添加的消息的履行时候比消息队列头的消息的履行时候还早,就把消息添加到消息队列头(消息队列按时候排序),不然就要找到合适的地位将当前消息添加到消息队列。

Native发送消息

  消息模型不只是Java层用的,Native层也可以用,前面也看到了消息队列初始化时也同时初始化了Native层的Looper与NativeMessageQueue,所以Native层应当也是可以发送消息的。与Java层不合的是,Native层是经由过程Looper发消息的,同样所有的发送办法终极是调用sendMessageAtTime:

voidLooper::sendMessageAtTime(nsecs_t uptime,constsp<MessageHandler>&handler,

constMessage&message) {

#ifDEBUG_CALLBACKS

ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",

this, uptime, handler.get(), message.what);

#endif

size_t i=0;

{//acquire lock

AutoMutex _l(mLock);

size_t messageCount=mMessageEnvelopes.size();

while(i < messageCount && uptime >=mMessageEnvelopes.itemAt(i).uptime) {

i+=1;

}

MessageEnvelope messageEnvelope(uptime, handler, message);

mMessageEnvelopes.At(messageEnvelope, i,1);

//Optimization: If the Looper is currently sending a message, then we can skip

//the call to wake() because the next thing the Looper will do after processing

//messages is to decide when the next wakeup time should be.  In fact, it does

//not even matter whether this code is running on the Looper thread.

if(mSendingMessage) {

return;

}

}//release lock

//Wake the poll loop only when we enqueue a new message at the head.

if(i ==0) {

wake();

}

}

  Native Message只有一个int型的what字段用来区分不合的消息,sendMessageAtTime指定了Message,Message要履行的时候when,与处理惩罚这个消息的Handler:MessageHandler,然后用MessageEnvelope封装了time, MessageHandler与Message,Native层发的消息都保存到了mMessageEnvelopes中,mMessageEnvelopes是一个Vector<MessageEnvelope>。Native层消息同样是按时候排序,与Java层的消息分别保存在两个队列里。

消息轮回

  消息队列初始化好了,也知道怎么发消息了,下面就是怎么处理惩罚消息了,看Handler.loop函数:

publicstaticvoidloop() {

finalLooper me =myLooper();

if(me ==null) {

thrownewRuntimeException("No Looper; Looper.prepare() wasn""t called on this thread.");

}

finalMessageQueue queue =me.mQueue;

//Make sure the identity of this thread is that of the local process,

//and keep track of what that identity token actually is.

Binder.clearCallingIdentity();

finallongident =Binder.clearCallingIdentity();

for(;;) {

Message msg= queue.next();//might block

if (msg == null) {

// No message indicates that the message queue is quitting.

return;

}

//This must be in a local variable, in case a UI event sets the logger

Printer logging =me.mLogging;

if(logging !=null) {

logging.println(">>>>> Dispatching to " + msg.target + " " +

msg.callback+ ": " +msg.what);

}

msg.target.dispatchMessage(msg);

if(logging !=null) {

logging.println("<<<<< Finished to " + msg.target + " " +msg.callback);

}

//Make sure that during the course of dispatching the

//identity of the thread wasn""t corrupted.

finallongnewIdent =Binder.clearCallingIdentity();

if(ident !=newIdent) {

Log.wtf(TAG,"Thread identity changed  0 x"

+ Long.toHexString(ident) + " to 0 x"

+ Long.toHexString(newIdent) + " while dispatching to "

+ msg.target.getClass().getName() + " "

+ msg.callback + " what=" +msg.what);

}

msg.recycle();

}

}

  loop每次从MessageQueue取出一个Message,调用msg.target.dispatchMessage(msg),target就是发送message时跟message接洽关系的handler,如许就调用到了熟悉的dispatchMessage,Message被处理惩罚后会被recycle。当queue.next返回null时会退出消息轮回,接下来就看一下MessageQueue.next是怎么取出消息的,又会在什么时辰返回null。

finalMessage next() {

intpendingIdleHandlerCount = -1;//-1 only during first iteration

intnextPollTimeoutMillis = 0;

for(;;) {

if(nextPollTimeoutMillis != 0) {

Binder.flushPendingCommands();

}

nativePollOnce(mPtr, nextPollTimeoutMillis);

synchronized(this) {

if (mQuiting) {

return null;

}

//Try to retrieve the next message.  Return if found.

finallongnow =SystemClock.uptimeMillis();

Message prevMsg=null;

Message msg=mMessages;

if(msg !=null&& msg.target ==null) {

//Stalled by a barrier.  Find the next asynchronous message in the queue.

do{

prevMsg=msg;

msg=msg.next;

}while(msg !=null&& !msg.isAsynchronous());

}

if(msg !=null) {

if(now <msg.when) {

//Next message is not ready.  Set a timeout to wake up when it is ready.

nextPollTimeoutMillis = (int) Math.min(msg.when -now, Integer.MAX_VALUE);

}else{

//Got a message.

mBlocked =false;

if(prevMsg !=null) {

prevMsg.next=msg.next;

}else{

mMessages=msg.next;

}

msg.next=null;

if(false) Log.v("MessageQueue", "Returning message: " +msg);

msg.markInUse();

returnmsg;

}

}else{

//No more messages.

nextPollTimeoutMillis = -1;

}

//If first time idle, then get the number of idlers to run.

//Idle handles only run if the queue is empty or if the first message

//in the queue (possibly a barrier) is due to be handled in the future.

if(pendingIdleHandlerCount < 0

&& (mMessages ==null|| now <mMessages.when)) {

pendingIdleHandlerCount=mIdleHandlers.size();

}

if(pendingIdleHandlerCount <= 0) {

//No idle handlers to run.  Loop and wait some more.

mBlocked =true;

continue;

}

if(mPendingIdleHandlers ==null) {

mPendingIdleHandlers=newIdleHandler[Math.max(pendingIdleHandlerCount, 4)];

}

mPendingIdleHandlers=mIdleHandlers.toArray(mPendingIdleHandlers);

}

//Run the idle handlers.

//We only ever reach this code block during the first iteration.

for(inti = 0; i < pendingIdleHandlerCount; i++) {

finalIdleHandler idler =mPendingIdleHandlers[i];

mPendingIdleHandlers[i]=null;//release the reference to the handler

booleankeep =false;

try{

keep=idler.queueIdle();

}catch(Throwable t) {

Log.wtf("MessageQueue", "IdleHandler threw exception", t);

}

if(!keep) {

synchronized(this) {

mIdleHandlers.remove(idler);

}

}

}

//Reset the idle handler count to 0 so we do not run them again.

pendingIdleHandlerCount = 0;

//While calling an idle handler, a new message could have been delivered

//so go back and look again for a pending message without waiting.

nextPollTimeoutMillis = 0;

}

}

  MessageQueue.next起首会调用nativePollOnce,然后若是mQuiting为true就返回null,Looper就会退出消息轮回。

  接下来作废息队列头部的消息,若是头部消息是Barrier(target==null)就往后遍历找到第一个异步消息,接下来检测获取到的消息(消息队列头部的消息或者第一个异步消息),若是为null默示没有消息要履行,设置nextPollTimeoutMillis = -1;不然检测这个消息要履行的时候,若是到履行时候了就将这个消息markInUse并从消息队列移除,然后从next返回到loop;不然设置nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE),即间隔比来要履行的消息还须要多久,无论是当前消息队列没有消息可以履行(设置了Barrier并且没有异步消息或消息队列为空)还是队列头部的消息未到履行时候,都邑履行后面的代码,看有没有设置IdleHandler,若是有就运行IdleHandler,当IdleHandler被履行之后会设置nextPollTimeoutMillis = 0。

  起首看一下nativePollOnce,native办法,调用JNI,最后调到了Native Looper::pollOnce,并从Java层传进去了nextPollTimeMillis,即Java层的消息队列中履行时候比来的消息还要多久到履行时候。

intLooper::pollOnce(inttimeoutMillis,int* outFd,int* outEvents,void**outData) {

intresult =0;

for(;;) {

while(mResponseIndex <mResponses.size()) {

constResponse& response = mResponses.itemAt(mResponseIndex++);

intident =response.request.ident;

if(ident >=0) {

intfd =response.request.fd;

intevents =response.events;

void* data =response.request.data;

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - returning signalled identifier %d:"

"fd=%d, events=0 x%x, data=%p",

this, ident, fd, events, data);

#endif

if(outFd != NULL) *outFd =fd;

if(outEvents != NULL) *outEvents =events;

if(outData != NULL) *outData =data;

returnident;

}

}

if(result !=0) {

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - returning result %d",this, result);

#endif

if(outFd != NULL) *outFd =0;

if(outEvents != NULL) *outEvents =0;

if(outData != NULL) *outData =NULL;

returnresult;

}

result=pollInner(timeoutMillis);

}

}

  先不看开端的一大串代码,先看一下pollInner:

intLooper::pollInner(inttimeoutMillis) {

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d",this, timeoutMillis);

#endif

//Adjust the timeout based on when the next message is due.

if(timeoutMillis !=0&& mNextMessageUptime !=LLONG_MAX) {

nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);

intmessageTimeoutMillis =toMillisecondTimeoutDelay(now, mNextMessageUptime);

if(messageTimeoutMillis >=0

&& (timeoutMillis <0|| messageTimeoutMillis <timeoutMillis)) {

timeoutMillis=messageTimeoutMillis;

}

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",

this, mNextMessageUptime -now, timeoutMillis);

#endif

}

//Poll.

intresult =ALOOPER_POLL_WAKE;

mResponses.clear();

mResponseIndex=0;

structepoll_event eventItems[EPOLL_MAX_EVENTS];

int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

//Acquire lock.

mLock.lock();

//Check for poll error.

if(eventCount <0) {

if(errno ==EINTR) {

gotoDone;

}

ALOGW("Poll failed with an unexpected error, errno=%d", errno);

result=ALOOPER_POLL_ERROR;

gotoDone;

}

//Check for poll timeout.

if(eventCount ==0) {

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - timeout",this);

#endif

result=ALOOPER_POLL_TIMEOUT;

gotoDone;

}

//Handle all events.

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ pollOnce - handling events  %d fds",this, eventCount);

#endif

for(inti =0; i < eventCount; i++) {

intfd =eventItems[i].data.fd;

uint32_t epollEvents=eventItems[i].events;

if(fd ==mWakeReadPipeFd) {

if(epollEvents &EPOLLIN) {

awoken();

}else{

ALOGW("Ignoring unexpected epoll events 0 x%x on wake read pipe.", epollEvents);

}

}else{

ssize_t requestIndex=mRequests.indexOfKey(fd);

if(requestIndex >=0) {

intevents =0;

if(epollEvents & EPOLLIN) events |=ALOOPER_EVENT_INPUT;

if(epollEvents & EPOLLOUT) events |=ALOOPER_EVENT_OUTPUT;

if(epollEvents & EPOLLERR) events |=ALOOPER_EVENT_ERROR;

if(epollEvents & EPOLLHUP) events |=ALOOPER_EVENT_HANGUP;

pushResponse(events, mRequests.valueAt(requestIndex));

}else{

ALOGW("Ignoring unexpected epoll events 0 x%x on fd %d that is"

"no longer registered.", epollEvents, fd);

}

}

}

Done: ;

//Invoke pending message callbacks.

mNextMessageUptime =LLONG_MAX;

while(mMessageEnvelopes.size() !=0) {

nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);

constMessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);

if(messageEnvelope.uptime <=now) {

//Remove the envelope  the list.

//We keep a strong reference to the handler until the call to handleMessage

//finishes.  Then we drop it so that the handler can be d *before*

//we reacquire our lock.

{//obtain handler

sp<MessageHandler> handler =messageEnvelope.handler;

Message message=messageEnvelope.message;

mMessageEnvelopes.removeAt(0);

mSendingMessage=true;

mLock.unlock();

#ifDEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",

this, handler.get(), message.what);

#endif

handler->handleMessage(message);

}//release handler

mLock.lock();

mSendingMessage=false;

result=ALOOPER_POLL_CALLBACK;

}else{

//The last message left at the head of the queue determines the next wakeup time.

mNextMessageUptime =messageEnvelope.uptime;

break;

}

}

//Release lock.

mLock.unlock();

//Invoke all response callbacks.

for(size_t i =0; i < mResponses.size(); i++) {

Response& response =mResponses.editItemAt(i);

if(response.request.ident ==ALOOPER_POLL_CALLBACK) {

intfd =response.request.fd;

intevents =response.events;

void* data =response.request.data;

#ifDEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0 x%x, data=%p",

this, response.request.callback.get(), fd, events, data);

#endif

intcallbackResult = response.request.callback->handleEvent(fd, events, data);

if(callbackResult ==0) {

removeFd(fd);

}

//Clear the callback reference in the response structure promptly because we

//will not clear the response vector itself until the next poll.

response.request.callback.clear();

result=ALOOPER_POLL_CALLBACK;

}

}

returnresult;

}

  Java层的消息都保存在了Java层MessageQueue的成员mMessages中,Native层的消息都保存在了Native Looper的mMessageEnvelopes中,这就可以说有两个消息队列,并且都是按时候分列的。timeOutMillis默示Java层下个要履行的消息还要多久履行,mNextMessageUpdate默示Native层下个要履行的消息还要多久履行,若是timeOutMillis为0,epoll_wait不设置TimeOut直接返回;若是为-1申明Java层无消息直接用Native的time out;不然pollInner取这两个中的最小值作为timeOut调用epoll_wait。当epoll_wait返回时就可能有以下几种景象:

失足返回。

Time Out

正常返回,描述符上有事务产生。

若是是前两种景象直接goto DONE。

  不然就申明FD上有事务产生了,若是是mWakeReadPipeFd的EPOLLIN事务就调用awoken,若是不是mWakeReadPipeFd,那就是经由过程addFD添加的fd,在addFD中将要的fd及其events,callback,data封装成了Request对象,并以fd为键保存到了KeyedVector mRequests中,所以在这里就以fd为键获得在addFD时接洽关系的Request,并连同events经由过程pushResonse参加mResonse队列(Vector),Resonse仅是对events与Request的封装。若是是epoll_wait失足或timeout,就没有描述符上有事务,就不消履行这一段代码,所以直接goto DONE了。

voidLooper::pushResponse(intevents,constRequest&request) {

Response response;

response.events=events;

response.request=request;

mResponses.push(response);

}

  接下来进入DONE项目组,从mMessageEnvelopes取出头部的Native消息,若是达到了履行时候就调用它内部保存的MessageeHandler的handleMessage处理惩罚并从Native 消息队列移除,设置result为ALOOPER_POLL_CALLBACK,不然策画mNextMessageUptime默示Native消息队列下一次消息要履行的时候。若是未到头部消息的履行时候有可能是Java层消息队列消息的履行时候小于Native层消息队列头部消息的履行时候,达到了Java层消息的履行时候epoll_wait TimeOut返回了,或都经由过程addFd添加的描述符上有事务产生导致epoll_wait返回,或者epoll_wait是失足返回。Native消息是没有Barrier与Asynchronous的。

  最后,遍历mResponses(前面刚经由过程pushResponse存进去的),若是response.request.ident ==ALOOPER_POLL_CALLBACK,就调用注册的callback的handleEvent(fd, events, data)进行处理惩罚,然后从mResonses队列中移除,此次遍历完之后,mResponses中保存来来的就都是ident>=0并且callback为NULL的了。在NativeMessageQueue初始化Looper时传入了mAllowNonCallbacks为false,所以此次处理惩罚完后mResponses必然为空。

  接下来返回到pollOnce。pollOnce是一个for轮回,pollInner中处理惩罚了所有response.request.ident==ALOOPER_POLL_CALLBACK的Response,在第二次进入for轮回后若是mResponses不为空就可以找到ident>0的Response,将其ident作为返回值返回由调用pollOnce的函数本身处理惩罚,在这里我们是在NativeMessageQueue中调用的Loope的pollOnce,没对返回值进行处理惩罚,并且mAllowNonCallbacks为false也就不成能进入这个轮回。pollInner返回值不成能是0,或者说只可能是负数,所以pollOnce中的for轮回只会履行两次,在第二次就返回了。

  Native Looper可以零丁应用,也有一个prepare函数,这时mAllowNonCallbakcs值可能为true,pollOnce中对mResponses的处理惩罚就有意义了。

wake与awoken

  在Native Looper的机关函数中,经由过程pipe打开了一个管道,并用mWakeReadPipeFd与mWakeWritePipeFd分别保存了管道的读端与写端,然后用epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd,& eventItem)了读端的EPOLLIN事务,在pollInner中经由过程epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)读取事务,那是在什么时辰往mWakeWritePipeFd写,又是在什么时辰读的mWakeReadPipeFd呢?

  在Looper.cpp中我们可以发明如下两个函数:

voidLooper::wake() {

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ wake",this);

#endif

ssize_t nWrite;

do{

nWrite= write(mWakeWritePipeFd,"W",1);

}while(nWrite == -1&& errno ==EINTR);

if(nWrite !=1) {

if(errno !=EAGAIN) {

ALOGW("Could not write wake signal, errno=%d", errno);

}

}

}

voidLooper::awoken() {

#ifDEBUG_POLL_AND_WAKE

ALOGD("%p ~ awoken",this);

#endif

charbuffer[16];

ssize_t nRead;

do{

nRead= read(mWakeReadPipeFd, buffer,sizeof(buffer));

}while((nRead == -1&& errno == EINTR) || nRead ==sizeof(buffer));

}

  wake函数向mWakeWritePipeFd写入了一个“W”字符,awoken从mWakeReadPipeFd读,往mWakeWritePipeFd写数据只是为了在pollInner中的epoll_wait可以到事务返回。在pollInner也可以看到若是是mWakeReadPipeFd的EPOLLIN事务只是调用了awoken消费掉了写入的字符就往后处理惩罚了。

  那什么时辰调用wake呢?这个只要找到调用的处所解析一下就行了,先看Looper.cpp,在sendMessageAtTime即发送Native Message的时辰,按照发送的Message的履行时候查找mMessageEnvelopes策画应当插入的地位,若是是在头部插入,就调用wake唤醒epoll_wait,因为在进入pollInner时按照Java层消息队列头部消息的履行时候与Native层消息队列头部消息的履行时候策画出了一个timeout,若是这个新消息是在头部插入,申明履行时候至少在上述两个消息中的一个之前,所以应当唤醒epoll_wait,epoll_wait返回后,搜检Native消息队列,看头部消息即刚插入的消息是否到履行时候了,到了就履行,不然就可能须要设置新的timeout。同样在Java层的MessageQueue中,有一个函数nativeWake也同样可以经由过程JNI调用wake,调用nativeWake的机会与在Native调用wake的机会类似,在消息队列头部插入消息,还有一种景象就是,消息队列头部是一个Barrier,并且插入的消息是第一个异步消息。

if(p ==null|| when == 0 || when <p.when) {

//New head, wake up the event queue if blocked.

msg.next =p;

mMessages=msg;

needWake=mBlocked;

}else{

//Inserted within the middle of the queue.  Usually we don""t have to wake

//up the event queue unless there is a barrier at the head of the queue

//and the message is the earliest asynchronous message in the queue.

needWake = mBlocked && p.target ==null&& msg.isAsynchronous();//若是头部是Barrier并且新消息是异步消息则“有可能”须要唤醒

Message prev;

for(;;) {

prev=p;

p=p.next;

if(p ==null|| when <p.when) {

break;

}

if(needWake && p.isAsynchronous()) {//消息队列中有异步消息并且履行时候在新消息之前,所以不须要唤醒。

needWake =false;

}

}

msg.next= p;//invariant: p == prev.next

prev.next =msg;

}

  在头部插入消息不必然调用nativeWake,因为之前可能正在履行IdleHandler,若是履行了IdleHandler,就在IdleHandler履行后把nextPollTimeoutMillis设置为0,下次进入for轮回就用0调用nativePollOnce,不须要wake,只有在没有消息可以履行(消息队列为空或没到履行时候)并且没有设置IdleHandler时mBlocked才会为true。

  若是Java层的消息队列被Barrier Block住了并且当前插入的是一个异步消息有可能须要唤醒Looper,因为异步消息可以在Barrier下履行,然则这个异步消息必然如果履行时候最早的异步消息。

  退出Looper也须要wake,removeSyncBarrier时也可能须要。

Android消息处理惩罚机制(Handler、Looper、MessageQueue与Message)

时间: 2024-10-10 23:14:42

Android消息处理惩罚机制(Handler、Looper、MessageQueue与Message)的相关文章

Android多线程之图解Handler Looper MessageQueue Message

Android中的多线程可以有多种实现方式,前面我们已经讲过了封装程度较高异步任务(AnsyncTask),这一节我们来看看较为灵活的方式:Handler Looper MessageQueue Message. Message:用于线程之间传递信息,发送的消息放入目标线程的MessageQueue中. MessageQueue:用于简化线程之间的消息传递,MessageQueue接受发送端的Message,并作为消息处理端的输入源.每个线程只有一个实例. Handler:用于处理Message

转载:android笔记--android中的多线程--Handler, Looper, MessageQueue, Message类

什么时候使用多线程: 1. 耗时操作使用多线程, 耗时操作放在UI线程中会导致用户的操作无法得到响应. 2. 阻塞操作使用多线程, 理由同上. 3. 多核CUP的设备使用多线程, 可以有效提高CPU的利用率. 4. 并行操作使用多线程. android中的多线程模型主要涉及的类有:Looper, Handler, MessageQueue, Message等. 一:Looper类: 1 static final ThreadLocal<Looper> sThreadLocal = new Th

Android中的Handler, Looper, MessageQueue和Thread

Android中的Handler, Looper, MessageQueue和Thread 前几天,和同事探讨了一下Android中的消息机制,探究了消息的发送和接收过程以及与线程之间的关系.虽然我们经常使用这些基础的东西,但对于其内部原理的了解,能使我们更加容易.合理地架构系统,并避免一些低级错误. 对于这部分的内容,将分成4小节来描述: 1.职责与关系 2.消息循环 3.线程与更新 4.几点小结 ------------------------------------------------

Android异步处理三:Handler+Looper+MessageQueue深入详解

Android Loop&Handle学习总结 - New Start - 博客频道 - CSDN.NET ?????? 昨晚偷懒,这篇博客只写了一个标题,今天早晨一看,还有15的阅读量.实在是对不起那些同学.......换了是我,也会BS这样的LZ吧!sorry 啦 -------------------------------------------------------------------------------------------------------------------

解析Android的 消息传递机制Handler

1. 什么是Handler: Handler 网络释义"操纵者,管理者的"意思,在Android里面用于管理多线程对UI的操作: 2. 为什么会出现Handler: 在Android的设计机制里面,只允许主线程(一个程序第一次启动时所移动的线程,因为此线程主要是完成对UI相关事件的处理,所以也称UI线程) 对UI进行修改等操作,这是一种规则的简化,之所以这样简化是因为Android的UI操作时线程不安全的,为了避免多个线程同时操作UI造成线程安全 问题,才出现了这个简化的规则. 由此以

Android消息机制:Looper,MessageQueue,Message与handler

Android消息机制好多人都讲过,但是自己去翻源码的时候才能明白. 今天试着讲一下,因为目标是讲清楚整体逻辑,所以不追究细节. Message是消息机制的核心,所以从Message讲起. 1.Message是什么? 看一个从消息池中取出一个msg的方法: public static Message obtain(Handler h, int what, int arg1, int arg2, Object obj) { Message m = obtain(); m.target = h; m

Android消息传递源码理解。Handler,Looper,MessageQueue,Message

Android中消息传递模块差不多看了好几次,虽然每次看的方式都差不多但是还是发觉的到每次看了之后,理解的更清晰一点. 关于这个模块的文章数不胜数,但是最重要的还是自己动手理解一遍更好. 会牵扯到的几个类: Handler.java  , Looper.java , MessageQueue.java , Message.java 源代码路径: xxx/frameworks/base/core/java/android/os  看的过程中你会发现高版本和低版本系统代码有些地方比较大的差距.从中我

Android中的Handler, Looper, MessageQueue和Thread对应关系

1) 作用 Message:消息,其中包含了消息ID,消息处理对象以及处理的数据等,由MessageQueue统一列队,终由Handler处理. Handler:处理者,负责Message的发送及处理.使用Handler时,需要实现handleMessage(Message msg)方法来对特定的Message进行处理,例如更新UI等. MessageQueue:消息队列,用来存放Handler发送过来的消息,并按照FIFO规则执行.当然,存放Message并非实际意义的保存,而是将Messag

Android的消息机制Handler详解

Android的消息机制详解 Android的消息机制主要指 Handler 的运行机制,Handler的运行需要底层的MessageQueue 和 Looper 的支撑. MessageQueue:消息队列,它的内部存储了一组消息,以队列的形式对外提供插入和删除的工作,其内部存储结构采用单链表的数据结构来存储消息列表. Looper:可理解为消息循环. 由于MessageQueue只是一个消息存储单元,不能去处理消息,而Looper会以无限循环的形式去查找是否有新的消息,如果有的话就处理,否则