Chromium线程模型、消息循环

多线程的麻烦

多线程编程是一件麻烦的事,相信很多人深有体会。执行顺序的不确定性,资源的并发访问一直困扰着众多程序员。解决多线程编程问题的方法分为两类:一是对并发访问的资源直接加锁;二是避免并发访问资源;Chromium采用第二种思想来设计多线程模型,通过在线程之间传递消息来实现跨进程通讯。

设计原则

Chromium希望尽量保持UI处于响应状态。为此遵循如下设计原则:

1 不在UI线程上执行任何阻塞I/O操作,以及其它耗时操作。

2 少用锁和线程安全对象

3 避免阻塞I/O线程

4 线程之间不要互相阻塞

5 在数据准备好更新到共享缓冲时才用锁(在准备数据期间不要用锁)

Chromium在程序启动时创建了很多预定用途的线程,我们要尽量使用已有线程,不要创建新的线程。为了实现非阻塞代码,Chromium的很多API都是异步的。

MessageLoop

Chromium将消息循环抽象为MessageLoop类,每个线程都有(只有)一个MessageLoop实例。MessageLoop提供了PostTask系列方法允许向特定线程添加任务。MessageLoop会安装“先进先出”的顺序执行任务,直到收到MessageLoop:Quit消息,消息循环才会退出。

MessageLoop类型

每个MessageLoop都有一个类型,所有类型的MessageLoop都能够处理tasks和timers,类型指定了MessageLoop除了处理tasks和timers外,还可以处理的消息类型。

TYPE_DEFAULT:默认的消息循环,只能处理异步任务和定时器timers

TYPE_UI:除了可以处理异步任何和定时器timers,还能够处理系统UI事件。主线程使用的就是该类型的MessageLoop。

TYPE_IO:支持处理异步I/O事件,Chromium创建的所有处理IPC消息的IO线程创建的MessageLoop都是这种类型。

TYPE_JAVA:只有Android支持,后端实现是Java层的消息处理器。它的行为和TYPE_UI一样,只是创建时不使用主线程上的MessagePump工厂方法。

TYPE_CUSTOM:构建时提供MessagePump

MessagePump::Delegate

Delegate定义了一组接口,由MessageLoop实现,MessagePump通过这组接口来触发MessageLoop执行特定的任务。

MessagePump

MessagePump用来从系统获取消息回调,触发MessageLoop执行Task类。不同类型的MessageLoop都有一个相对应的MessagePump,MessagePump的实现与平台相关。

MessageLoop, MessagePump, Delegate关系

MessageLoop具体的实现和平台相关,即使在相同的平台上,由于使用的事件处理库不同,其实现方式也可能不同。Chromium将平台相关的实现封装在MessagePump中。MessagePump的具体实现提供了平台相关的异步事件处理,而MessageLoop提供了轮询和调度异步任务的基本框架,两者通过MessagePump::Delegate抽象接口关联起来。主要类及它们的关系:

线程:创建到运行

下面以Browser Threads为例,来看看线程从创建到执行的过程。

内核初始化时调用BrowserMainLoop::CreateStartupTasks()创建一些启动任务,其中包括“创建Browser线程”的任务,然后调用startup_task_runner_执行启动任务:

StartupTask create_threads =
base::Bind(&BrowserMainLoop::CreateThreads, base::Unretained(this));
startup_task_runner_->AddTask(create_threads);

执行任务时,调用create_threads(),最终会调用系统API创建一个线程,并传入Thread.ThreadMain作为入口函数。线程创建后,就开始执行ThreadMain,该函数首先创建一个MessageLoop,然后开始执行消息循环,直到收到Quit消息。

向线程添加任务

任务最终通过调用MessageLoop:: PostTask*()把任务抛到MessageLoop执行。每个MessageLoop实例都属于一个线程,我们首先要知道需要哪个线程来执行任务。我们必定需要在某个地方维护着可用的线程的信息,例如:名称与线程ID的对应关系,这样我们可以通过一个名称来标识需要执行给定任务的线程。BrowserThread类提供了一组静态方法PostTask*来向Browser Threads添加任务:

staticbool PostTask(ID identifier,
                     consttracked_objects::Location& from_here,
                     constbase::Closure& task);
staticbool PostDelayedTask(ID identifier,
                            consttracked_objects::Location& from_here,
                            constbase::Closure& task,
                            base::TimeDelta delay);
staticbool PostNonNestableTask(ID identifier,
                                consttracked_objects::Location& from_here,
                                constbase::Closure& task);

第一个参数是一个线程的标识符,定义如下:

enumID {
    // The main thread in the browser.
    UI,
    // This is the thread that interacts with the database.
    DB,
    // This is the thread that interacts with the file system.
    FILE,
    // Used for file system operations that block user interactions.
    // Responsiveness of this thread affect users.
    FILE_USER_BLOCKING,
    // Used to launch and terminate Chrome processes.
    PROCESS_LAUNCHER,
    // This is the thread to handle slow HTTP cache operations.
    CACHE,
    // This is the thread that processes non-blocking IO, i.e. IPC and network.
    // Blocking IO should happen on other threads like DB, FILE,
    // FILE_USER_BLOCKING and CACHE depending on the usage.
    IO,
    // NOTE: do not add new threads here that are only used by a small number of
    // files. Instead you should just use a Thread class and pass its
    // MessageLoopProxy around. Named threads there are only for threads that
    // are used in many places.
    // This identifier does not represent a thread.  Instead it counts the
    // number of well-known threads.  Insert new well-known threads before this
    // identifier.
    ID_COUNT
  };

向DB线程添加一个任务就可以采用类似下面这样的代码:

BrowserThread::PostTask(
    BrowserThread::DB, FROM_HERE,
        base::Bind(
            &GetUrlThumbnailTask, url_string, top_sites,
            base::Owned(j_callback), lookup_success_callback,
            lookup_failed_callback));

BrowserThread维护着一个全局变量g_globals,它的类型是BrowserThreadGlobals,定义如下:

structBrowserThreadGlobals {
  BrowserThreadGlobals()
      : blocking_pool(newbase::SequencedWorkerPool(3, "BrowserBlocking")) {
    memset(threads, 0, BrowserThread::ID_COUNT * sizeof(threads[0]));
    memset(thread_delegates, 0,
           BrowserThread::ID_COUNT * sizeof(thread_delegates[0]));
  }
  // This lock protects |threads|. Do not read or modify that array
  // without holding this lock. Do not block while holding this lock.
  base::Lock lock;
  // This array is protected by |lock|. The threads are not owned by this
  // array. Typically, the threads are owned on the UI thread by
  // BrowserMainLoop. BrowserThreadImpl objects remove themselves from this
  // array upon destruction.
  BrowserThreadImpl* threads[BrowserThread::ID_COUNT];
  // Only atomic operations are used on this array. The delegates are not owned
  // by this array, rather by whoever calls BrowserThread::SetDelegate.
  BrowserThreadDelegate* thread_delegates[BrowserThread::ID_COUNT];
  constscoped_refptr<base::SequencedWorkerPool> blocking_pool;
};
base::LazyInstance<BrowserThreadGlobals>::Leaky
    g_globals = LAZY_INSTANCE_INITIALIZER;
} // namespace

其中的threads数组保存了创建的BrowserThread实例的指针。在初始化的时候会给threads赋值:

voidBrowserThreadImpl::Initialize() {
  BrowserThreadGlobals& globals = g_globals.Get();
  base::AutoLock lock(globals.lock);
  DCHECK(identifier_ >= 0 && identifier_ < ID_COUNT);
  DCHECK(globals.threads[identifier_] == NULL);
  globals.threads[identifier_] = this;
}

PostTask()通过调用PostTaskHelper()来添加任务,其内部首先取得g_globals的值,然后根据传入的标识(例如:BrowserThread:: DB)找到对应的thread对象,就可以取得对象的message_loop,最后调用message_loop的PostTask方法把任务交由相应线程执行。

