线程消息通信与异步处理

转载请标明出处:

http://blog.csdn.net/yujun411522/article/details/46444869

本文出自:【yujun411522的博客】

关于android内消息通信和handler的知识在之前的Handler中已经简要介绍过了,这里介绍在native层的实现机制。

相信大家都知道一个标准的Looper线程的写法:

public MyLooperThread extends Thread{
     private Handler mHandler;
     public void run(){
     //在实例化handler之前一定要先调用Looper.prepare()方法
     //1 Looper.prepare()方法
     Looper.prepare();
     //2 实例化handler
     //mHandler = new Handler(){
               public void handleMessage(Message msg){
                         //处理msg操作
                    }

          };
     //3 Looper.loop()不断从消息队列中取出来信息
     Looper.loop();
     }
}

典型的三个操作:

1 调用Looper.prepare方法,进入准备阶段。

2 实例化Handler对象。

3 调用Looper.loop方法,进入循环。

注意,三者的顺序不能有错,下面会介绍

7.1 Looper.prepare进入准备阶段

先看Looper类中prepare方法:

public class Looper {
   // sThreadLocal.get() will return null unless you've called prepare().
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    final MessageQueue mQueue;//内部维护一个消息队列
    final Thread mThread;//对应的线程
    volatile boolean mRun;//运行状态
    private static Looper mMainLooper = null;  // guarded by Looper.class,主线程的Looper

   public static void prepare() {
        if (sThreadLocal.get() != null) {//之前已经设置过Looper对象了,报异常,说明只能设置一次
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());//为sThreadLocal变量设置Looper,这个Looper必须有,且只能有一个,不能重复设置
    }
}

sThreadLocal 是一个用来存储当前线程状态的一个成员变量,存储范围为线程内部。在调用prepare方法时,如果之前线程设置Looper对象就报异常,如果没有设置Looper就为sThreadLocal设置一个new Looper()对象,再看Looper的构造函数:

private Looper() {//私有方法,不允许外部访问它
        mQueue = new MessageQueue();//构造一个MessageQueue对象,Looper中维护了一个消息队列,这一点很重要
        mRun = true;//设置mRun为true
        mThread = Thread.currentThread();//mThread设置为当前线程
    }

其中重要的就是MessageQueue消息队列的创建,看它的构造方法:

 MessageQueue() {
        nativeInit();//这是一个本地方法
    }

nativeInit是一个本地方法,对应的实现方法在android_os_MessageQueue.cpp文件中

先看NativeMessageQueue类:

class NativeMessageQueue {
public:
    NativeMessageQueue();
    ~NativeMessageQueue();

    inline sp<Looper> getLooper() { return mLooper; }//返回该Looper变量,这个Looper类是native中的Looper类,不是java层的

    void pollOnce(int timeoutMillis);//后面会分析,进行询问
    void wake();//后面会分析,向管道写入"W"

private:
    sp<Looper> mLooper;//维护一个Native层的Looper变量
};

再来看MessageQueue中nativeInit方法中的的实现android_os_MessageQueue_nativeInit函数:

static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
     //obj为java层的MessageQueue对象
     //1 创建一个NativeMessageQueue对象,这个是native层的
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (! nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return;
    }
    //2 将NativeMessageQueue和 java层的MessageQueue对象 关联起来,实际就是讲nativeMessagQueue对象保存到MessageQueue中的mPtr变量中
    android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}

两个主要工作: 1 创建NativeMessageQueue对象 ;2 将java层的MessageQueue对象和NativeMessageQueue关联起来

7.1.1 创建NativeMessageQueue对象

看NativeMessageQueue的构造函数:

NativeMessageQueue::NativeMessageQueue() {
    mLooper = Looper::getForThread();//调用getForThread,看返回值是否为null
    if (mLooper == NULL) {//如果返回值为null,则创建一个Looper并设置
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);//
    }
}

