消息总线优化之PubSub

近段时间,暂缓了消息总线feature的开发,花了部分时间对原先的pubsub机制进行了针对性的优化与重构。这里记录一下优化的过程以及相比原来的设计有哪些改观。

PubSub在消息总线内部的作用

PubSub在消息总线内部主要用于对所有在线客户端进行实时管控的作用。每个客户端在使用消息总线时,都“被迫”注册到PubSub上,并“被迫”订阅了一些Channel,以便消息总线管控台实时下发一些管控指令及时生效。

之前的设计回顾

这里有必要回顾一下之前的设计。消息总线内部的Pub/Sub的机制是通过第三方技术组件的实现(目前支持Zookeeper跟Redis),关于Pub/Sub这里首先普及几个概念,首先组件根据自身业务定义Channel,某个组件如果需要关注某Channel的变更就注册对某Channel的关注(subscribe),当有组件因为业务需要向Channel发送变更(publish),凡是subscribe该Channel的所有组件都会获取到变更。这里因为Zookeeper跟Redis都支持数据存储,所以这里的publish的内容其实既可以被Push给subscribe该Channel的所有组件,也可以使得其他组件根据Channel pull下来。

其实之前的做法的关注点在“自动化”以及“扩展性”。为了所谓的扩展性,我们利用Java注解扫描的方式来使得整个Channel的定义“自动化”,这样就无需硬编码了。并且当后续业务扩展,新增一个Channel的时候,之前Channel的定义无需作任何改变。另外为了客户端首次获取(目前的推送机制zookeeper以及redis都支持KV数据存储)以及后续更新推送数据的对客户端的一致性,我们让一个Channel对应数据库的一张表,同时每个Channel都对应自己的数据自动获取方式。

当然Pub/Sub从服务端角度来看是数据的上行(从数据库提取数据,push到subscribe的客户端),从客户端角度来看是数据的下行。因此这里我们定义了一个IDataExchange接口,用来与Pub/Sub组件进行数据交换:

然后定义了一个@Exchanger注解,它包含两个属性:

  • table:表示对应的表;
  • path:也即channel,对应频道名称;

然后涉及到变更的表都会实现为一个独立的XXXExchanger。

为了让每个Channel的数据源是以一致的接口对外提供,这里统一定义了一个获取数据源的接口:IDataFetcher:

public interface IDataFetcher {

    public byte[] fetchData(IDataConverter converter);

}

该接口接收一个数据序列化器,然后将获取到的数据进行序列化并以byte[]作为统一的返回值,因为需要将数据存储到Pub/Sub组件里去(它们大都支持字节数组的API接口)。

整体的设计如下所示:

这样的设计对最初的关注点(自动化、扩展性、客户端首次获取数据以及后续获取变更数据导致代码处理上的一致性)而言,确实够了。但就性能而言,却非常低效。因为是一张表对应一个Channel,所以其实是全表推送,既然是全表推送,那么就无法鉴别客户端,无法鉴别客户端,就可能代码无效推送(跟某个客户端无关的关系数据,也会被推送过来),从而产生频繁推送,无效解析等一系列恶性循环。另外全表数据,相对来说是原始数据,还需要各个客户端做相应的解析,计算出合适的视图,用于内部控制以及权限校验等,并且所有的客户端在这一步执行的逻辑几乎是一样的。需要解析生成的视图如下:

    private Map<String, Node>   proconNodeMap;
    private Map<String, Node>   reqrespNodeMap;
    private Map<String, Node>   rpcReqRespNodeMap;
    private Map<String, Node>   pubsubNodeMap;
    private Map<String, Node>   idNodeMap;
    private Map<String, Node>   secretNodeMap;
    private Map<String, Config> clientConfigMap;
    private ExchangerManager    exchangeManager;
    private Map<String, Sink>   tokenSinkMap;
    private Map<String, String> pubsubChannelMap;
    private Node                notificationExchangeNode;

优化之后的设计

对于Pub/Sub重新设计之后采用——推拉结合的模式。不再推送数据,只推送变更通知以及变更的KEY(secret)。然后客户端按需拉取。

优化后的设计,带来如下一些优点:

减少客户端内存占用

之前Pub/Sub的设计是“首次拉取,变更全推”的做法。而且拉取的是全表数据,这对于客户端内存的占用是个极大的损耗。而优化之后,将只存储跟当前secret相关的数据视图。

服务端准备“数据视图”,减少客户端计算时间

优化之后针对客户端使用的数据专门定制了数据结构,在服务端按照键值对的形式计算出某个secret对应的客户端需要使用的视图数据并缓存在pub/sub组件的内存中。这个数据视图的数据结构如下:

这样,客户端在验证通信权限的时候,将会非常快。

减少远程访问通信开销

通信次数

减少通信次数的主要手段是本地缓存(local cache),客户端获取数据的方式是:如果本地有,则从本地取,如果本地没有,则从远端获取获取完之后缓存在本地内存里。部分代码如下所示:

    public synchronized NodeView getNodeView(String secret) {
        if (Strings.isNullOrEmpty(secret)) {
            throw new NullPointerException("the secret can not be null or empty");
        }

        if (this.secretNodeViewMap.containsKey(secret)) {   //local cache
            return this.secretNodeViewMap.get(secret);
        } else {                                            //remote data then local cache
            NodeView nodeViewObj = this.pubsuberManager.get(secret, NodeView.class);
            this.secretNodeViewMap.put(secret, nodeViewObj);
            return nodeViewObj;
        }
    }

