企业级工作流解决方案(三)--微服务消息处理模型之服务端处理

1. Json-Rpc 2.0

  参考地址:https://www.jsonrpc.org/specification

  JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议,它允许运行在基于socket,http等诸多不同消息传输环境的同一进程中。其使用JSON(RFC 4627)作为数据格式,它为简单而生。

  请求对象

 发送一个请求对象至服务端代表一个rpc调用, 一个请求对象包含下列成员:

    jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0”

    method包含所要调用方法名称的字符串,以rpc开头的方法名,用英文句号(U+002E or ASCII 46)连接的为预留给rpc内部的方法名及扩展名,且不能在其他地方使用。

    params调用方法所需要的结构化参数值,该成员参数可以被省略。

    id已建立客户端的唯一标识id,值必须包含一个字符串、数值或NULL空值。如果不包含该成员则被认定为是一个通知。该值一般不为NULL[1],若为数值则不应该包含小数[2]

  响应对象

  当发起一个rpc调用时,除通知之外,服务端都必须回复响应。响应表示为一个JSON对象,使用以下成员:

    jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0”

    result该成员在成功时必须包含。当调用方法引起错误时必须不包含该成员。服务端中的被调用方法决定了该成员的值。

    error该成员在失败是必须包含。当没有引起错误的时必须不包含该成员。该成员参数值必须为5.1中定义的对象。

    id该成员必须包含。该成员值必须于请求对象中的id成员值一致。若在检查请求对象id时错误(例如参数错误或无效请求),则该值必须为空值。

  响应对象必须包含result或error成员,但两个成员必须不能同时包含。

  错误对象

  当一个rpc调用遇到错误时,返回的响应对象必须包含错误成员参数,并且为带有下列成员参数的对象:

    code使用数值表示该异常的错误类型。 必须为整数。

    message对该错误的简单描述字符串。 该描述应尽量限定在简短的一句话。

    data包含关于错误附加信息的基本类型或结构化类型。该成员可忽略。 该成员值由服务端定义(例如详细的错误信息,嵌套的错误等)。

  错误码及实例见官方文档。

2. 传输消息模型

  Json-Rpc请求消息或者答复消息在服务消费者与服务提供者之间传输需要一个载体,传输消息模型定义如下:

/// <summary>
    /// 传输消息模型。
    /// </summary>
    public class TransportMessage
    {

        public TransportMessage()
        {
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public TransportMessage(object content,object headers)
        {
            if (content == null)
                throw new ArgumentNullException(nameof(content));

            Content = content;
            Headers = headers;
            ContentType = content.GetType().FullName;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public TransportMessage(object content, object headers, string fullName)
        {
            if (content == null)
                throw new ArgumentNullException(nameof(content));

            Headers = headers;
            Content = content;
            ContentType = fullName;
        }

        /// <summary>
        /// 消息Id。
        /// </summary>
        public string Id { get; set; }

        /// <summary>
        /// 消息内容。
        /// </summary>
        public object Content { get; set; }

        /// <summary>
        /// 消息传输Header
        /// </summary>
        public object Headers { get; set; }

        /// <summary>
        /// 内容类型。
        /// </summary>
        public string ContentType { get; set; }

        /// <summary>
        /// 是否调用消息。
        /// </summary>
        /// <returns>如果是则返回true,否则返回false。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool IsInvokeMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
        }

        /// <summary>
        /// 是否是调用结果消息。
        /// </summary>
        /// <returns>如果是则返回true,否则返回false。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool IsInvokeResultMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
        }

        /// <summary>
        /// 获取内容。
        /// </summary>
        /// <typeparam name="T">内容类型。</typeparam>
        /// <returns>内容实例。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public T GetContent<T>()
        {
            return (T)Content;
        }

        /// <summary>
        /// 获取Header。
        /// </summary>
        /// <typeparam name="T">Header类型。</typeparam>
        /// <returns>Header实例。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public T GetHeaders<T>()
        {
            return (T)Headers;
        }

        /// <summary>
        /// 创建一个调用传输消息。
        /// </summary>
        /// <param name="invokeMessage">调用实例。</param>
        /// <returns>调用传输消息。</returns>
        public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection)
        {
            return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName)
            {
                Id = Guid.NewGuid().ToString("N")
            };
        }

        /// <summary>
        /// 创建一个调用结果传输消息。
        /// </summary>
        /// <param name="id">消息Id。</param>
        /// <param name="invokeResultMessage">调用结果实例。</param>
        /// <returns>调用结果传输消息。</returns>
        public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection)
        {
            return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName)
            {
                Id = id
            };
        }
    }

  这里的Content就是JsonRpc请求对象或者答复对象,重点说一下Id和Header,这里的Id定义为一个Guid,每一个请求消息给Id赋值Id = Guid.NewGuid().ToString("N"),每一次发起Rpc调用,消费者线程都会等待服务端消息答复,等待的标识就是这个消息Id,服务端收到Rpc调用请求的时候,新建答复消息,将请求消息Id赋值给答复消息Id,这样,服务消费者就可以拿到消息Id唤起对应的等待线程;这里的Header是一个数据字典,Json-Rpc标准没有定义Header,但是Header在很多情况下都是非常重要的,后续文章会介绍到在Abp用户身份认证中的作用。