先看Looper::getForThread函数是否为null,如果为null,构造一个Looper对象,并调用setForThread设置为线程唯一的一个Looper,这一部分工作和java层中的Looper.prepare()方法相同。那么再看一下native层的Looper对象是如何实例化的,看它的构造函数:

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    //创建一个管道,读写端分别为wakeFds[0]和wakeFds[1]
    int result = pipe(wakeFds); 

    mWakeReadPipeFd = wakeFds[0];//读端
    mWakeWritePipeFd = wakeFds[1];//写端

    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//设置非阻塞方式
    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//设置非阻塞方式 

    mEpollFd = epoll_create(EPOLL_SIZE_HINT);//监听管道
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);//只监听mWakeReadPipeFd ,也就是读端
    }

7.1.2 将java层的MessageQueue对象和NativeMessageQueue关联起来

关联工作由android_os_MessageQueue_setNativeMessageQueue函数来完成:

static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
        NativeMessageQueue* nativeMessageQueue) {
    env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
             reinterpret_cast<jint>(nativeMessageQueue));
}

实际就是将nativeMessageQueue对象的地址复制给java层MessageQueue中的gMessageQueueClassInfo.mPtr变量,那么gMessageQueueClassInfo中mPtr是什么:

static struct {
    jfieldID mPtr;   // native object attached to the DVM MessageQueue
} gMessageQueueClassInfo;

它的实例化在register_android_os_MessageQueue函数中:

int register_android_os_MessageQueue(JNIEnv* env) {
    int res = jniRegisterNativeMethods(env, "android/os/MessageQueue",
            gMessageQueueMethods, NELEM(gMessageQueueMethods));

    jclass clazz;
    FIND_CLASS(clazz, "android/os/MessageQueue");
    //mPtre指向Java层中MessageQueue的mPtr成员变量
    GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz,"mPtr", "I");
    return 0;
}

绑定的结果就是在java层中的MessageQueue的mPtr保存了NativeMessageQueue变量的地址,这样就可以通过MessageQueue访问NativeMessageQueue了,到此为止java层MessageQueue构造完成。

可以看出Java层Looper.prepare()函数的作用就是在Looper中持有一个MessageQueue和当前线程的引用,而MessageQueue中的mPtr指向native层的NativeMessageQueue对象,NativeMessageQueue对象中又保存了线程唯一的一个native Looper变量,四者之间的关系:

7.2 Handler的创建和发送消息

Looper是用来驱动消息的,而Looper的消息从哪里来,Handler的发送;Looper的消息哪里去,Handler的处理。所以Handler的作用就是发送和处理消息.

7.2.1 Handler的创建

看没有参数的构造方法:

 public Handler() {
         ....
        //返回当前线程的Looper对象
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;//将Looper中的mQueue赋值给Handler的成员变量mQueue
        mCallback = null;
    }

它的构造中有一点注意,先通过Looper.myLooper方法获取当前线程的Looper对象,如果为null报异常,所以在创建创建Handler之前一定要先调用Looper.prepare方法。

7.2.2 创建消息

这一部分参看Handler机制

7.2.3 消息发送

1 java层

无论是通过post还是send都最终通过sendMessageAtTime函数来实现

 public boolean sendMessageAtTime(Message msg, long uptimeMillis)//uptimeMillis 绝对时间,开机时间作为基准
    {
        boolean sent = false;
        MessageQueue queue = mQueue;//looper中创建的MessageQueue
        if (queue != null) {
            msg.target = this;//指明msg的处理者为this
            sent = queue.enqueueMessage(msg, uptimeMillis);//调用MessageQueue  的enqueueMessage 方法
        }
        else {
          .....
        }
        return sent;
    }

