一个通用的C++ 消息总线框架

应用开发过程中经常会处理对象间通信的问题,一般都是对象或接口的依赖和引用去实现对象间的通信,这在一般情况下是没问题的,但是如果相互通信的对象很多,可能会造成对象间的引用关系像蜘蛛网一样,这样会导致对象关系很复杂,难以维护的问题,解决这个问题的一个好方法是通过消息总线去解耦对象间大量相互引用的紧耦合的关系。

设计思路:被通信对象向消息总线发布一个主题,这个主题包含消息主题、消息类型和消息处理函数,消息主题标示某个特定的主题,消息类型用来区分标示这个主题会响应某个特定的消息,消息处理函数用来响应该主题的某种消息类型。通信对象向消息总线发送某个特定主和某个特定消息,总线就会根据消息主题和消息类型找到对应的消息处理函数处理该请求。

由于用到了c++11的可变模板参数和lamda表达式,windows上编译需要Compiler Nov 2012 CTP,linux需要GCC4.7以上。

具体代码:

#pragma once
#include <boost/tuple/tuple.hpp>
#include <boost/utility.hpp>
#include <boost/unordered_map.hpp>
#include <boost/any.hpp>

template <typename... Args>
struct Impl;

template <typename First, typename... Args>
struct Impl<First, Args...>
{
    static std::string name()
    {
        return std::string(typeid(First).name()) + " " + Impl<Args...>::name();
    }
};

template <>
struct Impl<>
{
    static std::string name()
    {
        return "";
    }
};

template <typename... Args>
std::string type_name()
{
    return Impl<Args...>::name();
}

class MessageBus : boost::noncopyable
{
public:
    //向某个主题注册主题,需要订阅主题(topic、消息类型)和消息处理函数。
    template<typename... TArgs, typename TObject, typename TMember>
    void Attach(string strTopic, TObject* Object, TMember Member)
    {
        std::function<void(TArgs...)> f = std::function<void(TArgs...)>([=](TArgs... arg){(Object->*Member)(arg...);});

        m_map.insert(make_pair(GetKey(strTopic), f));
    }

    //向某个主题发送消息, 需要主题和消息类型。消息总线收到消息后会找到并通知对应的消息处理函数。
    template<typename... Args>
    void SendReq(string strTopic, Args... args)
    {
        auto range=m_map.equal_range(GetKey(strTopic));
        boost::unordered_multimap<string, boost::any>::iterator it;

        for (it = range.first;  it!= range.second; it++)
        {
            std::function<void(Args...)> f = boost::any_cast<std::function<void(Args...)>>(it->second);
            f(args...);
        }
    }

    //移除某个主题, 需要主题和消息类型
    template<typename... Args>
    void Remove(string strTopic)
    {
        auto it = m_map.find(GetKey(strTopic));
        while(it!=m_map.end())
            m_map.erase(it++);
    }

private:
    //获得消息键值,通过某个主题和消息类型可以确定观察者
    template<typename... TArgs>
    string GetKey(string& strTopic)
    {
        return strTopic + type_name<TArgs...>();
    }

private:
    boost::unordered_multimap<string, boost::any> m_map;
};

测试代码:

    MessageBus bus;
    MyStruct st;
    bus.Attach<int,string>("bb", &st, &MyStruct::Test); //注册主题(topic、消息类型、消息处理函数)
    bus.Attach<int,string>("bb", &st, &MyStruct::Test2);
    bus.SendReq<int, string>("bb",0," append"); //发送消息处理请求(主题和消息类型)
    bus.Remove<int, string>("bb"); //移除主题(主题和消息类型)

测试结果:

it is a test: 0 append

it is a test2: 0 append

更新版本,通过万能的函数包装器实现消息总线,使得接口的调用更加通用和一致。

template <typename R=void>
class MessageBus : boost::noncopyable
{
public:
    //注册消息
    template< class... Args, class F, class = typename std::enable_if<!std::is_member_function_pointer<F>::value>::type>
    void Attach(string strKey, F && f)
    {
        std::function<R(Args...)> fn = [&](Args... args){return f(std::forward<Args>(args)...); };
        m_map.insert(std::make_pair(strKey + type_name < Args...>(), std::move(fn)));
    }

    // non-const member function
    template<class... Args, class C, class... DArgs, class P>
    void Attach(string strKey, R(C::*f)(DArgs...), P && p)
    {
        std::function<R(Args...)> fn = [&, f](Args... args){return (*p.*f)(std::forward<Args>(args)...); };
        m_map.insert(std::make_pair(strKey + type_name < Args...>(), std::move(fn)));
    }

    template<class... Args, class C, class... DArgs, class P>
    void Attach(string strKey, R(C::*f)(DArgs...) const, P && p)
    {
        std::function<R(Args...)> fn = [&, f](Args... args){return (*p.*f)(std::forward<Args>(args)...); };
        m_map.insert(std::make_pair(strKey + type_name < Args...>(), std::move(fn)));
    }

    //广播消息,主题和参数可以确定一个消息, 所有的消息接收者都将收到并处理该消息
    template<typename... Args>
    void SendReq(string strTopic, Args... args)
    {
        auto range = m_map.equal_range(strTopic + type_name < Args...>());
        for (auto it = range.first;  it != range.second; it++)
        {
            std::function<R(Args...)> f = boost::any_cast<std::function<R(Args...)>>(it->second);
            f(args...);
        }
    }