3. 消息处理模型

  方法启动时,把每个方法对应的方法处理模型存储到全局对象中

public class SMD
    {
        /// <summary>
        /// key:方法名称,value:smdservice
        /// </summary>
        public Dictionary<string, SMDService> SMDServices { get; set; }
    }

  方法处理模型

public class SMDService
    {
        public Dictionary<string, Type> parameterDict { get; set; }

        public Type ApplicationType { get; private set; }
        public SMDResult returns { get; private set; }
        public SMDAdditionalParameters[] parameters { get; private set; }
        /// <summary>
        /// Stores default values for optional parameters.
        /// </summary>
        public ParameterDefaultValue[] defaultValues { get; private set; }
    }

  ApplicationType为方法对应的服务类型,Rpc调用处理时,根据这个值从容器里面创建处理实例,parameters,returns,defaultValues为方法对应的参数,Rpc调用时,会根据这些值验证请求的合法性,并根据这些值创建正真调用的动态委托方法,执行正真的服务调用。  

  再见微服务交互流程图

4. 启动服务注册过程

  服务启动注册的目的是要构造全局消息处理模型,最终需要解析为Dictionary<方法名称,服务处理模型>,这里的方法名称格式固定为“服务名称.类名称.方法名称”,服务启动依赖于服务配置文件,格式如下:

<jsonrpc>
  <serviceAssemblies>
    <add assembly="CK.Sprite.AuthCenterService" domain="AuthCenterService" methodMode="allPublic" />
    <add assembly="CK.Sprite.CommonService" domain="CommonService" methodMode="allPublic" />
    <add assembly="CK.Sprite.Workflow" domain="SpriteWorkflow" methodMode="allPublic" />
    <add assembly="CK.Plugin.OAService" domain="OAService" methodMode="allPublic" />
  </serviceAssemblies>
</jsonrpc>

  服务启动过程遍历此配置文件,对每一条记录,根据assembly获取运行时的System.Reflection. Assembly,遍历程序集里面的实现了IApplicationService接口的类,对于每一个类,通过反射获取每一个public方法,并解析方法名称和参数,最终构造出Dictionary<方法名称,服务处理模型>,核心代码如下:

private static void RegisterAssembly(XmlNode assemblyNode)
        {
            var assemblyName = assemblyNode.Attributes["assembly"].Value;
            var domain = assemblyNode.Attributes["domain"].Value;
            var methodMode = assemblyNode.Attributes["methodMode"].Value;

            //var assem = Assembly.Load(assemblyName);
            var assem = Dependency.IocManager.Instance.Resolve<IAssemblyFinder>().GetAllAssemblies().FirstOrDefault(r=> r.GetName().Name == assemblyName);
            if(assem == null)
            {
                return;
            }
            var typesWithHandlers = assem.GetTypes().Where(f => f.IsPublic && f.IsClass && typeof(IApplicationService).GetTypeInfo().IsAssignableFrom(f));
            foreach (Type applicationServiceType in typesWithHandlers)
            {
                try
                {
                    RegisterType(applicationServiceType, assem, methodMode, domain);
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Trace.WriteLine("Register type " + applicationServiceType.FullName + " error and skiped, " + ex.ToString());
                }
            }
        }

        private static void RegisterType(Type applicationServiceType, Assembly assem, string methodMode, string domain)
        {
            string sessionID = Handler.DefaultSessionId();

            var item = applicationServiceType;
            List<MethodInfo> methods;
            bool isMethodModeByAttribute = methodMode != null &&
                                           "attribute".Equals(methodMode, StringComparison.InvariantCultureIgnoreCase);

            if (isMethodModeByAttribute)
            {
                methods =
                    item.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)
                        .Where(m => m.GetCustomAttributes(typeof(JsonRpcMethodAttribute), false).Length > 0)
                        .ToList();
            }
            else
            {
                methods = item.GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.DeclaredOnly).ToList();
            }

            foreach (var meth in methods)
            {
                if (meth.Name.StartsWith("get_") || meth.Name.StartsWith("set_"))
                    continue;

                Dictionary<string, Type> paras = new Dictionary<string, Type>();
                Dictionary<string, object> defaultValues = new Dictionary<string, object>();
                // dictionary that holds default values for optional params.

                var paramzs = meth.GetParameters();

                List<Type> parameterTypeArray = new List<Type>();
                for (int i = 0; i < paramzs.Length; i++)
                {
                    // reflection attribute information for optional parameters
                    paras.Add(paramzs[i].Name, paramzs[i].ParameterType);

                    if (paramzs[i].IsOptional) // if the parameter is an optional, add the default value to our default values dictionary.
                        defaultValues.Add(paramzs[i].Name, paramzs[i].DefaultValue);
                }

                var resType = meth.ReturnType;
                paras.Add("returns", resType); // add the return type to the generic parameters list.

                if (isMethodModeByAttribute)
                {
                    var atdata = meth.GetCustomAttributes(typeof(JsonRpcMethodAttribute), false);
                    foreach (JsonRpcMethodAttribute handlerAttribute in atdata)
                    {
                        string methodName;
                        if (handlerAttribute.JsonMethodName == string.Empty)
                        {
                            methodName = applicationServiceType.Name + "." + meth.Name;
                            if (!string.IsNullOrWhiteSpace(domain))
                            {
                                methodName = domain + "." + methodName;
                            }
                        }
                        else
                        {
                            methodName = handlerAttribute.JsonMethodName == string.Empty ? meth.Name : handlerAttribute.JsonMethodName;
                        }
                        var handlerSession = Handler.GetSessionHandler(sessionID);
                        try
                        {
                            handlerSession.MetaData.AddService(methodName, new SMDService(applicationServiceType, paras, defaultValues));
                        }
                        catch (Exception ex)
                        {
                            System.Diagnostics.Trace.WriteLine("Register method " + methodName + " error and skiped, " + ex.ToString());
                        }
                    }
                }
                else
                {
                    {
                        var methodName = applicationServiceType.Name + "." + meth.Name;
                        if (!string.IsNullOrWhiteSpace(domain))
                        {
                            methodName = domain + "." + methodName;
                        }

                        var handlerSession = Handler.GetSessionHandler(sessionID);
                        try
                        {
                            handlerSession.MetaData.AddService(methodName, new SMDService(applicationServiceType, paras, defaultValues));
                        }
                        catch (Exception ex)
                        {
                            System.Diagnostics.Trace.WriteLine("Register method " + methodName + " error and skiped, " + ex.ToString());
                        }
                    }
                }
            }
        }

5. 服务处理过程

  首先根据请求参数获取消息处理模型,然后经过一系列的请求合法性验证,最终从容器中拿出Rpc调用正真的处理服务实例,动态构造调用委托,添加Abp的各种拦截(参照abp里面的actionfilter做法),动态调用方法委托,然后对结果进行rpcresponse封装。核心处理代码如下:

SMDService metadata = null;
            // 根据请求参数从全局消息处理模型中获取方法对应的消息处理模型
            var haveMetadata = this.MetaData.SMDServices.TryGetValue(Rpc.Method, out metadata);
            // 请求合法性,消息参数,权限验证等一系列验证...
// 从容器中获取服务类
                var applicationServiceInstance = Dependency.IocManager.Instance.Resolve(metadata.ApplicationType);

                // 构造动态执行调用委托
                var newDel = Delegate.CreateDelegate(System.Linq.Expressions.Expression.GetDelegateType(metadata.parameterDict.Values.ToArray()),
                            applicationServiceInstance /*Need to add support for other methods outside of this instance*/, mothod.First());

                // 添加abp uow拦截
                var _unitOfWorkDefaultOptions = Dependency.IocManager.Instance.Resolve<IUnitOfWorkDefaultOptions>();
                var _unitOfWorkManager = Dependency.IocManager.Instance.Resolve<IUnitOfWorkManager>();
                var unitOfWorkAttr = _unitOfWorkDefaultOptions.GetUnitOfWorkAttributeOrNull(mothod.First());

                object results;

                // 动态调用真正的服务方法
                if (!unitOfWorkAttr.IsDisabled)
                {
                    using (var uow = _unitOfWorkManager.Begin(unitOfWorkAttr.CreateOptions()))
                    {
                        results = newDel.DynamicInvoke(parameters);
                        uow.Complete();
                    }
                }
                else
                {
                    results = newDel.DynamicInvoke(parameters);
                }

                var last = parameters.LastOrDefault();
                JsonRpcException contextException;
                if (Task.CurrentId.HasValue && RpcExceptions.TryRemove(Task.CurrentId.Value, out contextException))
                {
                    JsonResponse response = new JsonResponse() { Error = ProcessException(Rpc, contextException), Id = Rpc.Id };
                    callback.Invoke(response);
                    CompletedProcess(Rpc, response, RpcContext);
                    return response;
                }
                if (expectsRefException && last != null && last is JsonRpcException)
                {
                    JsonResponse response = new JsonResponse() { Error = ProcessException(Rpc, last as JsonRpcException), Id = Rpc.Id };
                    callback.Invoke(response);
                    CompletedProcess(Rpc, response, RpcContext);
                    return response;
                }
                //return response, if callback is set (method is asynchronous) - result could be empty string and future result operations
                //will be processed in the callback
                var reponse = new JsonResponse() { Result = results };
                CompletedProcess(Rpc, reponse, RpcContext);
                return reponse;

  有问题可以QQ或者邮箱交流:邮箱:[email protected],QQ:523477776