调用MessageQueue  的enqueueMessage 方法

 final boolean enqueueMessage(Message msg, long when) {
        ....
        final boolean needWake;
        synchronized (this) {
            if (mQuiting) {
              return false;//handler所在线程正在退出
            } else if (msg.target == null) {
                mQuiting = true;
            }

            msg.when = when;//指明消息何时处理
            //mMessages 消息队列头部,p为当前消息
            Message p = mMessages;
             // 消息队列为空或者新消息需要立马处理或者新消息处理时间早于队首消息的处理时间
            if (p == null || when == 0 || when < p.when) {//如果是这些情况,需要将新消息插入到队首立马处理
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked; // new head, might need to wake up
            } else {//否则找到合适的位置插入,按照时间的顺序
                Message prev = null;
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
                msg.next = prev.next;
                prev.next = msg;
                needWake = false; // still waiting on head, no need to wake up
            }
        }
        if (needWake) {//新加入消息队列,且处于block状态,需要唤醒
            nativeWake(mPtr);//本地方法,mPtr 就是NativeMessageQueue的native地址
        }
        return true;
    }

新加入消息队列,且处于block状态,需要唤醒,调用本地方法nativeWake,mPtr 就是NativeMessageQueue的native地址,对应的native实现在android_os_MessageQueue.cpp文件中的android_os_MessageQueue_nativeWake方法:

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
     //ptr 就是NativeMessageQueue的地址
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();
}

继续调用nativeMessageQueue的wake方法:

void NativeMessageQueue::wake() {
    mLooper->wake();
}

调用mLooper的wake方法,在Looper.cpp文件中:

void Looper::wake() {

   if (mPendingWakeCount++ == 0) {
        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    }

   ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);//向mWakeWritePipeFd 写入一个"W"
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            LOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

还记得native层创建的Looper对象时创建的管道吗?mWakeWritePipeFd 就是管道的写端,一旦写入了"W"字符,处理消息的线程就会被I/O唤醒。

2 native层

native层可以发送消息,但不是像java层中通过handler来发送,而是通过native层的Looper对象来发送,最终都是通过sendMessageAtTime方法来实现,在Looper.cpp文件中:

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) { 

    size_t i = 0;
    {
        AutoMutex _l(mLock);
         //native层消息队列的大小
        size_t messageCount = mMessageEnvelopes.size();
         //按照时间先后顺序找到它插入的位置
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }
        //利用MessageEnvelope  ,将发送时间,messageHandler和消息封装在MessageEnvelope  中
        MessageEnvelope messageEnvelope(uptime, handler, message);
        //插入到native消息队列中
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        if (mSendingMessage) {
            return;
        }
    }  

    if (i == 0) {
        wake();//和java层调用nativeWake 方法效果一样,向管道写端写入一个"W"
    }
}

native层发送消息的过程是先找到插入的位置,然后将message,handler,发送时间等信息封装到MessageEnvelope中,最后插入到消息队列的合适位置之中。

其中涉及到一个结构体:MessageEnvelope:

struct MessageEnvelope {
        MessageEnvelope() : uptime(0) { }

        MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,
                const Message& message) : uptime(uptime), handler(handler), message(message) {
        }

        nsecs_t uptime; //执行事件
        sp<MessageHandler> handler;//执行handler
        Message message;//消息
    };

到目前为止,消息已经发送成功了,接下来就是取出消息并处理。

7.3 Looper.loop 循环处理消息