BrowserThreadGlobals& globals = g_globals.Get();
  if(!target_thread_outlives_current)
    globals.lock.Acquire();
  base::MessageLoop* message_loop =
      globals.threads[identifier] ? globals.threads[identifier]->message_loop()
                                  : NULL;
  if(message_loop) {
    if(nestable) {
      message_loop->PostDelayedTask(from_here, task, delay);
    }else {
      message_loop->PostNonNestableDelayedTask(from_here, task, delay);
    }
  }

消息循环工作原理

MessageLoop实际上就是一个循环,不断从任务队列中取出任务,并执行,那么当所有任务都执行完毕之后呢?最直接的做法是,采用忙等待,不断检查任务队列中是否有新的任务。Chromium当然不会采用这么拙劣的方法。以MessagePumpDefault的实现为例,当所有任务执行完毕之后,会执行event对象的wait()函数,等待事件或信号唤醒继续循环执行。

ThreadRestrictions::ScopedAllowWait allow_wait;
if(delayed_work_time_.is_null()) {
  event_.Wait();
}else {
  TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
  if(delay > TimeDelta()) {
    event_.TimedWait(delay);
  }else {
    // It looks like delayed_work_time_ indicates a time in the past, so we
    // need to call DoDelayedWork now.
    delayed_work_time_ = TimeTicks();
  }
}

唤醒的信号是怎么发送的呢?我们来看PostTask*的执行过程:

添加任务到队列时,如果发现任务队列为空,就会调用ScheduleWork启动消息循环,ScheduleWork具体的实现与采用的系统,以及采用的事件模型有关。还是以MessagePumpDefault为例,它的实现如下:

boolIncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
  ……
  boolwas_empty = incoming_queue_.empty();
  incoming_queue_.push(*pending_task);
  pending_task->task.Reset();
  if(always_schedule_work_ || (!message_loop_scheduled_ && was_empty)) {
    // Wake up the message loop.
    message_loop_->ScheduleWork();
    // After we've scheduled the message loop, we do not need to do so again
    // until we know it has processed all of the work in our queue and is
    // waiting for more work again. The message loop will always attempt to
    // reload from the incoming queue before waiting again so we clear this flag
    // in ReloadWorkQueue().
    message_loop_scheduled_ = true;
  }
  returntrue;
}
voidMessagePumpDefault::ScheduleWork() {
  // Since this can be called on any thread, we need to ensure that our Run
  // loop wakes up.
  event_.Signal();
}

为了减少锁的使用和锁的范围,Chromium采用了一个比较巧妙的方法:简单来讲,MessageLoop维护有两个队列,一个work_queue,一个incoming_queue。消息循环不断从work_queue取任务并执行,新加入任务放入incoming_queue。当work_queue中的任务都执行完后,再把incoming_queue拷贝到work_queue(需要加锁)。这样避免了每执行一个任务都要去加锁。

参考

理解WebKit和Chromium: 消息循环(Message Loop)

Chromium on Android: Android系统上Chromium主消息循环的实现分析

Chrome学习笔记(一):线程模型,消息循环

时间: 2024-08-27 07:20:18

Chromium线程模型、消息循环的相关文章

Android应用程序线程消息循环模型分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6905587 我们知道,Android应用程序是通过消息来驱动的,即在应用程序的主线程(UI线程)中有一个消息循环,负责处理消息队列中的消息.我们也知 道,Android应用程序是支持多线程的,即可以创建子线程来执行一些计算型的任务,那么,这些子线程能不能像应用程序的主线程一样具有消息循环呢?这 些子线程又能不能往应用程序的主线程中发送消息呢?本

Chromium on Android: Android系统上Chromium主消息循环的实现分析

