核心梳理——消息处理的骨架流程——ESFramework 4.0 进阶(02)

ESFramework 4.0 概述一文中,我们提到ESFramework.dll作为通信框架的核心,定义了消息处理的骨架流程,本文我们来详细剖析这个流程以及该骨架中所涉及的各个组件。ESFramework的骨架流程如下图所示: 

一.所有的网络引擎都使用同一消息处理骨架流程

ESFramework支持TCP/UDP、二进制协议/文本协议、服务端/客户端组合而成的2x2x2=8种引擎,无论是哪一种引擎,都实现了INetEngine接口,也都使用上图所示的消息处理骨架流程来处理所接收到的所有消息。

所以,只要掌握了这一消息处理的骨架流程,就掌握了ESFramework的核心机密。也只有掌握了该骨架流程,我们才能轻松自如地驾驭ESFramework。

在该骨架流程中,涉及了多个消息组件,像消息分派器IMessageDispatcher、消息管道IMessagePipe、消息转换器IMessageTransformer、消息监控器IMessageSpy、消息内层分派器NakeDispatcher、消息处理器工厂IMessageProcesserFactory、以及消息处理器IMessageProcesser。

注意,MessagePipe可以看做是MessageTransformer以及GatewayMessageSpy和InnerMessageSpy的封装。NakeDispatcher内部则包含了MessageProcesserFactory,并且利用MessageProcesserFactory来创建MessageProcesser。

网络引擎从网络接收到一个消息后会交给MessageDispatcher进行分派,MessageDispatcher在分派消息的时候,首先会让接收到的消息经过MessagePipe进行必要的监控和转换,然后调用NakeDispatcher对消息进行最终分派处理并获取其返回应答消息,然后再让应答消息经过MessagePipe进行必要的监控和转换,最终返回给网络引擎,网络引擎会将最后得到的应答消息通过网络发送出去。

消息处理的骨架流程图中使用箭头表示出了消息在框架中的流动路径与方向,可以看到,消息进入的路径与返回的路径是对称的。而消息由进入转向为返回的转折点是在消息处理器MessageProcesser,即消息处理器处理接收到的消息并返回应答消息。如果没有应答消息,则消息的路径到达MessageProcesser节点被处理后即终止了。

二.消息分派器MessageDispatcher

消息分派器IMessageDispatcher的基础接口定义如下:

public interface IMessageDispatcher
    {
        IAgileLogger EsfLogger{set ;}
        IMessagePipe MessagePipe { set; }        
        INakeDispatcher NakeDispatcher{set ;}

IMessage DispatchMessage(IMessage reqMsg) ;
    }

首先,消息分派器需要使用MessagePipe组件和NakeDispatcher组件,它实际上是要保证一个流程,即在实现DispatchMessage方法的时候,让进入的消息必须先经过消息管道MessagePipe,然后才提交给NakeDispatcher去处理,如果有应答消息,则还要保证应答消息也必须经过MessagePipe之后,才能返回给网络引擎。IMessageDispatcher接口的实现--MessageDispatcher类,保证了这一点。

其次,DispatchMessage方法的参数即是接收到的进入的消息,返回值即是应答消息,如果没有应答,则返回值为null。

再次,框架要求DispatchMessage方法绝不能抛出异常,否则,这个异常将冲击到网络引擎组件,这可能导致对应连接或Session上的后续消息将不能被接收。

ESFramework.Core.MessageDispatcher类确保了DispatchMessage方法不会抛出异常,因为其在实现DispatchMessage方法的内部截获了所有的异常,并将其通过IAgileLogger 记录到日志。这就是为什么IMessageDispatcher需要注入EsfLogger属性的原因。(关于ESFramework中采用的日志模式的更多信息,可以参见ESFramework 4.0 快速上手 -- 异常日志 )

一般来说,DispatchMessage内部抛出异常的来源通常是两个:

(1)消息处理器抛出异常。由于真正处理消息的是我们应用程序提供的消息处理器,所以如果在业务处理的过程中没有捕获抛出的异常,那么这个异常最终会被框架的消息分派器捕获。

(2)消息管道在监控或变换消息时抛出的异常。比如,加密解密失败等。

三.消息管道MessagePipe

MessagePipe由MessageTransformer以及GatewayMessageSpy和InnerMessageSpy构成,其主要作用是监控和变换消息。

IMessagePipe接口的定义如下:

public interface IMessagePipe
    {
        IMessageTransformerMessageTransformer{ set;}

/// <summary>
        /// 工作于网关层,网络组件收到的消息需要经过的第一个组件就是GatewayMessageSpy,
        /// 发送的消息在到达网络组件前经过的最后一个组件也是GatewayMessageSpy
        /// </summary>
        IMessageSpy GatewayMessageSpy { set; }

/// <summary>
        /// 接收的消息到达处理器之前经过的最后一个组件就是InnerMessageSpy,
        /// 处理器返回的结果消息经过的第一个组件也是InnerMessageSpy
        /// </summary>
        IMessageSpy InnerMessageSpy { set; }

/// <summary>
        /// 消息在发送之前经过管道处理。
        /// </summary>        
        IMessage PipeOutMessage(IMessage msg);

/// <summary>
        /// 接收到的消息在被NakeDispatcher分派之前经过管道处理。
        /// </summary>
        IMessage PipeInMessage(IMessage msg);

/// <summary>
        /// 当消息在Pipe中传递被Pipe丢弃时,触发此事件。
        /// </summary>
        event CbGeneric<IMessage> MessageForbidden;
    }

当消息由MessageDispatcher分派给MessagePipe时,首先经过的组件是GatewayMessageSpy,其次经过MessageTransformer,最后经过InnerMessageSpy到达NakeDispatcher组件;而NakeDispatcher返回的应答消息所经过的路径刚好与此相反。ESFramework.Core.MessagePipe类的PipeInMessage方法和PipeOutMessage方法的实现保证了这一点。

1.消息监控器MessageSpy

GatewayMessageSpy和InnerMessageSpy都是MessageSpy类型的组件,其都继承自IMessageSpy接口:

public interface IMessageSpy
    {
        /// <summary>
        /// 监控所有收到的消息,如请求消息,返回false表明丢弃消息。
        /// </summary>       
        bool SpyMessageReceived(IMessage msg);

/// <summary>
        /// 监控所有即将发送的消息,如回复消息,返回false表明丢弃消息。
        /// </summary>       
        bool SpyMessageToBeSent(IMessage msg);
    }

对于进入的消息,将调用SpyMessageReceived方法对其监控;而对于返回的消息,将调用SpyMessageToBeSent方法对其监控。

  之所以称为“监控”,是因为MessageSpy不会修改消息,它不会改变消息的任何内容;而负责修改或变换消息的是MessageTransformer组件。这一点正是MessageSpy和MessageTransformer的本质区别所在,不同类型的消息组件,职责不一样。

至于MessageSpy要做什么样的监控,是由我们具体的应用决定的,比如,我们的应用需要记录接收到的最后10条“原始”的消息,那么就可以实现一个MessageSpy,挂接在MessagePipe的GatewayMessageSpy属性上即可。

除了监控之外, MessageSpy还有个特权 -- 它能决定放行哪些消息、丢弃哪些消息,这由SpyMessageReceived方法和SpyMessageToBeSent方法返回的bool值体现出来。如果返回值为false,则表示丢弃该消息。消息被丢弃后,到此终结,不会再被传递到下一个消息组件。

一般在什么情况下,MessageSpy会丢弃消息了?比如说,被监控的消息格式不正确,消息可能是黑客模拟的"恶意"的消息;又比如说,在使用UDP通信时,接收到的消息是不完整的,等等。这些消息在MessageSpy这个环节就被过滤掉,不会污染到后续的消息组件,特别是不会给消息处理器带来额外的麻烦。

