记一次消息总线的打造

虽说消息队列的用法很简单:PUB/SUB, PRODUCER/CONSUMER,不过真做起来还真麻烦。

先说下原始需求:

  • Web前端发送命令消息,后端Consumer处理,然后前端得到结果
  • 需要支持Windows服务

很快,下图就出来了:

先来分析分析:

    • 前端怎么知道后端已经处理完成?
    • 前端如何在处理完后的第一时间被触发去执行某些callback呢?
    • Web前端很可能会通过ajax来定时查看某消息的处理状态

  第一反应是增加应答队列,此时:

    • 前端能够很及时的被通知到(后端处理完触发),来执行callback
    • 但是
      • ajax类型的定时查看怎么做?在ResponseQueue中查?显然不行(队列中数据越多,性能越差)
      • 如果前端关闭一段时间,消息会积压下来,性能越变越差

  因此决定增加一个DB来解决这些消息的保存以及后续的ajax类型的多次查询,如下图:

再来分析分析,此时

    • 前端ajax类型的不定时、多次的查询某消息处理状态是解决了
    • 如果消息量很大,也可以将RabbitMQ以及DB分别做集群以及切片
    • 但,似乎还是得增加应答队列进去,因为现在CONSUMER处理完成后,针对前端的通知很麻烦,理由如下
      • 基于DB行记录的通知效率低
    • 但,即便增加了这个应答队列,也会出现如下问题
      • 如果前端崩掉后有段时间未on service,此时应答队列就会积压消息...性能会变差

  此时该咋办?

  答:用PUB/SUB机制来做这个应答队列,此时如果前端崩掉,就不会SUB了,只要online时才会有消息被通知到

  因此,继续出一张图

    

  图中的Notifier, NotifierPublisher是前端和后端的BROKER,考虑到有些线程需要主动监听,因此画在了上面。

  再来谈谈后端,由于没有特别高要求,对后端的要求也就是这么几点:

    • 在业务逻辑角度,消息只能被无错处理一次
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
    • 对于报Exception的消息,人工处理要方便

  分别分析    

    • 在业务逻辑角度,消息只能被无错处理一次

      • 在业务处理没有报错的情况下,将RabbitMQ消息的Ack动作与DB的消息状态回写做成一个TRANSACTION,如下
      • 这样就能保证此消息“从RABBITMQ Server中remove、写入查询DB”同时确保
      • 但是,如果存在下述情况时,会出现即便业务逻辑没有报错情况下多次执行
        • 那就是:如果业务逻辑执行完毕,没有报错,此时,即将触发上述代码,却还没有触发的时刻,服务crash了...
        • 解决方法
          • 在业务逻辑代码中加入幂等性
          • 在业务逻辑代码中加入检查性质代码
          • 告诉我下其他简便的方法吧(记录本地文件日志能解决,就是比较复杂)
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
      • 增加相应的Exception队列,实际中是增加了2个:如:

        • 比如目前有队列messages.CommandA,则异常队列有:

          • messages.CommandA.exceptions1
          • messages.CommandA.exceptions2
          • 为啥是2个?看后续
    • 对于报Exception的消息,人工处理要方便
      • 在配置文件中增加一个参数,表示运行级别:普通、异常1、异常2
      • 如果是普通级别,则CONSUMER会从messages.CommandA中获取消息进行处理,报错后会将消息move到exception1中
      • 如果是异常1级别,则CONSUMER会从messages.CommandA.exception1中获取消息进行处理,报错后会将消息move到exception2中
      • 如果是异常2级别,则CONSUMER会从messages.CommandA.exception2中获取消息进行处理,报错后将消息move到exception1中
      • 这里还有个问题,就是上面的这些RABBITMQ级别的消息从exception1移动到exception2中,都是分成PUBLISH和BASICACK两个CHANNEL上的动作完成的,不能套RABBITMQ的TX,也就是存在一致性问题
        • 解决起来同HandleSuccessfulMessage类似,都是通过本地db事务来做,都是借助了BASE思想来实现的

剩下的一个问题,RABBITMQ有优先级队列特性吗?答案是有:

DONE.

时间: 2024-10-06 20:22:18

记一次消息总线的打造的相关文章

消息总线扩展之主动转发

问题简述 消息总线目前为Java编程语言提供了SDK,同时针对其他语言提供了一个称之为httpBridge的http代理.这基本可以满足大部分主流编程语言对消息总线的使用需求,但这也仅仅是对技术层面上的需求的满足.在业务层面上,尤其是面对老的业务系统的适配一直都是个难题,这篇文章谈谈面对一个在线上运行的业务系统,如何使得引入消息总线的总体成本尽可能得低. 就消息总线的两种使用方式而言,无论是SDK的方式还是httpBridge的方式,都需要往第三方系统引入对消息总线的依赖,这些依赖包括但不仅限于

消息总线重构之简化客户端

这段时间对消息总线进行了再次重构.本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法. 简化client的复杂度 之前的client需要同时连接两个分布式组件.消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber.而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ.而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsub

分布式消息总线

消息总线是一种通信工具,可以在机器之间互相传输消息.文件等. 消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向.发送段只需要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了一定的持久化存储和灾备的机制. 分布式消息总线比较 开源消息总线ActiveMQ

谈消息总线的路由模型

最近在写一个基于RabbitMQ的消息总线.虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型.当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制. 预备知识 RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish:Channel#basicConsum

消息总线VS消息队列

前段时间实现了一个基于RabbitMQ的消息总线,实现的过程中自己也在不断得思考.总结以及修正.需要考虑各个维度:效率.性能.网络.吞吐量.甚至需要自己去设想API可能的使用场景.模式.不过能有一件事情,自己愿意去做,在走路.吃饭.坐公交的时候都在思考如何去改进它,然后在实践的过程中,促使去思考并挖掘自己知识面的空白,也是一件让人开心的事情. 借此记录下自己在实现的过程中,以及平时的一些想法. 这是第一篇,先谈谈消息总线跟消息队列的区别,以及对于企业级应用需要将消息队列封装成消息总线的必要性.

消息总线的应用场景

应用场景 分布式事务 分布式系统组件相互通信 数据复制 日志同步 delay queue 广播通知 介绍 消息总线是一种通信工具,可以在机器之间互相传输消息.文件等. 消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向.发送段只需要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了一定的持久化存储和灾备的机制. [多机房同步方案] 通过消息广播方式将数据多点分布 数据提交给一个代理,这个代理帮我们把这些数据同步到多个机房,那我们应用不需要关心这

再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案.那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析. 核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占.对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法.比如有人喜欢简单.安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且

消息总线扩展之集成Thrift-RPC

本文主要探讨了消息总线支持Thrift RPC的实现过程.鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API.然后也尝试了为消息总线增加对Thrift-RPC的扩展支持,希望此举能让消息总线同时为SOA提供基础设施. Thrift简介 Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义

消息总线扩展之面向消息的数据集成

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向.这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向. 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了.其实,XML的用途远不止充当配置文件这一方面.它还被广泛应用于异构系统集成.数据集成.语义/协议转换等等方面,甚至成为构建平台非常重要的基石.虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余