.net 业务消息队列

开源QQ群: .net 开源基础服务  238543768

开源地址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ

## 业务消息队列 ##
业务消息队列是应用于业务的解耦和分离,应具备分布式,高可靠性,高性能,高实时性,高稳定性,高扩展性等特性。

## 优点: ##
- 大量的业务消息堆积能力
- 无单点故障及故障监控,异常提醒
- 生产者端负载均衡,故障转移,故障自动恢复,并行消息插入。
- 消费者端负载均衡,故障保持,故障自动恢复,并行消息消费。
- 消息高可靠性持久化,较高性能,较高实时性,高稳定性,高扩展性。
- 支持99*99个消息分区,单个消息分区单天支持近1亿的消息存储。
- 消费者拉方式获取消息,在高并发,大量消息涌入的情况下,只要消费能力足够,不会有消息延迟,消息越多性能越好。

## 缺点: ##
- 能保证消息顺序插入,保证相同分区的消息是顺序的(排除网络延迟),但是多个分区之间的可能是乱序的。
- 消息并行消费或者多个分区并行消费或者负载均衡情况下的,消息消费顺序是乱序。

## 缺点原因: ##
- 消息的负载均衡是基于消息的分区存储,故多个分区之间的消息是乱序的,但是相同分区的消息是顺序的。
- 消息的消费者负载均衡也是基于消息的分区进行均衡的,同时单个消费者订阅多个分区的情况下,也可并行进行消费。意味着不同分区的消息的消费是乱序的,但是相同分区的消息消费是顺序的。

## 缺点解决方案: ##
- 生产者自定义负载均衡算法,按照业务维度(用户,商户)等进行分区(多个用户之间可以消息乱序,单个用户的消息必须是顺序的),不同维度可以指向不同的分区,但是单个维度的消息是可以保证顺序的。
- 本解决方案在故障的情况下,故障会移除某些故障节点,意味着故障节点会立即报错(当然也可自己指定故障节点进行转移,但是转移的节点消息会被提前消费,故障的消息会在恢复故障后重新消费,这样也会出现故障程度上的消息乱序消费)。
- 本解决方案在线上无缝扩容和扩展性能方面也会有限制,看要具体的负载均衡算法,但是一般情况下,如果要扩容还是会进行部分消息迁移的情况。

## 问答: ##
### *1.大量的业务消息堆积能力,如何实现?* ###
  每个分区表支持约1亿的消息存储,可以通过增加分区表进行扩容。消费者进行消息消费,内部仅保留某个分区上一次消费的指针,所以不会影响消费者。
  消息持久化到磁盘,不会在内存驻留,理论上不影响内存。

### *2.无单点故障及故障监控,异常提醒?* ###
  故障一般会发生在redis,数据节点,管理中心,日志中心。
  redis节点故障会影响消费者的消息消费响应及时度,一般延迟5s以内。不会影响消息消费速度和消息消费QPS
  数据节点故障会影响生产者和消费者的消息,并造成消息暂时丢失(但是都是可恢复的,具体的看数据库的高可用做到什么程度)。
  生产者端会无缝的进行节点移除,但是会默认1分钟重新尝试重连。消费者会持续报错至日志中,但是不会影响其他分区消费。
  管理中心故障会影响生产者和消费者的心跳检测和新注册的生产者,消费者,但不会影响生产者和消费者具体的消息存储和发送接收。
  日志中心故障不会影响生产者和消费者,但是影响日志的打印,日志中心故障会通知公司内部监控平台。
  虽然故障不会影响线上已有的消息运行,但是还是会在高并发情况下出现性能问题,和系统稳定性,所以一旦发现要重视和及时处理。

### *3.生产者端负载均衡,故障转移,故障自动恢复,并行消息插入?* ###
  默认负载均衡采用多个分区顺序轮询插入,在并发情况下轮询插入是并行插入到不同分区的;某个数据节点出现故障,会移除相关数据节点的所有分区;
  默认1分钟会重新载入故障分区进行重试。

### *4.消费者端负载均衡,故障保持,故障自动恢复,并行消息消费。* ###
  默认消费者端负载均衡是根据消费者订阅的分区进行的(一个消费者可以订阅多个分区,多个相同业务的消费者可以订阅多个不同分区进行负载)。
  一个消费者订阅多个分区,这个消费者可以开启并行进行多分区消费。并行度=分区数,效果理论上最佳。
  分区节点出现故障等,单个分区或者数据节点就会暂停消费,并通知日志中心打印错误日志。当故障恢复后,消费继续进行。