当然通信次数的减少,还得益于特地为客户端定制的“数据视图”,并且是按照每个队列的secret拆分成key/value的。管控台导致的数据变更将过渡为变更通知事件,然后再按序更新本地缓存。而不会像原来那样,推送数据变更,从而导致太多无效网络交互以及数据计算。

通信数据量

减少通信数据量的主要手段是只获取有效数据,比如当调用消息总线API的时候,每个API都要求传入一个secret来指示当前对应的队列节点,因此我们只需要从远程获取客户端需要的跟当前secret相关的“数据视图”。当然这里我们作了一个假设:大部分场景下,一个客户端在某个JVM进程内通常只使用一个secret。因为API被设计为某个使用者只需要知道自己队列对应的secret即可使用,因此这样的假设是合理的。当然也不排除某个应用涉及到多个队列的操作,这种情况最多多获取几个secret的数据视图。但基本的原则是:不取多余数据,按需取用。并且,推送也从原来的数据变成了现在的变更通知,该通知虽然是广播式的,但却是“自认领”的机制:

    public void onChannelDataChanged(String channel, Object obj) {
        logger.debug("=-=-=-=-=-=- received change from channel : " + channel + " =-=-=-=-=-=-");
        if (channel.equals(Constants.PUBSUB_NODEVIEW_CHANNEL)) {
            String secret = obj.toString();
            this.updateNodeView(secret);
        } else if (channel.equals(Constants.PUBSUB_SERVER_STATE_CHANNEL)) {
            String serverState = obj.toString();
            this.setServerState(serverState);
        } else if (channel.equals(Constants.PUBSUB_CONFIG_CHANNEL)) {
            this.updateConfig(obj.toString());
        } else if (channel.equals(Constants.PUBSUB_NOTIFICATION_EXCHANGE_CHANNEL)) {
            this.updateNotificationNode();
        }
    }

拉取更新:

    public synchronized void updateNodeView(String secret) {
        if (this.secretNodeViewMap.containsKey(secret)) {
            this.secretNodeViewMap.remove(secret);
            this.getNodeView(secret);
        }
    }

可以看到,只有在推送的secret在本地有缓存时,才会去远端拉取更新。否则,将直接丢弃该变更通知。

取舍

当然,这种完全定制化的机制,也彻底废弃了之前关注的自动化以及扩展性的特性。这是必要的,因为我们队消息总线的定位还是希望它具有更好的性能。

时间: 2025-01-02 21:05:35

消息总线优化之PubSub的相关文章

消息总线重构之简化客户端

这段时间对消息总线进行了再次重构.本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法. 简化client的复杂度 之前的client需要同时连接两个分布式组件.消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber.而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ.而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsub

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum

再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案.那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析. 核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占.对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法.比如有人喜欢简单.安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且

手写消息总线LiveDataBus,让你永无后顾之忧

做了很久的面试专题,不知道对各位需要面试和有跳槽想法的小伙伴有没有帮助,今天收集一篇关于LiveDataBus方面的文章,面试方面的收集,后续我还会持续更新如果觉得有用可以点个关注 原文链接:https://www.jianshu.com/p/e13ef9068ee0 Android四大组件和线程间通信方式有很多,比如Handler管道.广播.接口回调.rxBus.EventBus等,但是这些方式都存在一些瑕疵,(比如EvebtBus可能现在用的人比较少了,个人见解可以能算半个过时性的~个人见解

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

分布式消息总线

消息总线是一种通信工具,可以在机器之间互相传输消息.文件等. 消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向.发送段只需要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了一定的持久化存储和灾备的机制. 分布式消息总线比较 开源消息总线ActiveMQ

消息总线VS消息队列

前段时间实现了一个基于RabbitMQ的消息总线,实现的过程中自己也在不断得思考.总结以及修正.需要考虑各个维度:效率.性能.网络.吞吐量.甚至需要自己去设想API可能的使用场景.模式.不过能有一件事情,自己愿意去做,在走路.吃饭.坐公交的时候都在思考如何去改进它,然后在实践的过程中,促使去思考并挖掘自己知识面的空白,也是一件让人开心的事情. 借此记录下自己在实现的过程中,以及平时的一些想法. 这是第一篇,先谈谈消息总线跟消息队列的区别,以及对于企业级应用需要将消息队列封装成消息总线的必要性.

消息总线的应用场景

应用场景 分布式事务 分布式系统组件相互通信 数据复制 日志同步 delay queue 广播通知 介绍 消息总线是一种通信工具,可以在机器之间互相传输消息.文件等. 消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向.发送段只需要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了一定的持久化存储和灾备的机制. [多机房同步方案] 通过消息广播方式将数据多点分布 数据提交给一个代理,这个代理帮我们把这些数据同步到多个机房,那我们应用不需要关心这

消息总线扩展之集成Thrift-RPC

本文主要探讨了消息总线支持Thrift RPC的实现过程.鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API.然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施. Thrift简介 Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义