如果我们的应用不需要任何消息监控,则可以使用一个“占位符”消息监控器挂接到MessagePipe,ESFramework提供了null object模式实现的ESFramework.Core.EmptyMessageSpy作为这个“占位符”供应用直接使用。

2.消息转换器 MessageTransformer

  刚刚提到,MessageTransformer对消息可以进行变换,经过MessageTransformer组件的消息,其内容会被改变。这就为我们对消息的加密、解密、压缩、解压等等提供了挂接点。比如,网络上传递的消息都是加密的,网络引擎接收到这些加密的消息传递给分派器,分派器又传递给了MessagePipe,所以,GatewayMessageSpy作为消息进入MessagePipe的第一个组件,其监控到的就是加密的消息;而接下来当消息经过MessageTransformer被解密后,被传递给InnerMessageSpy的消息是已经解密的、是明文的,这种消息在ESFramework中称为赤裸消息“NakeMessage”。所以,InnerMessageSpy监控到的消息是明文的NakeMessage。当接收到的消息被传递到内层消息分派器NakeDispatcher被分派时,已经是NakeMessage,所以内层的消息分派器被称为NakeDispatcher。

对于NakeMessageDispatcher返回的应答消息,也是明文消息,当其沿出去的方向经过MessageTransformer时,MessageTransformer会对其进行加密,所以,最终网络引擎发送出去的应答消息也是加密的。

从上面的描述,我们已经看出,InnerMessageSpy和GatewayMessageSpy所处的位置不同而决定了它们监控到的消息的状态不一样。通常,GatewayMessageSpy看到的消息都是经过加密的、压缩的,是密文;而InnerMessageSpy看到的消息都是已经被解密的、被解压缩的,是明文。所以,你的应用到底是需要挂接一个GatewayMessageSpy还是一个InnerMessageSpy,取决于你需要监控到什么状态的消息。

  消息过滤器必须实现IMessageTransformer接口:

public interface IMessageTransformer

{
        /// <summary>
        /// 截获即将发出去的消息,可以对截获到的消息进行转换,比如加密、压缩等。
        /// </summary>
        /// <param name="msg">即将发送的消息</param>
        /// <returns>经过截获转换得到的结果</returns>
        IMessage CaptureBeforeSendMessage(IMessage msg);

/// <summary>
        /// 截获收到的消息,可以对截获到的消息进行转换,比如解密、解压缩等。
        /// </summary>      
        /// <param name="msg">从网络接收到的消息</param>
        /// <returns>经过截获转换得到的结果</returns>
        IMessage CaptureReceivedMessage(IMessage msg) ;        
    }

  对于进入的消息,框架将调用MessageTransformer的CaptureReceivedMessage方法对消息进行转换;对于要出去的应答消息,将调用MessageTransformer的CaptureBeforeSendMessage方法对消息进行变换。

从IMessagePipe接口的定义我们看到,其只能挂接一个MessageTransformer实例,如果我们要挂接多个MessageTransformer,比如,一个MessageTransformer用于加密/解密,一个MessageTransformer用于压缩/解压缩,那该怎么办了?ESFramework提供了一个容器类型MessageTransformer-- ESFramework.Core.ContainerStyleTransformer,它是一个实现了IMessageTransformer接口的容器,内部有一个列表可以容纳多个MessageTransformer实例。由于,ContainerStyleTransformer实现了IMessageTransformer接口,所以,可以将其挂接到MessagePipe组件,而我们再把需要用到的多个MessageTransformer实例放到这个容器里就解决了MessagePipe需要挂接多个MessageTransformer的问题。

同MessageSpy的情况一样,如果我们的应用不需要加密/解密消息等任何变换,则可以使用一个“占位符”消息过滤器挂接到MessagePipe,ESFramework提供了null object模式实现的ESFramework.Core.EmptyMessageTransformer作为这个“占位符”供应用直接使用。