### *5.消息高可靠性持久化,较高性能,较高实时性,高稳定性,高稳定性。* ###
  消息传递到消息中心后,立即持久化到磁盘,故不会丢失消息。生产者可以采用多个分区进行并行插入,消费者可以采用并行进行消息消费,故理论上性能是可扩展无限量的。
  消息是通过拉取的方式获取的,发送消息会由redis进行即时通知消费者拉取(即时消息默认会合并在500ms内redis通知消息),一般在20ms内消息会被消费掉。
  批量拉消息的方式相对push的消息推送方式在高并发和大量消息处理的情况下,消息发送性能应该是更优的。
  稳定性是基于数据库的稳定性和故障转移层面来确保的,扩展性体现在线上无缝的迁移和扩容。

### *6.支持9999个消息分区,单个消息分区单天支持近1亿的消息存储。* ###
  数据节点是01~99个,节点里面的表分区是01~99个,所以可以支持近1万个分区节点。单表的mqid最大应该是(1亿-1)条,应该满足一般的业务需求,
  若不能满足,可以通过多个分区的方式扩容。

### *7.消费者拉方式获取消息,在高并发,大量消息涌入的情况下,只要消费能力足够,不会有消息延迟,消息越多性能越好。* ###
  push推消息的模式能保证更高的实时性,但是在大量消息的情况下,消息堆积的情况更严重,性能会有所影响。
  pull拉消息的模式在保证消息实时性方面会略差,但是在大量消息涌入的情况下,批量拉消息效率更加。而且会将消息分发的负载转移到多个消费者端上。

## 未来改进: ##
1.  未来采用leveldb重写存储。
1.  剥离broker服务用于支持相对可靠的消息服务。
1.  消息完成标记本地缓存/持久化(或者存储redis),每秒提交更新至数据库,消除频繁消费导致的瓶颈。

## 架构示意图 ##

## 使用demo示例 ##

 /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="msg"></param>
        public void SendMessageDemo(string msg)
        {
            //发送字符串示例
            var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() {
                    ManageConnectString = "server=192.168.17.201;Initial
                    Catalog=dyd_bs_MQ_manage;User ID=sa;[email protected]#;" },//管理中心数据库
                    "dyd.mytest3");//队列路径 .分隔,类似类的namespace,是队列的唯一标识,要提前告知运维在消息中心注册,方可使用。
            p.SendMessage(@"1");
            //发送对象示例
            /* var obj = new message2 { text = "文字", num = 1 };
              var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() {
                        ManageConnectString = "server=192.168.17.237;Initial
                        Catalog=dyd_bs_MQ_manage;User ID=sa;[email protected]#;" },//管理中心数据库
                        "test.diayadian.obj");//队列路径 .分隔,类似类的namespace,是队列的唯一标识,要提前告知运维在消息中心注册,方可使用。
            p.SendMessage<message>(obj);
            */
        }

        private ConsumerProvider Consumer;
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="action"></param>
        public void ReceiveMessageDemo(Action<string> action)
        {
            if (Consumer == null)
            {
                Consumer = new ConsumerProvider();
                Consumer.Client = "dyd.mytest3.customer1";//clientid,接收消息的(消费者)唯一标示,一旦注册以后,不能更改,业务下线废弃后必须要告知运维,删除消费者注册。
                Consumer.ClientName = "客户端名称";//这个相对随意些,主要是用来自己识别的,要简短
                Consumer.Config = new BusinessMQConfig() { ManageConnectString =
                        "server=192.168.17.201;Initial Catalog=dyd_bs_MQ_manage;User
                        ID=sa;[email protected]#;" };
                Consumer.MaxReceiveMQThread = 1;//并行处理的线程数,一般为1足够,若消息处理慢,又想并行消费,则考虑 正在使用的分区=并行处理线程数 为并行效率极端最优,但cpu消耗应该不小。
                Consumer.MQPath = "dyd.mytest3";//接收的队列要正确
                Consumer.PartitionIndexs = new List<int>() { 1, 2, 3,4, 5, 6, 7, 8 };//消费者订阅的分区顺序号,从1开始
                Consumer.RegisterReceiveMQListener<string>((r) =>
                {
                    /*
                       * 这些编写业务代码
                       * 编写的时候要注意考虑,业务处理失败的情况。
                       * 1.重试失败n次。
                       * 2.重试还不行,则标记消息已被处理。然后跳过该消息处理,自己另外文档记录这种情况。
                       * 消息被消费完毕,一定要调用MarkFinished,标记消息被消费完毕。
                       */
                    action.Invoke(r.ObjMsg);
                    r.MarkFinished();
                });
            }

        }
        /// <summary>
        /// 关闭消息订阅连接
        /// </summary>
        public void CloseReceiveMessage()
        {
            //注册消费者消息,消费者务必要在程序关闭后关掉(dispose)。否则导致异常终止,要人工等待连接超时后,方可重新注册。
            if (Consumer != null)
            {
                Consumer.Dispose();
                Consumer = null;
            }
        }
    }