看Looper.looper方法的实现:

  public static void loop() {
        Looper me = myLooper();
        if (me == null) {//可以看出,必须在设置当前线程的Looper之后才能执行loop方法
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        MessageQueue queue = me.mQueue;

        .....
        while (true) {
            //1 从MessageQueue中取出消息
            Message msg = queue.next(); // might block
            if (msg != null) {
                if (msg.target == null) {
                 return;//没有消息时退出
                }
                .....
                //派发消息
                msg.target.dispatchMessage(msg);

                ....
               //回收消息
                msg.recycle();
            }
        }
    }

重要的操作就两个,最重要的第一步从messageQuene中取出消息;2 派发该消息处理函数

7.3.1 messageQuene.next取出合适的消息

final Message next() {
        int nextPollTimeoutMillis = 0;//下一次poll的时间,0表示马上进行
        for (;;) {
             ...
            //本地方法nativePollOnce
            nativePollOnce(mPtr, nextPollTimeoutMillis);

            synchronized (this) {//同步
                final long now = SystemClock.uptimeMillis();
                final Message msg = mMessages;//队首消息
                if (msg != null) {
                    final long when = msg.when;//队首消息的执行事件 ,这里假如是4s,now为5s
                    if (now >= when) {//说明队首消息已经到了或者过了执行事件,所以立马执行
                        mBlocked = false;
                        mMessages = msg.next;//取出队首消息,返回
                        msg.next = null;

                        msg.markInUse();//标记正在使用
                        return msg;
                    } else {
                        //否则,假如队首消息执行时间when为10s,now为5s,则过when-now= 5s之后再询问

                        nextPollTimeoutMillis = (int) Math.min(when - now, Integer.MAX_VALUE);
                    }
                } else {
                    //消息队列中没有消息
                    nextPollTimeoutMillis = -1;
                }
            }
        }
    }

这里的关键部分是nativePollOnce本地方法,它的实现在android_os_MessageQueue.cpp中:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jint ptr, jint timeoutMillis) {
    //其中ptr就是NativeMessageQueue对象地址,timeOutMills就是经过多长时间询问
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(timeoutMillis);
}

可以看出调用NativeMessageQueue的pollOnce方法

void NativeMessageQueue::pollOnce(int timeoutMillis) {
    mLooper->pollOnce(timeoutMillis);
}

又调用Looper的pollOnce方法:

 inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL);
    }
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
     //timeoutMillis, NULL, NULL, NULL 参数
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            ALooper_callbackFunc callback = response.request.callback;
            if (!callback) {
                int ident = response.request.ident;
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data; 

                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = NULL;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        //执行pollInner方法
        result = pollInner(timeoutMillis);
    }
}

进一步调用了pollInner方法:

int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);//监控mRequestedFds 上的时间 

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();//调用awoken方法
            } else {
                  ....
            }
        } else {
          .....
        }
    }
Done: ;

    ...
    return result;
}

这里调用的awoken方法:

void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
         //读管道写端写入的"W"
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

7.3.2 msg.target.dispatchMessage(msg)分发消息

这部分参看Handler机制的实现

7.4 AsyncTask

这部分参看AsyncTask源码分析

时间: 2024-10-11 13:38:38

线程消息通信与异步处理的相关文章

Net线程间通信的异步机制

线程间通信 我们看下面的图 我们来看线程间通信的原理:线程(Thread B)和线程(Thread A)通信, 首先线程A 必须实现同步上下文对象(Synchronization Context), 线程B通过调用线程A的同步上下文对象来访问线程A,所有实现都是在同步上下文中完成的.线程B有两种方式来实现线程间的通信. 第一种:调用线程A的同步上下文对象,阻碍当前线程,执行红色箭头调用,直到黄色箭头返回(同步上下文执行完毕)才释放当前线程. (1->2->3->5) 第二种:调用线程A的

Android线程消息通信(二)

