企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

  消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式。

LocalInvoker

  本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说。

HttpInvoker

  即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务端的JsonRpcProcessor身上进行处理,.net framework是通过增加IHttpAsyncHandler处理类处理调用请求,需要在web.config中增加处理handlers,

<add name="jsonrpc" type="CK.Sprite.JsonRpcFramework.JsonRpcHandler, CK.Sprite.JsonRpcFramework" verb="*" path="/json.rpc"/>

,实现类代码如下:

public class JsonRpcHandler : IHttpAsyncHandler
    {
        public IAsyncResult BeginProcessRequest(HttpContext context, AsyncCallback cb, object extraData)
        {
            context.Response.ContentType = "application/json";

            var async = new JsonRpcStateAsync(cb, context);
            async.JsonRpc = GetJsonRpcString(context.Request);
            JsonRpcProcessor.Process(async,context.Request);
            return async;
        }

        private static string GetJsonRpcString(System.Web.HttpRequest request)
        {
            string json = string.Empty;
            if (request.RequestType == "GET")
            {
                json = request.Params["jsonrpc"] ?? string.Empty;
            }
            else if (request.RequestType == "POST")
            {
                if (request.ContentType == "application/x-www-form-urlencoded")
                {
                    json = request.Params["jsonrpc"] ?? string.Empty;
                }
                else
                {
                    json = new StreamReader(request.InputStream).ReadToEnd();
                }
            }
            return json;
        }
    }

  .net core这边的做法是增加RpcHttpRouter类,实现IRouter接口,并在StartUp的Configure方法中,增加处理Router,最终处理还是落到JsonRpcProcessor身上进行处理,代码如下:

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseAbp(); //Initializes ABP framework. Should be called first.
            // ......
            app.Map("/json.rpc", rpcApp =>
            {
                rpcApp.UseJsonRpc();
            });
        }
public static IApplicationBuilder UseJsonRpc(this IApplicationBuilder app)
        {
            if (app == null)
            {
                throw new ArgumentNullException(nameof(app));
            }
            var router = new RpcHttpRouter();
            return app.UseRouter(router);
        }
public class RpcHttpRouter : IRouter
    {
        public VirtualPathData GetVirtualPath(VirtualPathContext context)
        {
            return null;
        }

        /// <summary>
        /// Takes a route/http contexts and attempts to parse, invoke, respond to an Rpc request
        /// </summary>
        /// <param name="context">Route context</param>
        /// <returns>Task for async routing</returns>
        public async Task RouteAsync(RouteContext context)
        {
            try
            {
                string jsonString = await GetJsonRpcString(context);

                string responseJson = await JsonRpcProcessor.Process(jsonString);

                if (string.IsNullOrEmpty(responseJson))
                {
                    //No response required, but status code must be 204
                    context.HttpContext.Response.StatusCode = 204;
                    context.MarkAsHandled();
                    return;
                }

                context.HttpContext.Response.ContentType = "application/json";
                await context.HttpContext.Response.WriteAsync(responseJson);

                context.MarkAsHandled();

            }
            catch (Exception ex)
            {
                context.MarkAsHandled();
            }
        }

        private async Task<string> GetJsonRpcString(RouteContext context)
        {
            string jsonString;
            if (context.HttpContext.Request.Body == null)
            {
                jsonString = null;
            }
            else
            {
                using (StreamReader streamReader = new StreamReader(context.HttpContext.Request.Body, Encoding.UTF8,
                    detectEncodingFromByteOrderMarks: true,
                    bufferSize: 1024,
                    leaveOpen: true))
                {
                    try
                    {
                        jsonString = await streamReader.ReadToEndAsync();
                    }
                    catch (TaskCanceledException ex)
                    {
                        throw new CPlatformException("Cancelled while reading the request.", ex);
                    }
                    jsonString = jsonString.Trim();
                }

            }

            return jsonString;
        }
    }

    public static class RouteContextExtensions
    {
        public static void MarkAsHandled(this RouteContext context)
        {
            context.Handler = c => Task.FromResult(0);
        }
    }