部分截图

备注:.net开源的消息队列很少,特别是针对业务的高可靠性的消息队列;希望这个开源的消息队列,能够为.net领域带来更多解决方案,更多的思路和架构设计;同时也希望了解消息队列的人能够给于这个解决方案更多的建议和完善意见。

作者:车江毅

时间: 2024-12-04 18:09:53

.net 业务消息队列的相关文章

消息总线VS消息队列

前段时间实现了一个基于RabbitMQ的消息总线,实现的过程中自己也在不断得思考.总结以及修正.需要考虑各个维度:效率.性能.网络.吞吐量.甚至需要自己去设想API可能的使用场景.模式.不过能有一件事情,自己愿意去做,在走路.吃饭.坐公交的时候都在思考如何去改进它,然后在实践的过程中,促使去思考并挖掘自己知识面的空白,也是一件让人开心的事情. 借此记录下自己在实现的过程中,以及平时的一些想法. 这是第一篇,先谈谈消息总线跟消息队列的区别,以及对于企业级应用需要将消息队列封装成消息总线的必要性.

互联网业务场景下消息队列架构

消息队列作为一种基础的抽象数据结构,被广泛应用在各类编程与系统设计中. 同步VS异步 通信的一个基本问题是:发出去的消息什么时候需要被接收到?这个问题引出了两个基础概念:"同步通信"和"异步通信".根据理论抽象模型,同步通信和异步通信最本质的差别来自于时钟机制的有无.同步通信的双方需要一个校准的时钟,异步通信的双方不需要时钟.现实的情况是,没有完全校准的时钟,所以没有绝对的同步通信.同样,绝对异步通信意味着无法控制一个发出去的消息被接收到的时间点,无期限的等待一个消

使用事件和消息队列实现分布式事务(转+补充)

虽然本文并非笔者原创,但是我们在非强依赖的事务中原理上也是采用这种方式处理的,不过因为没有仔细去总结,最近在整理和总结时看到了,故转载并做部分根据我们实际情况的完善和补充. 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行多次等待. 有一

C#分布式消息队列 EQueue 2.0 发布啦

前言 最近花了我几个月的业余时间,对EQueue做了一个重大的改造,消息持久化采用本地写文件的方式.到现在为止,总算完成了,所以第一时间写文章分享给大家这段时间我所积累的一些成果. EQueue开源地址:https://github.com/tangxuehua/equeue EQueue相关文档:http://www.cnblogs.com/netfocus/category/598000.html EQueue Nuget地址:http://www.nuget.org/packages/eq

消息队列

1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜快,厨师做菜慢. 再举个例子,到

对分布式事务、消息队列的重新认识

本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功.若两张表在同一个数据库实例中,则使用本地事务就好了.否则可以采用分布式事务,或者消息队列. 前阵子从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了. 上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品

mysql之消息队列

消息队列:在消息的传输过程中保存消息的容器. 消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它. 如图所示: 在不使用消息队列的情况下,用户的请求数据直接写入数据库,再高并发的情况下,会对数据库造成巨的压力,同时也使得响应延迟加剧.在使用消息队列后,用户请求的数据发送给消息队列后立即返回,再由消息队列的消费者进程(通常情况下,该进程独立部署在专门的服务器集群上)从消息队列中获取

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异

消息队列的理解总结

消息队列可实现系统的分布式处理:将业务流程解耦,使得业务处理更专一:可异步处理,不必等待耗时操作完成等. 关于消息队列的介绍,可参考http://kb.cnblogs.com/page/510354/ 1.消息队列的逻辑处理流程 类似设计模式中的“观察者模式(Observer)”或“发布-订阅模式(Pub-Sub)”. “生产者”生成和发送消息到“消息队列”,“消费者”从“消息队列”中取走消息进行处理,使用消息将“生产者”和“消费者”之间的操作关联解耦,易于扩展. 消息队列的应用场景可参考htt