摘要:刚一开始接触Chromium on Android时,就很好奇Chromium的主消息循环是怎么整合到Android应用程序中的.对于Android程序来说,一旦启动,主线程就会有一个Java层的消息循环处理用户输入事件等系统事件,而对Chromium来说,它有自己另一套消息循环的实现,这个实现有哪些特点,又将如何无缝整合到Android Java层的消息循环中去,正是本文所要讨论的话题. 原创文章系列,转载请注明原始出处为http://blog.csdn.net/hongbomin/ar

Chromium on Android: Android在系统Chromium为了实现主消息循环分析

总结:刚开始接触一个Chromium on Android时间.很好奇Chromium主消息循环是如何整合Android应用. 为Android计划,一旦启动,主线程将具有Java消息层循环处理系统事件,如用户输入事件,而Chromium为,己还有一套消息循环的实现,这个实现有哪些特点.又将怎样无缝整合到Android Java层的消息循环中去,正是本文所要讨论的话题. 原创文章系列.转载请注明原始出处为http://blog.csdn.net/hongbomin/article/details

TMsgThread, TCommThread -- 在delphi线程中实现消息循环(105篇博客,好多研究消息的文章)

在delphi线程中实现消息循环 在delphi线程中实现消息循环 Delphi的TThread类使用很方便,但是有时候我们需要在线程类中使用消息循环,delphi没有提供. 花了两天的事件研究了一下win32的消息系统,写了一个线程内消息循环的测试. 但是没有具体应用过,贴出来给有这方面需求的DFW参考一下.希望大家和我讨论. {----------------------------------------------------------------------------- Unit

在delphi线程中实现消息循环

http://delphi.cjcsoft.net//viewthread.php?tid=635 在delphi线程中实现消息循环 Delphi的TThread类使用很方便,但是有时候我们需要在线程类中使用消息循环,delphi没有提供. 花了两天的事件研究了一下win32的消息系统,写了一个线程内消息循环的测试. 但是没有具体应用过,贴出来给有这方面需求的DFW参考一下.希望大家和我讨论. {--------------------------------------------------

Chromium的IPC消息发送、接收和分发机制分析

由于Chromium采用多进程架构,因此会涉及到进程间通信问题.通过前面一文的学习,我们知道Browser进程在启动Render进程的过程中会建立一个以UNIX Socket为基础的IPC通道.有了IPC通道之后,接下来Browser进程与Render进程就以消息的形式进行通信.我们将这种消息称为IPC消息,以区别于线程消息循环中的消息.本文就分析Chromium的IPC消息发送.接收和分发机制. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! Chrom

IOS - RunLoop消息循环

什么是RunLoop? -RunLoop就是消息循环,每一个线程内部都有一个消息循环. -只有主线程的消息循环默认开启,子线程的消息循环默认不开启. RunLoop的目的 -保证程序不退出 . -负责处理输入事件.  -如果没有事件发生,会让程序进入休眠状态  . 事件类型 Input Sources (输入源) & Timer Sources (定时源) -输入源可以是键盘鼠标,NSPort, NSConnection 等对象,定时源是NSTimer 事件 添加消息到循环中 -创建输入源.(以

System、应用程序进程的Binder线程池和Handler消息循环

首先看一张Android系统启动流程图: 一个进程最重要的两项指标一个是启动了Binder线程池,也就是能够进程Binder进程间通信了.还有一个是启动了Handler消息循环,能够使用了消息循环机制. 1.那么systemserver进程是什么时候实现上面两个机制的呢?见代码: 启动了Binder线程池.是子线程池. public static final void zygoteInit(String[] argv) throws ZygoteInit.MethodAndArgsCaller

Android开发实践:自定义带消息循环(Looper)的工作线程

上一篇文章提到了Android系统的UI线程是一种带消息循环(Looper)机制的线程,同时Android也提供了封装有消息循环(Looper)的HandlerThread类,这种线程,可以绑定Handler()对象,并通过Handler的sendMessage()函数向线程发送消息,通过handleMessage()函数,处理线程接收到的消息.这么说比较抽象,那么,本文就利用基础的Java类库,实现一个带消息循环(Looper)的线程,以帮助初学者理解这样一个Looper到底是怎么工作的. 1