TcpInvoker

  Tcp方式的调用后续文章会继续进行分解,也是微服务核心价值的地方。

原文地址:https://www.cnblogs.com/spritekuang/p/10805629.html

时间: 2024-10-09 22:02:40

企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道的相关文章

企业级工作流解决方案(三)--微服务消息处理模型之服务端处理

1. Json-Rpc 2.0 参考地址:https://www.jsonrpc.org/specification JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议,它允许运行在基于socket,http等诸多不同消息传输环境的同一进程中.其使用JSON(RFC 4627)作为数据格式,它为简单而生. 请求对象 发送一个请求对象至服务端代表一个rpc调用, 一个请求对象包含下列成员: jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0” method包含所

企业级工作流解决方案(六)--微服务消息处理模型之与Abp集成

身份认证传递 对于Abp比较熟悉的朋友应该对他里面的用户身份认证比较熟悉,他是通过实现微软提供的权限认证方式实现的,用户登录身份信息存储在System.Security.Claims.ClaimsPrincipal里面,但是用户的身份信息如何在不同的服务之间传递呢,不可能每一个服务都必须实现这套身份认证吧?比如我们请求调用过程如下: Portal站点获取用户信息没有问题,但如何传递到调用的其他微服务呢? 这在之前文章中提到的传输Header就起到了作用,有两种方式可以处理,第一种我们可以直接把a

企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务? 这个就涉及到服务发现.服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,

企业级工作流解决方案(一)--总体介绍

引言:国内对于流程引擎的介绍非常的少,但是不能否认流程引擎的重要性,流程引擎在各个行业都有应用,OA管理的请假流程.出差流程,项目管理上的合同审批流程.验收流程.启动流程,Erp中的采购流程.入库出库流程,政府里面的招标流程.结算流程等,都有流程引擎的身影,当然这里说的流程指审批意义的流程,还有一些不用人为参与的生产作业生产过程也是可以用工作流来解决的. 工作了多年,是时候把工作中的一些东西沉淀下来,准备写一系列文章,系统的介绍企业级工作流管理平台的搭建以及设计思路,希望能对其他人有所帮助.这一

微服务操作模型

这里并不是介绍微服务概念,如需要了解微服务,可以阅读Fowler-Microservices文章.本博客假定我们已开始使用微服务解耦单体应用,用来提升可部署性和可扩展性. 当我们在系统范围内部署大量的微服务时,一个新的挑战产生了,单体应用部署时不会发生.这篇文章将针对这些新的挑战,在系统范围内部署大量微服务时定义一套操作模型(operations model). 这篇文章分为如下几个部分: 前提条件: 扩展: 问题: 需要的组件: 参考模型: 下一步: 1. 前提条件 当在系统范围内需要部署大量

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

服务端启动 服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器. JsonRpcServerModule代码如下,见备注说明 [DependsOn(typeof(AbpKernelModule))] public class JsonRpcServerModule : AbpModule { public override vo

企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理

客户端启动 客户端启动主要做三件事情,1. 从配置文件读取服务调用配置,存储到全局对象中.2. 指定客户端编解码器工厂.3. 预连接,即预先建立与服务端的通信Chanel. [DependsOn(typeof(AbpKernelModule))] public class JsonRpcClientModule : AbpModule { public override void PreInitialize() { // 注册客户端配置,固定从Xml文件读取 SocketClientConfig

企业级工作流解决方案(二)--微服务总体介绍

微服务好处和概念性的东西就不介绍了,对于微服务,个人认为并不是越复杂就越好,相反要结合自己公司的现状,适当的做一些裁剪,比如对于规模和业务量不是特别大的企业,就没有必要把服务总线,服务健康检查,服务路由选择,熔断等等加进来,相反,这一部分职责可以通过配置文件,nginx代理,api网关等等外围的技术来控制,当企业达到一定的规模之后,再来完善这部分内容,但是对于微服务处理过程来说,没有任何影响. 我这里根据json-rpc 2.0标准,结合网上一些开源的微服务架构思想,采用dotnetty技术,搭

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空. /// <summary>