分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

一、分布式消息总线以及基于Socket的实现

在前面的分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载一文之中给大家分享和介绍了一个极其简单也非常容易上的基于.NET Socket Tcp 技术实现的分布消息总线,也是一个简单的发布订阅框架:

并且以案例的形式为大家演示了如何使用这个分布式消息总线架构发布订阅架构模式的应用程序,在得到各位同仁的反馈的同时,大家也非常想了解订阅者离线的情况,即支持离线构发布订阅框架。

二、离线架构

不同于订阅者、发布者都同时在线的情况,支持订阅者离线,架构将有所变化,如下图所示:

也会比原先的结构将更加复杂,其中需要处理以下两个关键点:

1)订阅者的持久化存储。

2)订阅者离线之后其所订阅消息的持久存储。

三、解决方案

为解决消息总线的离线支持机制,我们在Socket 框架之中增加了一个接口ISubscribeStorager

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5:  
   6: namespace EAS.Messages
   7: {   
   8:     /// <summary>
   9:     /// 消息订阅存储接口。
  10:     /// </summary>
  11:     public interface ISubscribeStorager
  12:     {
  13:         /// <summary>
  14:         /// 持久化订阅。
  15:         /// </summary>
  16:         /// <param name="subscriber">订阅者。</param>
  17:         /// <param name="topic">消息主题。</param>
  18:         void Subscribe(string subscriber, string topic);
  19:  
  20:         /// <summary>
  21:         /// 持久化退订。
  22:         /// </summary>
  23:         /// <param name="subscriber">订阅者。</param>
  24:         /// <param name="topic">消息主题。</param>
  25:         void Unsubscribe(string subscriber, string topic);
  26:  
  27:         /// <summary>
  28:         /// 装载订阅信息。
  29:         /// </summary>
  30:         /// <returns>系统之中的订阅清单。</returns>
  31:         List<SubscribeItem> LoadSubscribes();
  32:  
  33:         /// <summary>
  34:         /// 写入消息。
  35:         /// </summary>
  36:         /// <param name="subscriber">订阅者。</param>
  37:         /// <param name="message">消息对象。</param>
  38:         void Write(string subscriber, QueueMessage message);
  39:  
  40:         /// <summary>
  41:         /// 读消息。
  42:         /// </summary>
  43:         /// <param name="subscriber">订阅者。</param>
  44:         /// <param name="message">消息对象。</param>
  45:         /// <returns>成功读取返回true,否则返回false。</returns>
  46:         bool Read(string subscriber, out QueueMessage message);
  47:     }
  48: }

ISubscribeStorager共提供持久化订阅持久化消息存储共五个函数,其中:

LoadSubscribes:服务端初始化时读取所有的离线订阅关系,即那个订阅都订阅那那个主题。

Subscribe:持久化订阅者,当订阅才上线订阅消息时,持久化订阅关系,供离线检测之用。

Unsubscribe:持久化取消订阅,当订阅者退订消息时,从持久化订阅关系之中删除。

Write:当订阅者离线时,把订阅消息写入持久化存储。

Read:当离线订阅者上线时,从持久存储之中读取一条消息向其发送。

ISubscribeStorager:可以选择自己实现这个接口,以建立满足自己规则的离线存储机制,当然在AgileEAS.NET SOA 中间件之中提供了两种离线存储机制,存储于数据库和存储于MSMQ,下面向大家介绍一下这两种内置实现。

四、两种内置离线存储机制

在AgileEAS.NET SOA 中间件平台之中提供了两个ISubscribeStorager的实现,基于数据库的离线订阅存储实现EAS.Messages.DbSubscribeStorager和基于MSMQ的离线订阅存储实现EAS.Messages.MsmqSubscribeStorager

EAS.Messages.DbSubscribeStorager:存储订阅关系在messageSubscribe.Config文件之中,消息存储在关系数据库SOA_SUBSCRIBEEVENTS表之中,使用前必须要建立相应的表结构,以下是SQL Server的DDL脚本:

   1: CREATE TABLE [SOA_SUBSCRIBEEVENTS](
   2:     [GUID] [varchar](36) NOT NULL,
   3:     [SUBSCRIBER] [nvarchar](128) NOT NULL,
   4:     [TOPIC] [nvarchar](128) NOT NULL,
   5:     [BODY] [image] NULL,
   6:     [FCTIME] [datetime] NOT NULL,
   7:  CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED 
   8: (
   9:     [GUID] ASC
  10: )
  11: ) 

目前理论上支持SQLServer 、Mysql、ORACLE、Sqlite四种数据库结构,具体建表脚本请自行参考相应资料书写,也可以使用AgileEAS.NET SOA中间件所提供的数据库初始化工具创建。

      EAS.Messages.MsmqSubscribeStorager:存储订阅关系在messageSubscribe.Config文件之中,消息存储Msmq消息队列之中,使用之前请确保机器上安装了MSMQ消息对列。

