线程安全的观察者模式的设计

观察者模式的应用,主要的行为就是注册和移除观察者(observer),以及通知所有已注册的Observers。这里介绍的是Chromium项目中实现的线程安全的观察者管理及通知的基础类ObserverListThreadSafe, 它的能力包括:

  • 观察者可以在任意线程中注册,消息回调会发生在注册时所在的线程。
  • 任意线程可以Notify()触发通知消息。
  • 观察者可以在回调时从列表中移除自己。
  • 如果一个线程正在通知观察者, 此时一个观察者正在从列表中移除自己, 通知会被丢弃。

线程安全的基础

实现这个基础是记录线程与观察者列表的对应关系,即某个线程上存在的观察者的列表。定义如下:

typedef std::map<base::PlatformThreadId, ObserverListContext*>
      ObserversListMap;

其中PlatformThreadID为线程的ID, 而ObserverListContext是一个数组,其定义如下:

  struct ObserverListContext {
    scoped_refptr<base::MessageLoopProxy> loop;
    ObserverList<ObserverType> list;
  };

其中loop为Chromium线程机制中的MessageLoopProxy, 也可以理解为线程的消息队列代理,使用它就可以完成将某个操作抛到指定线程上运行。list就很好理解了,记录的是该线程中的观察者列表。

只要全局的持有这个表(ObserversListMap),就可以将观察者与线程关联起来,进而保证通知一定可以运行到它注册时所在的线程。

设计总览及使用方式

其中ObserverListThreadSafe就是我们的主角。它本身是为一个模板类:

template <class ObserverType>
class ObserverListThreadSafe
    : public base::RefCountedThreadSafe<
        ObserverListThreadSafe<ObserverType>,
        ObserverListThreadSafeTraits<ObserverType> > {

下面是一个使用的示例:

  // The interface to receive mouse movement events.
  class MEDIA_EXPORT MouseEventListener {
   public:
    // |position| is the new mouse position.
    virtual void OnMouseMoved(const SkIPoint& position) = 0;

   protected:
    virtual ~MouseEventListener() {}
  };
  typedef ObserverListThreadSafe<UserInputMonitor::MouseEventListener>
      MouseListenerList;
scoped_refptr<MouseListenerList> mouse_listeners_;

添加/删除Observer方法很简单:

void UserInputMonitor::AddMouseListener(MouseEventListener* listener) {
  mouse_listeners_->AddObserver(listener);
  ......
}

void UserInputMonitor::RemoveMouseListener(MouseEventListener* listener) {
  mouse_listeners_->RemoveObserver(listener);
  ......
}

当需要通知各个观察者时,代码如下:

SkIPoint position(SkIPoint::Make(event->u.keyButtonPointer.rootX,
                                     event->u.keyButtonPointer.rootY));
    mouse_listeners_->Notify(
        &UserInputMonitor::MouseEventListener::OnMouseMoved, position);

ObserverListThreadSafe实现要点

操作线程-观察者列表时加锁

即内部成员变量list_lock_,在操作observer_list_要使用如下方法加锁:

base::AutoLock lock(list_lock_);

添加及删除观察者

基本思路就是:

* 以当前线程ID,找到ObserverListContext。如果新增但又没有,则新建。

* 操作找到的ObserverListContext进行添加或删除操作。

下面是AddObserver()的实现:

  // Add an observer to the list.  An observer should not be added to
  // the same list more than once.
  void AddObserver(ObserverType* obs) {
    // If there is not a current MessageLoop, it is impossible to notify on it,
    // so do not add the observer.
    if (!base::MessageLoop::current())
      return;

    ObserverList<ObserverType>* list = NULL;
    base::PlatformThreadId thread_id = base::PlatformThread::CurrentId();
    {
      base::AutoLock lock(list_lock_);
      if (observer_lists_.find(thread_id) == observer_lists_.end())
        observer_lists_[thread_id] = new ObserverListContext(type_);
      list = &(observer_lists_[thread_id]->list);
    }
    list->AddObserver(obs);
  }

事件通知

这个过程实现上,因为需要兼容不同数理的参数,所以定义了一组模板方法。先说明一下基本思路:

* 封装要调用的通知方法为Callback形式:函数,及使用tuple(不是C++11的元组,而是base::tuple)封装起来的参数。

* 遍历observer_lists_,找到每个线程对应的ObserverListContext。

* 使用ObserverListContext中记录的MessageProxyLoop,执行NotifyWrapper,并传入Callback作为参数。

以上这个过程就是在通知的线程的完成的, 具体的代码如下:

  template <class Method, class Params>
  void Notify(const UnboundMethod<ObserverType, Method, Params>& method) {
    base::AutoLock lock(list_lock_);
    typename ObserversListMap::iterator it;
    for (it = observer_lists_.begin(); it != observer_lists_.end(); ++it) {
      ObserverListContext* context = (*it).second;
      context->loop->PostTask(
          FROM_HERE,
          base::Bind(&ObserverListThreadSafe<ObserverType>::
              template NotifyWrapper<Method, Params>, this, context, method));
    }
  }

下一步就是在各个observer所在的线程上触发NotifyWrapper了。它主要做两件事:

* 为每一个observer运行通知的函数:

    {
      typename ObserverList<ObserverType>::Iterator it(context->list);
      ObserverType* obs;
      while ((obs = it.GetNext()) != NULL)
        method.Run(obs);
    }
  • 如果发现这个线程上已经没有可用的观察者,则将它从observer_list_中移除。

封装的通知方法

再回头看一下对回调方法封装,上面提到了参数是使用base::tuple封装的,它同时也提供了DispatchToMethod方法,把参数解开,再调用方法,详见base::tuple中的说明。

下面是UnboundMethod的定义:

// An UnboundMethod is a wrapper for a method where the actual object is
// provided at Run dispatch time.
template <class T, class Method, class Params>
class UnboundMethod {
 public:
  UnboundMethod(Method m, const Params& p) : m_(m), p_(p) {
    COMPILE_ASSERT(
        (base::internal::ParamsUseScopedRefptrCorrectly<Params>::value),
        badunboundmethodparams);
  }
  void Run(T* obj) const {
    DispatchToMethod(obj, m_, p_);
  }
 private:
  Method m_;
  Params p_;
};

如果没有Chromium的线程机制,也是可以实现的,核心是线程的抛转。

小结

这种方法适用于多线程下以函数+参数通知的方式。参数直接抛转到指定的线程不用特别担心线程安全问题。对于获得通知后仍然要跨线程访问数据的情况,则可以考虑: 1.以类似的方式,将数据通过函数参数传递(目前最多为6个)。 2. 如果数据量大,则可以考虑使用Multiversion Concurrency Control的算法,尽量避免加锁的开销。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2025-01-02 14:21:06

线程安全的观察者模式的设计的相关文章

线程安全的日志类设计

最近在写多线程方面的内容, 其实多线程开发设计清楚了在写, 并不会有太大的坑, 尽管如此, 难免有需要调试的时候, 多线程的程序单步调试有时候难以发现bug, 通过记录日志, 可以用来查找问题, 方便调试. 其实有不少开源的C++日志库, 比如大名鼎鼎的log4cxx, 轻量级的glog, 还有一些热心的同行提供的开源工程, 我大体看了几个, 觉得不是很符合我的胃口, 于是就花了一天的时间, 自己写了一个, 用起来还比较顺手, 想怎么改就怎么改, 目前还是比较适合需求的. 考虑到线程安全, 就有

聊聊高并发(二)结合实例说说线程封闭和背后的设计思想

高并发问题抛去架构层面的问题,落实到代码层面就是多线程的问题.多线程的问题主要是线程安全的问题(其他还有活跃性问题,性能问题等). 那什么是线程安全?下面这个定义来自<Java并发编程实战>,这本书强烈推荐,是几个Java语言的作者合写的,都是并发编程方面的大神. 线程安全指的是:当多个线程访问某个类时,这个类始终都能表现出正确的行为. 正确指的是"所见即所知",程序执行的结果和你所预想的结果一致. 理解线程安全的概念很重要,所谓线程安全问题,就是处理对象状态的问题.如果要

Java 并发编程(三)设计线程安全的类-实例封闭

到目前为止,我们已经介绍了关于线程安全与同步的一些基础知识.然而,我们并不希望对每一次内存访问都进行分析以确保是线程安全的,而是希望将一些现有的线程安全组件组合为更大规模的组合为更大规模的组件或程序.之后,我们会讲一些设计线程安全类的一些基本概念,介绍一些组合模式. 一.设计线程安全的类 在设计线程安全类的过程中,需要包含以下三个基本要素: 1.找出构成对象状态的所有变量 2.找出约束状态变量的不变性条件 3.建立对象状态的并发访问管理策略 要分析对象的状态,首先从对象的域开始.如果对象中所有的

.NET设计篇08-线程统一取消模型和跨线程访问UI

知识需要不断积累.总结和沉淀,思考和写作是成长的催化剂,输出倒逼输入 内容目录 一.线程统一取消模型1.取消令牌2.可以中断的线程1.设计一个中断函数2.创建CancellationTokenSource对象3.启动线程4.取消线程执行二.跨线程访问UI基本方法1.Control.Invoke和BeginInvoke2.桌面退出3.编写线程安全的控件三.BackgroundWorker组件1.干活的代码2.启动任务3.结果取回4.取消任务5.进度报告四.等等 一.线程统一取消模型 线程取消在多线

.NET设计篇08-线程取消模型和跨线程访问UI

知识需要不断积累.总结和沉淀,思考和写作是成长的催化剂,输出倒逼输入 内容目录 一.线程统一取消模型1.取消令牌2.可以中断的线程1.设计一个中断函数2.创建CancellationTokenSource对象3.启动线程4.取消线程执行二.跨线程访问UI基本方法1.Control.Invoke和BeginInvoke2.桌面退出3.编写线程安全的控件三.BackgroundWorker组件1.干活的代码2.启动任务3.结果取回4.取消任务5.进度报告四.等等 一.线程统一取消模型 线程取消在多线

java:从消息机制谈到观察者模式

本文接编程思想之消息机制,读者可以结合编程思想之消息机制一起阅读,也可以直接从本文开始阅读. 从简单的例子开始 同样,我们还是先看一个简单例子:创建一个窗口实现加法的计算功能.其效果如下: 图1: 加法计算 Calculator.java: import javax.swing.*; import javax.swing.border.BevelBorder; import java.awt.*; import java.awt.event.ActionEvent; import java.aw

EventBus事件总线框架(发布者/订阅者模式,观察者模式)

一. android应用内消息传递的方式: 1. handler方式-----------------不同线程间传递消息. 2. Interface接口回调方式-------任意两个对象. 3. Intent进行组件间通信,广播方式. 二.单例比较好的写法: private static volatile EventBus defaultInstance; 构造函数应当是private,不应该是public 1 public static EventBus getDefault() { 2 if

多线程——线程模型

什么是程序? 安装在磁盘上的一段指令集合,它是静态的概念. 什么是进程? 它是运行中的程序,是动态的概念,每个进程都有独立的资源空间. 什么是线程? 线程,又称为轻量级进程,是程序执行流的最小单元,是程序中一个单一的顺序控制流程.线程是进程的一个实体,是被系统独立调度和分派的基本单位. 什么是多线程? 多线程则指的是在单个程序中可以同时运行多个不同的线程执行不同的任务. 多线程的特点 ①   一个进程可以包含一个或多个线程. ②   一个程序实现多个代码同时交替运行就需要产生多个线程. ③  

异步并行批处理框架设计的一些思考(转)

随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来.如何有效地处理.分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器.可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰.当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析.此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Storm,也能很好的满足数据实时性分