原文地址:https://www.cnblogs.com/spritekuang/p/10805574.html

时间: 2024-10-12 14:10:35

企业级工作流解决方案(三)--微服务消息处理模型之服务端处理的相关文章

企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务? 这个就涉及到服务发现.服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,

企业级工作流解决方案(一)--总体介绍

引言:国内对于流程引擎的介绍非常的少,但是不能否认流程引擎的重要性,流程引擎在各个行业都有应用,OA管理的请假流程.出差流程,项目管理上的合同审批流程.验收流程.启动流程,Erp中的采购流程.入库出库流程,政府里面的招标流程.结算流程等,都有流程引擎的身影,当然这里说的流程指审批意义的流程,还有一些不用人为参与的生产作业生产过程也是可以用工作流来解决的. 工作了多年,是时候把工作中的一些东西沉淀下来,准备写一系列文章,系统的介绍企业级工作流管理平台的搭建以及设计思路,希望能对其他人有所帮助.这一

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

服务端启动 服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器. JsonRpcServerModule代码如下,见备注说明 [DependsOn(typeof(AbpKernelModule))] public class JsonRpcServerModule : AbpModule { public override vo

企业级工作流解决方案(二)--微服务总体介绍

微服务好处和概念性的东西就不介绍了,对于微服务,个人认为并不是越复杂就越好,相反要结合自己公司的现状,适当的做一些裁剪,比如对于规模和业务量不是特别大的企业,就没有必要把服务总线,服务健康检查,服务路由选择,熔断等等加进来,相反,这一部分职责可以通过配置文件,nginx代理,api网关等等外围的技术来控制,当企业达到一定的规模之后,再来完善这部分内容,但是对于微服务处理过程来说,没有任何影响. 我这里根据json-rpc 2.0标准,结合网上一些开源的微服务架构思想,采用dotnetty技术,搭

企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式. LocalInvoker 本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说. HttpInvoker 即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务

企业级工作流解决方案(六)--微服务消息处理模型之与Abp集成

身份认证传递 对于Abp比较熟悉的朋友应该对他里面的用户身份认证比较熟悉,他是通过实现微软提供的权限认证方式实现的,用户登录身份信息存储在System.Security.Claims.ClaimsPrincipal里面,但是用户的身份信息如何在不同的服务之间传递呢,不可能每一个服务都必须实现这套身份认证吧?比如我们请求调用过程如下: Portal站点获取用户信息没有问题,但如何传递到调用的其他微服务呢? 这在之前文章中提到的传输Header就起到了作用,有两种方式可以处理,第一种我们可以直接把a

企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理

客户端启动 客户端启动主要做三件事情,1. 从配置文件读取服务调用配置,存储到全局对象中.2. 指定客户端编解码器工厂.3. 预连接,即预先建立与服务端的通信Chanel. [DependsOn(typeof(AbpKernelModule))] public class JsonRpcClientModule : AbpModule { public override void PreInitialize() { // 注册客户端配置,固定从Xml文件读取 SocketClientConfig

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空. /// <summary>

Hadoop高级编程之为Hadoop实现构建企业级安全解决方案

本章内容提要 ●    理解企业级应用的安全顾虑 ●    理解Hadoop尚未为企业级应用提供的安全机制 ●    考察用于构建企业级安全解决方案的方法 第10章讨论了Hadoop安全性以及Hadoop中用于提供安全控制的机制.当构建企业级安全解决方案(它可能会围绕着与Hadoop数据集交互的许多应用程序和企业级服务)时,保证Hadoop自身的安全仅仅是安全解决方案的一个方面.各种组织努力对数据采用一致的安全机制,而数据是从采用了不同安全策略的异构数据源中提取的.当这些组织从多个源获取数据,接