四.内层消息分派器NakeDispatcher

  刚才已经提到,当进入的消息被传递到NakeDispatcher时,已经是明文的了,这个时候,该消息就可以直接被消息处理器处理了。内层消息分派器的接口INakeDispatcher定义如下:

public interface INakeDispatcher
    {
        IProcesserFactory ProcesserFactory { set; }     
  
        /// <summary>
        /// 在最内部分配消息给最终的消息处理器去处理,并返回处理的结果。
        /// </summary>     
        IMessage DispatchMessage(IMessage msg) ;
    }

NakeDispatcher需要通过DispatchMessage方法最终分派并处理消息,且返回应答消息(如果有的话)。消息需要被消息处理器处理,那么从哪里获取正确的消息处理器了?是消息处理器工厂。所以NakeDispatcher需要依赖于ProcesserFactory。ESFramework已经提供了ESFramework.Core.NakeDispatcher类实现了INakeDispatcher接口,供我们使用,我们不必再实现此接口。

1.消息处理器工厂 ProcesserFactory

  处理器工厂用于创建或返回正确的消息处理器,IProcesserFactory的接口定义如下:

public interface IProcesserFactory
    {
        /// <summary>
        /// 根据消息的类型创建对应的处理器 。
        /// </summary>
        /// <param name="messageType">消息的类型</param>       
        IMessageProcesser CreateProcesser(int messageType);
    }

CreateProcesser方法创建或返回什么样的处理器取决于消息的类型。即NakeDispatcher会从消息的头部IMessagHeader(详情请参见ESFramework 4.0 进阶(01) -- 消息)中取出MessageType属性的值,然后调用IProcesserFactory的CreateProcesser方法获取正确类型的处理器。如果工厂中没有对应的处理器存在,CreateProcesser方法将返回null,这种情况一般只会在调试阶段出现,通常是因为为对应的消息还没有定义相应的消息处理器。如果在正式运行阶段出现CreateProcesser返回null,那将是非常严重的,要么是服务端和客户端所引用的程序集的版本不一致,要么是黑客在向服务器发送试探消息。当CreateProcesser返回null时,NakeDispatcher会将详细的信息记录到日志文件,我们可以从日志中查看到消息的具体内容。

ESFramework已经提供了ESFramework.Core.ProcesserFactory类实现了IProcesserFactory接口,其内部可以挂接多个消息处理器实例,大部分情况下,我们直接使用这个实现就行了,不需要再实现自己的处理器工厂。并且,ESFramework.Core.ProcesserFactory类的实现采用的缓存,当根据消息类型寻找处理器时,不需要每次都遍历所有的处理器。

  当NakeDispatcher从IProcesserFactory获取到正确的处理器之后,即将消息提交给处理器进行处理。

    

2.消息处理器MessageProcesser

  消息处理器接口IMessageProcesser定义如下:

public interface IMessageProcesser
    {      
        /// <summary>
        /// 该消息处理器是否能够处理类型为messageType的消息。
        /// </summary>        
        bool CanProcess(int messageType);

/// <summary>
        /// 处理消息并返回回复消息,如果返回null,表示没有回复消息。
        /// </summary>       
        IMessage ProcessMessage(IMessage message);        
    }

一个消息处理器能够处理哪些类型的消息,它自己是最清楚的,所以IMessageProcesser提供了CanProcess方法来让我们可以查询它是否能处理目标类型的消息。

同需要定义哪些类型的消息一样,继承了IMessageProcesser接口的消息处理器的实现是由具体的应用来做的,当它们定义好之后,我们便可以将它们挂接到处理器工厂来处理消息了,框架会根据消息的类型来自动调用它们(IOC模式)。