创建线程消息队列 Android应用程序的消息队列是使用一个MessageQueue对象来描述的,它可以通过调用Looper类的静态成员函数prepareMainLooper或者prepare来创建,其中,前者用来为应用程序的主线程创建消息队列:而后者用来为应用程序的其它子线程创建消息队列. 在分析Android应用程序线程的消息队列的创建过程之前,先要了解一下Looper类和MessageQueue类的实现. Looper类源代码: public final class Looper { pr

Windows 线程间消息通信

使用消息(message)是线程见通信的常用方法之一.Windows也提供了许多函数来实现这一点.主要使用的函数有PostThreadMessage(),  PeekMessage(), GetMessage() 发消息: 一般消息都是和窗口(window)联系在一起的.对于没有窗口的线程, windows提供了专门的发消息函数PostThreadMessage(). 该函数把PostMessage()里的窗口句柄参数换成了目标线程ID.线程ID在线程创建过程中可以通过参数传递出来,也可以用Ge

ZeroMQ——一个轻量级的消息通信组件

ZeroMQ是一个轻量级的消息通信组件,尽管名字中包含了"MQ",严格上来讲ZeroMQ并不是"消息队列/消息中间件".ZeroMQ是一个传输层API库, 更关注消息的传输.与消息队列相比,ZeroMQ有以下一些特点: 点对点无中间节点 传统的消息队列都需要一个消息服务器来存储转发消息.而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致.以为消息服务器最终还是转化为服务器对其他节点的点对点消息传输上.ZeroMQ能缓存消息,但是是

Android中线程间通信原理分析:Looper,MessageQueue,Handler

自问自答的两个问题 在我们去讨论Handler,Looper,MessageQueue的关系之前,我们需要先问两个问题: 1.这一套东西搞出来是为了解决什么问题呢? 2.如果让我们来解决这个问题该怎么做? 以上者两个问题,是我最近总结出来的,在我们学习了解一个新的技术之前,最好是先能回答这两个问题,这样你才能对你正在学习的东西有更深刻的认识. 第一个问题:google的程序员们搞出这一套东西是为了解决什么问题的?这个问题很显而易见,为了解决线程间通信的问题.我们都知道,Android的UI/Vi

进程间通信与线程间通信

序 今天被问及进程间通信的问题,发现自己了解的并不够,所以,对此好好总结一番~ 操作系统的主要任务是管理计算机的软件.硬件资源.现代操作系统的主要特点是多用户和多任务,也就是程序的并行执行,windows如此linux也是如此.所以操作系统就借助于进程来管理计算机的软.硬件资源,支持多任务的并行执行.要并行执行就需要多进程.多线程.因此多进程和多线程间为了完成一定的任务,就需要进行一定的通信.而线程间通信又和进程间的通信不同.由于进程的数据空间相对独立而线程是共享数据空间的,彼此通信机制也很不同

消息通信库ZeroMQ 4.0.4安装指南

消息通信库ZeroMQ 4.0.4安装指南 一.ZeroMQ介绍 ZeroMQ是一个开源的消息队列系统,按照官方的定义,它是一个消息通信库,帮助开发者设计分布式和并行的应用程序. 首先,我们需要明白,ZeroMQ不是传统的消息队列系统(比如ActiveMQ.WebSphereMQ.RabbitMQ等).ZeroMQ可以帮助我们建立自己的消息队列系统,它只是一个库.ZeroMQ可以运行于带x86处理器或ARM处理器的机器上,支持40多种编程语言. 消息队列,从技术的角度来讲,是以先进先出FIFO算

ZeroMQ——一个轻量级的消息通信组件 C#

ZeroMQ——一个轻量级的消息通信组件 ZeroMQ是一个轻量级的消息通信组件,尽管名字中包含了"MQ",严格上来讲ZeroMQ并不是"消息队列/消息中间件".ZeroMQ是一个传输层API库, 更关注消息的传输.与消息队列相比,ZeroMQ有以下一些特点: 点对点无中间节点 传统的消息队列都需要一个消息服务器来存储转发消息.而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致.以为消息服务器最终还是转化为服务器对其他节点的点对点

进程 线程 多线程 并发 同步异步

进程 线程 多线程 并发 同步异步 很多人对进程,线程,多线程,并发,同步,异步等概念感到困惑,这都是大学没好好听课的缘故啊.咱在这里帮感到概念给感到困惑的同学复习下. 程序 程序用来描述计算机所完成的独立功能,并在时间上严格地按前后次序相继地进行计算机操作序列集合,是一个静态概念. 进程 并发执行的程序在执行过程中分配和管理资源的基本单位.是一个动态的执行过程. 进程的静态描述 进程控制块PCB 有关程序段 该程序员对齐进行操作的数据结构集 进程控制块PCB 进程控制块PCB是系统管制进程存在