    //移除消息
    template<typename... Args>
    void Remove(string strTopic)
    {
        string strMsgType = GetNameofMsgType<Args...>();
        auto range=m_map.equal_range(strTopic+strMsgType);
        m_map.erase(range.first, range.second);
    }

private:
    std::multimap<string, boost::any> m_map;
};

测试代码:

struct A
{
    void Test(int x){ cout << x << endl; }
    void GTest()
    {
        cout << "it is a test" << endl;
    }
    void HTest(int x) const
    {
        cout << "it is a HTest" << endl;
    }
};

void GG(int x)
{
    cout << "it is a gg" << endl;
}

void GG1()
{
    cout << "it is a GG" << endl;
}

void TestMessageBus()
{
    A a;
    MessageBus<> bus;
    bus.Attach<int>("aa", &A::Test, &a);
    int x = 3;
    bus.SendReq("aa", 3);

    bus.Attach<int>("hh", &A::HTest, &a);
    bus.SendReq("hh", x);
    bus.Attach("bb", &A::GTest, &a);
    bus.SendReq("bb");

    bus.Attach<int>("gg", GG);
    bus.SendReq("gg", 3);

    bus.Attach("gg", GG1);
    bus.SendReq("gg");
}

时间: 2024-10-13 02:04:38

一个通用的C++ 消息总线框架的相关文章

微服务实战(二):落地微服务架构到直销系统(构建消息总线框架接口)

从上一篇文章大家可以看出,实现一个自己的消息总线框架是非常重要的内容,消息总线可以将界限上下文之间进行解耦,也可以为大并发访问提供必要的支持. 消息总线的作用: 1.界限上下文解耦:在DDD第一波文章中,当更新了订单信息后,我们通过调用经销商界限上下文的领域模型和仓储,进行了经销商信息的更新,这造成了耦合.通过一个消息总线,可以在订单界限上下文的WebApi服务(来源微服务-生产者)更新了订单信息后,发布一个事件消息到消息总线的某个队列中,经销商界限上下文的WebApi服务(消费者)订阅这个事件

Android组件化方案及组件消息总线modular-event实战

背景 组件化作为Android客户端技术的一个重要分支,近年来一直是业界积极探索和实践的方向.美团内部各个Android开发团队也在尝试和实践不同的组件化方案,并且在组件化通信框架上也有很多高质量的产出.最近,我们团队对美团零售收银和美团轻收银两款Android App进行了组件化改造.本文主要介绍我们的组件化方案,希望对从事Android组件化开发的同学能有所启发. 为什么要组件化 近年来,为什么这么多团队要进行组件化实践呢?组件化究竟能给我们的工程.代码带来什么好处?我们认为组件化能够带来两

分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载

一.分布式消息总线 在很多MIS项目之中都有这样的需求,需要一个及时.高效的的通知机制,即比如当使用者A完成了任务X,就需要立即告知使用者B任务X已经完成,在通常的情况下,开发人中都是在使用者B所使用的程序之中写数据库轮循代码,这样就会产品一个很严重的两个问题,第一个问题是延迟,轮循机制要定时执行,必须会引起延迟,第二个问题是数据库压力过大,当进行高频度的轮循会生产大量的数据库查询,并且如果有大量的使用者进行轮循,那数据库的压力就更大了. 那么在这个时间,就需要一套能支持发布-订阅模式的分布式消

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum

消息总线VS消息队列

前段时间实现了一个基于RabbitMQ的消息总线,实现的过程中自己也在不断得思考.总结以及修正.需要考虑各个维度:效率.性能.网络.吞吐量.甚至需要自己去设想API可能的使用场景.模式.不过能有一件事情,自己愿意去做,在走路.吃饭.坐公交的时候都在思考如何去改进它,然后在实践的过程中,促使去思考并挖掘自己知识面的空白,也是一件让人开心的事情. 借此记录下自己在实现的过程中,以及平时的一些想法. 这是第一篇,先谈谈消息总线跟消息队列的区别,以及对于企业级应用需要将消息队列封装成消息总线的必要性.

再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案.那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析. 核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占.对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法.比如有人喜欢简单.安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且

消息总线授权设计

我曾在之前的一篇文章中对比过消息队列跟消息总线.它们其中的一个不同点就是:消息总线更关注通信安全,消息总线可以管控通信双方,对通信的管控是建立在授权的基础上.因此授权模型的设计是消息总线必须考虑的问题.所谓的授权,就是校验通信双方有没有建立可信任的通信关系.这篇文章我们来谈谈消息总线的权限设计. 消息总线使用场景及RabbitMQ通信简介 在介绍授权设计之前,我们先了解一些必要信息.通常我们将消息总线应用于以下这些场景: 缓冲类--自生产自消费 解耦类.异步类--生产者消费者模型 服务调用类(R

分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

一.分布式消息总线以及基于Socket的实现 在前面的分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载一文之中给大家分享和介绍了一个极其简单也非常容易上的基于.NET Socket Tcp 技术实现的分布消息总线,也是一个简单的发布订阅框架: 并且以案例的形式为大家演示了如何使用这个分布式消息总线架构发布订阅架构模式的应用程序,在得到各位同仁的反馈的同时,大家也非常想了解订阅者离线的情况,即支持离线构发布订阅框架. 二.离线架构 不同于订阅者.发布者都同时在