附带说一下,之所以采用ESPlus可以加快ESFramework的开发,就是因为ESPlus已经为我们预定义好了一批消息类型、消息协议以及消息处理器,我们只要将它们挂接到ESFramework的消息骨架上,就可以运转起来。而如果我们直接基于ESFramework.dll开发,则这些事情需要我们自己手动去打造,虽然麻烦一些,但是足够灵活。ESPlus虽然让我们省了不少事,但是也限制了ESFramework的某些方面,正所谓“有得必有失”。

五.实例化一个最简单消息处理流程

假设我们的某个项目不需要任何类型的消息监控,也不需要任何类型的消息变换,仅仅需要挂接两个消息处理器即可。那么,我们可以通过类似下面的代码来实例化这个处理流程:

//构造MessagePipe
    IMessagePipe messagePipe = new MessagePipe();
    messagePipe.InnerMessageSpy = new EmptyMessageSpy();
    messagePipe.GatewayMessageSpy = new EmptyMessageSpy();
    messagePipe.MessageTransformer= new EmptyMessageTransformer();

//构造NakeDispatcher
    IMessageProcesser messageProcesser1 = ......;
    IMessageProcesser messageProcesser2 = ......;
    IMessageProcesser[] messageProcessers = new IMessageProcesser[] { messageProcesser1, messageProcesser2 };
    IProcesserFactory processerFactory = new ProcesserFactory(messageProcessers);            
    INakeDispatcher nakeDispatcher = new NakeDispatcher(processerFactory);

//构造MessageDispatcher
    IMessageDispatcher messageDispatcher = new MessageDispatcher(nakeDispatcher, messagePipe);

上面的组装代码看视简单,却具有非常强的扩展性,如果项目需要,我们可以构造出非常复杂的流程。甚至,由于采用了接口分离原则,我们还可以实现自己的消息分派器或处理器工厂,比如,我们可以实现一个插件处理器工厂,专门从插件中动态地加载所需要的消息处理器(ESPlus提供的ESPlus.Core.Addins.ServerAddinProcesserFactory正是做这件事情的)。

如果有通信引擎实例,那么将构造好的messageDispatcher对象挂接到通信引擎,就可以使整个流程运转起来了。

  关于ESFramework消息处理的骨架流程,就介绍这么多,下一篇我们将深入到ESFramework提供的各种网络引擎内部去看看。that‘s all,thanks.

时间: 2024-09-29 02:22:08

核心梳理——消息处理的骨架流程——ESFramework 4.0 进阶(02)的相关文章

驱动力—— 通信引擎(上)—— ESFramework 4.0 进阶(03)

在ESFramework 4.0 进阶(02)-- 核心:消息处理的骨架流程一文中我们详细介绍了ESFramework中消息处理的骨架流程,并且我们已经知道,ESFramework中的所有通信引擎使用的都是这一套骨架流程.ESFramework内置了多种通信引擎以完全支持"客户端/服务端.TCP/UDP.文本协议/二进制协议"这些特性的组合.本文就来剖析ESFramework中的各种通信引擎. 一.通信引擎接口继承关系图 INetEngine是所有网络引擎的基础接口,接下来再派生出服务

ESFramework 4.0 进阶(01)-- 消息

需要交互的分布式系统之间通过消息来传递有意义的信息.消息是通信框架的核心.离开了消息,再谈通信框架就没有任何意义,所以,消息是ESFramework中一个最核心的概念. 一. 消息的类别 在具体的应用中,我们需要对消息的类别进行定义,这有助于我们分析和讨论问题.消息大致可以分为4个类别:请求消息.回复消息.报告.通知.P2P消息. 在Client/Server模式中,出现最多的便是请求消息和回复消息.这两种类别的消息非常容易理解. 报告指的是Client/Server模式中客户端发送给服务端的消

ESFramework 4.0 快速上手(01) -- Rapid引擎