五、关于自定义实现ISubscribeStorager

有兴趣的朋友可以自定义实现接口ISubscribeStorager,这样就可以按自己的规则进行存储,比如把离线消息存储到mongodb、Redis、或者直接存储在文件之中,或者其他更多的实现规则,在此就不一一介绍,如有相关兴趣,请联系作者,如确有必要需要给在家介绍一下如何实现,将会另开一文本介绍如何自定义实现ISubscribeStorager接口。

六、改进在线例子支持离线

还是跟上次一样,以案例为在家展示一下怎么进行离线消息,就不重新开始例子,对原有例子做一些改进,改进后例子如下:

其中在原有项目的基础上增加了:Demo.Subscriber1和Demo.Subscriber2项目,其项目配置代码、配置文件基本上同Demo.Subscriber一样,其中唯一的差别在于,Demo.Subscriber1和Demo.Subscriber2向服务器提交订阅的时候都增加一个另friendName参数,其使用IMessageBus接口的以下订阅函数:

   1: /// <summary>
   2: /// 订阅消息。
   3: /// </summary>
   4: /// <param name="subscriber">订阅者。</param>
   5: /// <param name="friendName">订阅者名称,用于处理离线订阅。</param>
   6: /// <param name="topic">主题。</param>
   7: /// <param name="notifyHandler">订阅通知。</param>
   8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);

Demo.Publisher项目为发布者代码。

Demo.Subscriber项目为订阅者代码。

Demo.Server项目为服务端代码。

Demo.Subscriber1项目之中,其Program.cs代码如下:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Windows.Forms;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Subscriber1
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Subscriber1");
  16:  
  17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
  18:             System.Console.ReadLine();
  19:         }
  20:  
  21:         static void MessageNotify(object m)
  22:         {
  23:             Demo.Messages.Message message = m as Demo.Messages.Message;
  24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
  25:         }
  26:     }
  27: }

其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在订阅消息的时候给了一个friendName为Subscriber1,Demo.Subscriber2与Demo.Subscriber1项目的唯一的差别就是此处为Subscriber2.

我们使用内置的EAS.Messages.DbSubscribeStorager,则不需要修改服务端的代码,只需要修改服务端的配置文件如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--数据库连接-->
  12:       <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread">
  13:         <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" />
  14:       </object>
  15:       <!--数据访问器-->
  16:       <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread">
  17:         <property name="DbProvider" type="object" value="DbProvider"/>
  18:         <property name="Language" type="object" value="TSqlLanguage"/>
  19:       </object>
  20:       <!--ORM访问器-->
  21:       <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread">
  22:         <property name="DataAccessor" type="object" value="DataAccessor"/>
  23:       </object>
  24:       <!--查询语言-->
  25:       <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/>
  26:       <!--消息持久化-->
  27:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/>
  28:       <!--日志管理-->
  29:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  30:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  31:       </object>
  32:     </objects>
  33:   </eas>
  34: </configuration>

在配置文件的IOC配置之中我们配置了消息存储对象以及其所依赖的数据库访问对象、Linq查询语言表达式,另外需要说明的是,我们需要把配置文件之中所涉及的EAS.Data.dll、EAS.SOA.BootStrap.dll复制到编译输出Publish,这两个文件可以从AgileEAS.NET SOA 中间件平台发布包之中寻找,本案例的下载压碎包之中会包括这两个文件。

有关于基于Msmq的配置,只需要修改配置文件如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--消息持久化-->
  12:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/>
  13:       <!--日志管理-->
  14:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  15:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  16:       </object>
  17:     </objects>
  18:   </eas>
  19: </configuration>

到此为止,所有代码均已完成,是不是很简单,接下来,我们跑起来验证一下效果。

七、验证效果

我们在编译输入目录Publish下先启动Demo.Server.exe,再各启动Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再启动一个Demo.Publisher.exe,在Demo.Publisher.exe控制台按回车键:

目前程序三个订阅者都是在线的,Demo.Publisher发布了三条消息,三个订阅者都收到了三条消息,那么我们关闭Demo.Subscriber2之后再由Demo.Publisher发布两条消息:

然后我们再启动Demo.Subscriber2,看是否还能收到其离线之后由Demo.Publisher发布的两条消息:

OK,到此为止。

八、源代码下载

本程序的源代码已上传到服务器,请通过http://42.121.30.77/downloads/eas/Demo.Pub_Sub_Offline.rar进行下载,如果在开发过程之中想要了解更多有关Socket通信框架以及更多AgileEAS.NET SOA中间件平台的技术资源,请通过AgileEAS.NET SOA 网站:http://www.smarteas.net最新下载栏目进行下载。

九、问题反馈

麻烦大家在通过视频进行学习的时候能及时把问题反馈给楼主,或者有什么需要改进的一些建议都请向楼主直接反馈,以下是联系方式:

AgileEAS.NET SOA 网站:http://www.smarteas.net

官方博客:http://eastjade.cnblogs.com

楼主QQ:47920381,AgileEAS.NET

QQ群:113723486(AgileEAS SOA 平台)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平台交流)/上限1000人

邮件:[email protected],[email protected],

电话:18629261335。

分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

时间: 2024-10-24 05:31:01

分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载的相关文章

分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载

一.分布式消息总线 在很多MIS项目之中都有这样的需求,需要一个及时.高效的的通知机制,即比如当使用者A完成了任务X,就需要立即告知使用者B任务X已经完成,在通常的情况下,开发人中都是在使用者B所使用的程序之中写数据库轮循代码,这样就会产品一个很严重的两个问题,第一个问题是延迟,轮循机制要定时执行,必须会引起延迟,第二个问题是数据库压力过大,当进行高频度的轮循会生产大量的数据库查询,并且如果有大量的使用者进行轮循,那数据库的压力就更大了. 那么在这个时间,就需要一套能支持发布-订阅模式的分布式消

分布式消息总线

消息总线是一种通信工具,可以在机器之间互相传输消息.文件等. 消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向.发送段只需要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了一定的持久化存储和灾备的机制. 分布式消息总线比较 开源消息总线ActiveMQ

eShopOnContainers学习系列(三):RabbitMQ消息总线实践

今天研究了下eShopOnContainers里的RabbitMQ的使用,在项目里是以封装成消息总线的方式使用的,但是仍然是以其发布.订阅两个方法作为基础封装的,我们今天就来实际使用一下. 为了简单起见,就在同一个API项目里实现发布订阅. 新建API项目 RabbitMQ_Bus_Test ,类库 EventBus.EventBusRabbitMQ,这两个类库中将会实现消息总线最主要的方法.发布订阅. 在EventBus中新增消息事件类:IntegrationEvent,这个类在事件里是作为一

Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇

目前业界流行的分布式消息队列系统(或者可以叫做消息中间件)种类繁多,比如,基于Erlang的RabbitMQ.基于Java的ActiveMQ/Apache Kafka.基于C/C++的ZeroMQ等等,都能进行大批量的消息路由转发.它们的共同特点是,都有一个消息中转路由节点,按照消息队列里面的专业术语,这个角色应该是broker.整个消息系统通过这个broker节点,进行从消息生产者Producer到消费者Consumer的消息路由.当然了,生产者和消费者可以是多对多的关系.消息路由的时候,可以

消息总线扩展之面向消息的数据集成

最近一段时间,我在琢磨消息总线除了能进行受管控的消息通信之外,还有哪些可以扩展的方向.这篇文章我们来探讨一下面向消息的数据集成是否可以作为一种尝试方向. 相关技术简介 XML 谈到XML我们的第一映像就是用它来做各种配置,当然如果你是Javaer,那么可能你印象最深的就是Spring的bena配置了.其实,XML的用途远不止充当配置文件这一方面.它还被广泛应用于异构系统集成.数据集成.语义/协议转换等等方面,甚至成为构建平台非常重要的基石.虽然XML一直以来被人诟病其解析效率低下以及数据量太冗余

Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也带来了调用链路处理上的问题,因为大部分应用请求都会要求同步响应实时处理结果,而由于请求的处理过程已经通过消息异步解耦,所以整个调用链路就变成了异步链路,此时请求链路的发起者如何同步拿到响应结果,就需要进行额外的系统设计考虑. 为了更清晰地理解这个问题,小码哥以最近正在做的共享单车的IOT系统为例,给

JMS发布/订阅消息传送例子

阅读目录 前言 在Tomcat中配置JNDI 在Web工厂中编写代码 参考资料 前言 基于上篇文章"基于Tomcat + JNDI + ActiveMQ实现JMS的点对点消息传送"很容易就可以编写一个发布/订阅消息传送例子,相关环境准备与该篇文章基本类似,主要的区别如下. 在Tomcat中配置JNDI 配置连接工厂和话题 <Resource name="topic/connectionFactory" auth="Container" ty

发布/订阅消息传送模型

1.发布/订阅模型概览 发布/订阅(publish-and-subscribe)模型通常被简写为pub/sub模型.在这个模型中,消息生产者成为发布者(publisher),而消息消费者则称为订阅者(subscribe).在点对点模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题.发布/订阅模型最重要的特性如下: 消息通过一个称为主题的虚拟通道进行交换. 每条消息都会传送给称为订阅者的多个消息消费者.订阅者有许多类型,包括持久性.非持久性和动态性. 发布者通常不会知道.也

消息队列中点对点与发布订阅区别(转)

背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. 点对点与发布订阅最初是由JMS定义的.这两种模式主要区别或解决的问题就是发送到队列的消息能否重