多线程的麻烦
多线程编程是一件麻烦的事,相信很多人深有体会。执行顺序的不确定性,资源的并发访问一直困扰着众多程序员。解决多线程编程问题的方法分为两类:一是对并发访问的资源直接加锁;二是避免并发访问资源;Chromium采用第二种思想来设计多线程模型,通过在线程之间传递消息来实现跨进程通讯。
设计原则
Chromium希望尽量保持UI处于响应状态。为此遵循如下设计原则:
1 不在UI线程上执行任何阻塞I/O操作,以及其它耗时操作。
2 少用锁和线程安全对象
3 避免阻塞I/O线程
4 线程之间不要互相阻塞
5 在数据准备好更新到共享缓冲时才用锁(在准备数据期间不要用锁)
Chromium在程序启动时创建了很多预定用途的线程,我们要尽量使用已有线程,不要创建新的线程。为了实现非阻塞代码,Chromium的很多API都是异步的。
MessageLoop
Chromium将消息循环抽象为MessageLoop类,每个线程都有(只有)一个MessageLoop实例。MessageLoop提供了PostTask系列方法允许向特定线程添加任务。MessageLoop会安装“先进先出”的顺序执行任务,直到收到MessageLoop:Quit消息,消息循环才会退出。
MessageLoop类型
每个MessageLoop都有一个类型,所有类型的MessageLoop都能够处理tasks和timers,类型指定了MessageLoop除了处理tasks和timers外,还可以处理的消息类型。
TYPE_DEFAULT:默认的消息循环,只能处理异步任务和定时器timers
TYPE_UI:除了可以处理异步任何和定时器timers,还能够处理系统UI事件。主线程使用的就是该类型的MessageLoop。
TYPE_IO:支持处理异步I/O事件,Chromium创建的所有处理IPC消息的IO线程创建的MessageLoop都是这种类型。
TYPE_JAVA:只有Android支持,后端实现是Java层的消息处理器。它的行为和TYPE_UI一样,只是创建时不使用主线程上的MessagePump工厂方法。
TYPE_CUSTOM:构建时提供MessagePump
MessagePump::Delegate
Delegate定义了一组接口,由MessageLoop实现,MessagePump通过这组接口来触发MessageLoop执行特定的任务。
MessagePump
MessagePump用来从系统获取消息回调,触发MessageLoop执行Task类。不同类型的MessageLoop都有一个相对应的MessagePump,MessagePump的实现与平台相关。
MessageLoop, MessagePump, Delegate关系
MessageLoop具体的实现和平台相关,即使在相同的平台上,由于使用的事件处理库不同,其实现方式也可能不同。Chromium将平台相关的实现封装在MessagePump中。MessagePump的具体实现提供了平台相关的异步事件处理,而MessageLoop提供了轮询和调度异步任务的基本框架,两者通过MessagePump::Delegate抽象接口关联起来。主要类及它们的关系:
线程:创建到运行
下面以Browser Threads为例,来看看线程从创建到执行的过程。
内核初始化时调用BrowserMainLoop::CreateStartupTasks()创建一些启动任务,其中包括“创建Browser线程”的任务,然后调用startup_task_runner_执行启动任务:
StartupTask create_threads = base::Bind(&BrowserMainLoop::CreateThreads, base::Unretained(this)); startup_task_runner_->AddTask(create_threads);
执行任务时,调用create_threads(),最终会调用系统API创建一个线程,并传入Thread.ThreadMain作为入口函数。线程创建后,就开始执行ThreadMain,该函数首先创建一个MessageLoop,然后开始执行消息循环,直到收到Quit消息。
向线程添加任务
任务最终通过调用MessageLoop:: PostTask*()把任务抛到MessageLoop执行。每个MessageLoop实例都属于一个线程,我们首先要知道需要哪个线程来执行任务。我们必定需要在某个地方维护着可用的线程的信息,例如:名称与线程ID的对应关系,这样我们可以通过一个名称来标识需要执行给定任务的线程。BrowserThread类提供了一组静态方法PostTask*来向Browser Threads添加任务:
staticbool PostTask(ID identifier, consttracked_objects::Location& from_here, constbase::Closure& task); staticbool PostDelayedTask(ID identifier, consttracked_objects::Location& from_here, constbase::Closure& task, base::TimeDelta delay); staticbool PostNonNestableTask(ID identifier, consttracked_objects::Location& from_here, constbase::Closure& task);
第一个参数是一个线程的标识符,定义如下:
enumID { // The main thread in the browser. UI, // This is the thread that interacts with the database. DB, // This is the thread that interacts with the file system. FILE, // Used for file system operations that block user interactions. // Responsiveness of this thread affect users. FILE_USER_BLOCKING, // Used to launch and terminate Chrome processes. PROCESS_LAUNCHER, // This is the thread to handle slow HTTP cache operations. CACHE, // This is the thread that processes non-blocking IO, i.e. IPC and network. // Blocking IO should happen on other threads like DB, FILE, // FILE_USER_BLOCKING and CACHE depending on the usage. IO, // NOTE: do not add new threads here that are only used by a small number of // files. Instead you should just use a Thread class and pass its // MessageLoopProxy around. Named threads there are only for threads that // are used in many places. // This identifier does not represent a thread. Instead it counts the // number of well-known threads. Insert new well-known threads before this // identifier. ID_COUNT };
向DB线程添加一个任务就可以采用类似下面这样的代码:
BrowserThread::PostTask( BrowserThread::DB, FROM_HERE, base::Bind( &GetUrlThumbnailTask, url_string, top_sites, base::Owned(j_callback), lookup_success_callback, lookup_failed_callback));
BrowserThread维护着一个全局变量g_globals,它的类型是BrowserThreadGlobals,定义如下:
structBrowserThreadGlobals { BrowserThreadGlobals() : blocking_pool(newbase::SequencedWorkerPool(3, "BrowserBlocking")) { memset(threads, 0, BrowserThread::ID_COUNT * sizeof(threads[0])); memset(thread_delegates, 0, BrowserThread::ID_COUNT * sizeof(thread_delegates[0])); } // This lock protects |threads|. Do not read or modify that array // without holding this lock. Do not block while holding this lock. base::Lock lock; // This array is protected by |lock|. The threads are not owned by this // array. Typically, the threads are owned on the UI thread by // BrowserMainLoop. BrowserThreadImpl objects remove themselves from this // array upon destruction. BrowserThreadImpl* threads[BrowserThread::ID_COUNT]; // Only atomic operations are used on this array. The delegates are not owned // by this array, rather by whoever calls BrowserThread::SetDelegate. BrowserThreadDelegate* thread_delegates[BrowserThread::ID_COUNT]; constscoped_refptr<base::SequencedWorkerPool> blocking_pool; }; base::LazyInstance<BrowserThreadGlobals>::Leaky g_globals = LAZY_INSTANCE_INITIALIZER; } // namespace
其中的threads数组保存了创建的BrowserThread实例的指针。在初始化的时候会给threads赋值:
voidBrowserThreadImpl::Initialize() { BrowserThreadGlobals& globals = g_globals.Get(); base::AutoLock lock(globals.lock); DCHECK(identifier_ >= 0 && identifier_ < ID_COUNT); DCHECK(globals.threads[identifier_] == NULL); globals.threads[identifier_] = this; }
PostTask()通过调用PostTaskHelper()来添加任务,其内部首先取得g_globals的值,然后根据传入的标识(例如:BrowserThread:: DB)找到对应的thread对象,就可以取得对象的message_loop,最后调用message_loop的PostTask方法把任务交由相应线程执行。
BrowserThreadGlobals& globals = g_globals.Get(); if(!target_thread_outlives_current) globals.lock.Acquire(); base::MessageLoop* message_loop = globals.threads[identifier] ? globals.threads[identifier]->message_loop() : NULL; if(message_loop) { if(nestable) { message_loop->PostDelayedTask(from_here, task, delay); }else { message_loop->PostNonNestableDelayedTask(from_here, task, delay); } }
消息循环工作原理
MessageLoop实际上就是一个循环,不断从任务队列中取出任务,并执行,那么当所有任务都执行完毕之后呢?最直接的做法是,采用忙等待,不断检查任务队列中是否有新的任务。Chromium当然不会采用这么拙劣的方法。以MessagePumpDefault的实现为例,当所有任务执行完毕之后,会执行event对象的wait()函数,等待事件或信号唤醒继续循环执行。
ThreadRestrictions::ScopedAllowWait allow_wait; if(delayed_work_time_.is_null()) { event_.Wait(); }else { TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); if(delay > TimeDelta()) { event_.TimedWait(delay); }else { // It looks like delayed_work_time_ indicates a time in the past, so we // need to call DoDelayedWork now. delayed_work_time_ = TimeTicks(); } }
唤醒的信号是怎么发送的呢?我们来看PostTask*的执行过程:
添加任务到队列时,如果发现任务队列为空,就会调用ScheduleWork启动消息循环,ScheduleWork具体的实现与采用的系统,以及采用的事件模型有关。还是以MessagePumpDefault为例,它的实现如下:
boolIncomingTaskQueue::PostPendingTask(PendingTask* pending_task) { …… boolwas_empty = incoming_queue_.empty(); incoming_queue_.push(*pending_task); pending_task->task.Reset(); if(always_schedule_work_ || (!message_loop_scheduled_ && was_empty)) { // Wake up the message loop. message_loop_->ScheduleWork(); // After we've scheduled the message loop, we do not need to do so again // until we know it has processed all of the work in our queue and is // waiting for more work again. The message loop will always attempt to // reload from the incoming queue before waiting again so we clear this flag // in ReloadWorkQueue(). message_loop_scheduled_ = true; } returntrue; } voidMessagePumpDefault::ScheduleWork() { // Since this can be called on any thread, we need to ensure that our Run // loop wakes up. event_.Signal(); }
为了减少锁的使用和锁的范围,Chromium采用了一个比较巧妙的方法:简单来讲,MessageLoop维护有两个队列,一个work_queue,一个incoming_queue。消息循环不断从work_queue取任务并执行,新加入任务放入incoming_queue。当work_queue中的任务都执行完后,再把incoming_queue拷贝到work_queue(需要加锁)。这样避免了每执行一个任务都要去加锁。
参考
理解WebKit和Chromium: 消息循环(Message Loop)
Chromium on Android: Android系统上Chromium主消息循环的实现分析