(在阅读该文之前,请先阅读 ESFramework 4.0 概述 ,会对本文的理解更有帮助.) ESFramework/ESPlatform 4.0 的终极目标是为百万级的用户同时在线提供支持,因为强大,所以使用也较为复杂,配置也较多.但是如果我们的应用只是一个中小型的通信应用(同时在线5000人以下),直接使用ESPlatform就有点显得杀鸡用牛刀了.ESPlus.Rapid提供了一种快速的方式,来解决类似中小型的通信应用,以最简洁的方式来使用ESFramework. 使用ESPlus.Ra

ESFramework 4.0 快速上手(06) -- Rapid引擎(续)

<ESFramework 4.0 快速上手>系列介绍的都是如何使用Rapid引擎(快速引擎) -- RapidServerEngine 和 RapidPassiveEngine.其实,大家可以将这两个引擎看作是两个壳,内部包装的才是真正的ESFramework的网络引擎, ESFramework支持很多种网络引擎(客户端/服务端.二进制协议/文本协议.TCP/UDP),而RapidServerEngine和RapidPassiveEngine采用的是基于TCP和二进制协议的服务端引擎和客户端引

全流程开发 TP6.0实战高并发电商服务系统

第1章 课程简介[PHP行情分析]本章主要讲解本课程的主线, 导学内容,PHP行情分析等让同学们对当前PHP发展充满信心等,同时还分析了企业级开发流程以及规范说明,让同学们对中大型公司的敏捷开发有一个初步认知. 第2章 环境及框架准备[必备基础]本章主要讲解环境的安装,通过composer获取TP6源码,nginx的配置等工作,环境是我们一切学习的根源,造起来. 第3章 TP6基础知识[新框架]本章主要讲解了TP5/TP6异同之处,基础的控制器层.模型层的使用,杜绝无效请求让代码更加健壮,数据库

如何使用自定义消息?--ESFramework 4.0 快速上手(04)

在ESFramework 4.0 快速上手一文中,我们讲述了如何使用Rapid引擎可以快速地上手ESFramework开发,文中介绍了使用ESPlus.Application.CustomizeInfo命名空间下的类可以发送和处理自定义消息,本文我们就通过一个简单的例子来深入讲解如何使用自定义消息. 例子的场景很简单:假设客户端登陆到服务器之后,要求请求加入某个组,服务端收到该请求后,处理该请求,并给客户端相应的回复 -- 是否加入成功,客户端收到回复后,即可作出相应的处理. 一.定义消息类型和

离线消息如何实现?-- ESFramework 4.0 快速上手(02)

在ESFramework 4.0 快速上手一文中,主要介绍了如何使用ESPlus.Rapid命名空间中的引擎来快速地构建基于TCP的网络通信系统,即使是使用ESPlus.Rapid来进行ESFramework快速开发,也还有很多可以介绍的内容,于是,我想再多写几篇文章来说明现实通信系统中的一些常见需求如何使用ESFramework快速实现.本文是为第一篇,介绍离线消息的原理和实现. 一.如何截获离线消息 阅读了ESFramework 4.0 快速上手朋友都知道,一个在线用户给另一个用户发送文本信

cocos2d-x游戏引擎核心(3.x)----启动渲染流程

(1) 首先,这里以win32平台下为例子.win32下游戏的启动都是从win32目录下main文件开始的,即是游戏的入口函数,如下: #include "main.h" #include "AppDelegate.h" #include "cocos2d.h" USING_NS_CC; int APIENTRY _tWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPTSTR lpCm

重登陆模式 --ESFramework 4.0 快速上手(07)

在ESFramework框架中基于TCP的服务端引擎(当然也包括Rapid引擎)都采用了这样一条规则:默认情况下,客户端与服务器成功建立TCP连接以后,服务端会从客户端发过来的第一条消息中取出消息头的UserID属性的值,并将其与对应的TCP连接绑定起来.这样,服务端就知道每一个TCP连接所对应的用户UserID,而当我们要求服务端向某个客户端发送消息时,服务端就知道通过哪个TCP连接进行发送了.TCP连接与UserID是一一对应的,一个TCP连接只能对应一个UserID,同样的,一个UserI