企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

  微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务?

  这个就涉及到服务发现、服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,服务的寻址和健康检查就通过nginx来实现。等企业足够强大的时候,再来补全这部分内容,但对于微服务Rpc调用过程来说,没有任何影响。

客户端配置

  客户端配置文件,格式如下:

<!--MessageCode(Json = 1,MessagePack = 2,ProtoBuffer = 3)-->
<SocketClientConfiguration MessageCode="MessagePack">
  <ClientConnectServerInfos>
    <!--Url(Http连接服务方式为Url地址,Tcp连接方式为:IP:Port)、ConnectServerType(Tcp = 1,Http = 2,Local = 3)、ServerName(服务名称)-->
    <!--<ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Tcp" Url="127.0.0.1:1314"></ClientConnectServerInfo>-->
    <ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Http" Url="http://localhost:9527/json.rpc"></ClientConnectServerInfo>
    <ClientConnectServerInfo ServerName="FlowDefineService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
    <ClientConnectServerInfo ServerName="WorkflowRuntimeService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
  </ClientConnectServerInfos>
</SocketClientConfiguration>

  ServerName即服务名称,与之前文章介绍的服务名称必须一致,ConnectServerType即传输方式,如果传输方式为Local的,可以不在配置文件里面配置,系统自动在本地服务中查找服务调用,Url为服务地址,Tcp方式为”IP地址:端口”,客户端启动的时候,会读取配置文件,存储到全局配置里面,代码如下:

[DependsOn(typeof(AbpKernelModule))]
    public class JsonRpcClientModule : AbpModule
    {
        public override void PreInitialize()
        {
            // 注册客户端配置,固定从Xml文件读取
            SocketClientConfiguration socketClientConfiguration = XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");
            IocManager.IocContainer.Register(
                Component
                    .For<ISocketClientConfiguration>()
                    .Instance(socketClientConfiguration)
            );
            switch (socketClientConfiguration.MessageCode)
            {
                case EMessageCode.Json:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.MessagePack:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.ProtoBuffer:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
            }
        }

        public override void Initialize()
        {
            IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());
            var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(IocManager.Resolve<ITransportMessageCodecFactory>(), Logger);

            IocManager.IocContainer.Register(
                Component
                    .For<ITransportClientFactory>()
                    .Instance(dotNettyTransportClientFactory)
            );
        }

        public override void PostInitialize()
        {
            var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
            var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();
            try
            {
                foreach (var clientConnectServerInfo in socketClientConfiguration.ClientConnectServerInfos) // 预连接
                {
                    if (clientConnectServerInfo.ConnectServerType == EConnectServerType.Tcp)
                    {
                        var tcpAddress = clientConnectServerInfo.Url.Split(new char[] { ‘:‘ }, StringSplitOptions.RemoveEmptyEntries);
                        transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint());
                    }
                }
            }
            catch(Exception ex) // 预连,出错不处理
            {

            }
        }
    }

客户端调用

  客户端调用示例:var loginResult = Rpc.Call<AuthenticateResultModel>("AuthCenterService.AuthCenterServiceAppService.Authenticate", userId, password);

  代码实现如下:

/// <summary>
        /// 调用JSON-RPC服务(服务地址需要配置在App.config中的Setting下的JsonRpcServiceUrl的Key值)。
        /// </summary>
        /// <typeparam name="T">JSON-RPC方法返回的Result的对象类型。</typeparam>
        /// <param name="method">JSON-RPC方法名。</param>
        /// <param name="args">JSON-RPC方法接收的参数,此参数为可变数组</param>
        /// <returns></returns>
        public static T Call<T>(string method, params object[] args)
        {
            var rpcInvoker = GetRpcInvoker(method, null);
            var jresp = rpcInvoker.Invoke<T>(method, args);
            if (jresp.Error != null)
                throw jresp.Error;

            return jresp.Result;
        }

private static RpcInvoker GetRpcInvoker(string method, RpcOption option)
        {
            var socketClientConfiguration = Dependency.IocManager.Instance.Resolve<ISocketClientConfiguration>();
            var methodInfos = method.Split(new char[] { ‘.‘ });
            var callConfiguration = socketClientConfiguration.ClientConnectServerInfos.FirstOrDefault(r => r.ServerName == methodInfos[0]);

            RpcInvoker invoker = null;
            if(callConfiguration == null)
            {
                invoker = new LocalRpcInvoker();
            }
            else
            {
                switch (callConfiguration.ConnectServerType)
                {
                    case EConnectServerType.Tcp:
                        invoker = new TcpRpcInvoker();
                        break;
                    case EConnectServerType.Http:
                        invoker = new HttpRpcInvoker();
                        break;
                    case EConnectServerType.Local:
                        invoker = new LocalRpcInvoker();
                        break;
                    default:
                        break;
                }
                invoker.ServiceAddress = callConfiguration.Url;
            }

            invoker.Option = option;
            return invoker;
        }
    }

  三种传输方式的Invoker处理逻辑有所不同,LocalInvoker直接调用服务端处理程序处理,HttpInvoker则构造Http请求,TcpInvoker则发起Tcp调用过程传输消息,三种过程都需要处理请求Header。

internal class LocalRpcInvoker : RpcInvoker
    {
        internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
        {
            int myId;

            if (jsonRpc.Id == null)
            {
                lock (idLock)
                {
                    myId = ++id;
                }

                jsonRpc.Id = myId.ToString();
            }

            var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);
            var contextNameValueCollection = new NameValueCollection();
            Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
            MergeContextValues(contextNameValueCollection, this.Option);

            var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
            if (!string.IsNullOrEmpty(abpSession.AccessToken)) // 将用户Token转递到其他微服务
            {
                contextNameValueCollection.Add("AccessToken", "1");
                contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
                contextNameValueCollection.Add("UserName", abpSession.UserName);
                contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
                contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);

            }
            var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection);
            JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);

            if (rjson == null)
            {
                if (!string.IsNullOrEmpty(jsonRespStr))
                {
                    JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                    throw new Exception(jo["Error"].ToString());
                }
                else
                {
                    throw new Exception("Empty response");
                }
            }

            return rjson;
        }

        private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection)
        {
            return JsonRpcProcessor.Process(jsonReqStr, contextNameValueCollection).Result;
        }
    }

internal class HttpRpcInvoker : RpcInvoker
    {

        public static bool EnabledGzip = true;

        private static Stream CopyAndClose(Stream inputStream)
        {
            const int readSize = 4096;
            byte[] buffer = new byte[readSize];
            MemoryStream ms = new MemoryStream();

            int count = inputStream.Read(buffer, 0, readSize);
            while (count > 0)
            {
                ms.Write(buffer, 0, count);
                count = inputStream.Read(buffer, 0, readSize);
            }
            ms.Position = 0;
            inputStream.Close();
            return ms;
        }

        internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
        {
            HttpWebRequest req = null;

            int myId;

            if (jsonRpc.Id == null)
            {
                lock (idLock)
                {
                    myId = ++id;
                }

                jsonRpc.Id = myId.ToString();
            }

            req = WebRequest.Create(new Uri(ServiceAddress + "?callid=" + jsonRpc.Id.ToString() + "&method=" + jsonRpc.Method)) as HttpWebRequest;
            req.KeepAlive = false;
            req.Proxy = null;
            req.Method = "Post";
            req.ContentType = "application/json-rpc";
            if (Rpc.GlobalContextSet != null)
                Rpc.GlobalContextSet(req.Headers);

            var accessToken = Dependency.IocManager.Instance.Resolve<IAbpSession>().AccessToken;
            if(!string.IsNullOrEmpty(accessToken)) // 将用户Token转递到其他微服务
            {
                req.Headers.Add("Authorization", accessToken);

            }
            if(!string.IsNullOrEmpty(Rpc.GlobalAccessToken))
            {
                req.Headers.Add("Authorization", "Bearer " + Rpc.GlobalAccessToken);
            }
            MergeContextValues(req.Headers, this.Option);
            if (this.Option != null && this.Option.Timeout > 0)
            {
                req.Timeout = this.Option.Timeout;
            }
            else
            {
                req.Timeout = 400000;
            }
            req.ReadWriteTimeout = req.Timeout;
            if (EnabledGzip)
            {
                req.Headers["Accept-Encoding"] = "gzip";
            }

            var stream = new StreamWriter(req.GetRequestStream());
            var json = Newtonsoft.Json.JsonConvert.SerializeObject(jsonRpc);
            stream.Write(json);
            stream.Close();

            var resp = req.GetResponse();
            string sstream;

            string contentEncoding = resp.Headers["Content-Encoding"];

            if (contentEncoding != null && contentEncoding.Contains("gzip"))
            {
                var mstream = CopyAndClose(resp.GetResponseStream());

                using (var gstream = new GZipStream(mstream, CompressionMode.Decompress))
                {
                    using (var reader = new StreamReader(gstream, UTF8Encoding))
                    {
                        sstream = reader.ReadToEnd();
                    }
                }
            }
            else
            {
                using (var rstream = new StreamReader(CopyAndClose(resp.GetResponseStream())))
                {
                    sstream = rstream.ReadToEnd();
                }
            }
            resp.Close();
            JsonResponse<T> rjson = Newtonsoft.Json.JsonConvert.DeserializeObject<JsonResponse<T>>(sstream);

            if (rjson == null)
            {
                if (!string.IsNullOrEmpty(sstream))
                {
                    JObject jo = Newtonsoft.Json.JsonConvert.DeserializeObject(sstream) as JObject;
                    throw new Exception(jo["Error"].ToString());
                }
                else
                {
                    throw new Exception("Empty response");
                }
            }

            return rjson;
        }

    }

internal class TcpRpcInvoker : RpcInvoker
    {
        internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
        {
            int myId;

            if (jsonRpc.Id == null)
            {
                lock (idLock)
                {
                    myId = ++id;
                }

                jsonRpc.Id = myId.ToString();
            }

            var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);
            var contextNameValueCollection = new NameValueCollection();
            Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
            MergeContextValues(contextNameValueCollection, this.Option);

            var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
            if (!string.IsNullOrEmpty(abpSession.AccessToken)) // 将用户Token转递到其他微服务
            {
                contextNameValueCollection.Add("AccessToken", "1");
                contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
                contextNameValueCollection.Add("UserName", abpSession.UserName);
                contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
                contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);

            }

            var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection,this.ServiceAddress);
            JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);

            if (rjson == null)
            {
                if (!string.IsNullOrEmpty(jsonRespStr))
                {
                    JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                    throw new Exception(jo["Error"].ToString());
                }
                else
                {
                    throw new Exception("Empty response");
                }
            }

            return rjson;
        }

        private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection, string serverAddress)
        {
            var transportClientFactory = Dependency.IocManager.Instance.Resolve<ITransportClientFactory>();
            var req = JsonConvert.DeserializeObject<JsonRequest>(jsonReqStr);
            var tcpAddress = serverAddress.Split(new char[] { ‘:‘ }, StringSplitOptions.RemoveEmptyEntries);
            var result = transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint())          .SendAsync(req, contextNameValueCollection, Task.Factory.CancellationToken).Result;
            return JsonConvert.SerializeObject(result);

        }
    }

原文地址:https://www.cnblogs.com/spritekuang/p/10805700.html

时间: 2024-08-01 11:26:32

企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理的相关文章

企业级工作流解决方案(三)--微服务消息处理模型之服务端处理

1. Json-Rpc 2.0 参考地址:https://www.jsonrpc.org/specification JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议,它允许运行在基于socket,http等诸多不同消息传输环境的同一进程中.其使用JSON(RFC 4627)作为数据格式,它为简单而生. 请求对象 发送一个请求对象至服务端代表一个rpc调用, 一个请求对象包含下列成员: jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0” method包含所

企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式. LocalInvoker 本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说. HttpInvoker 即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务

企业级工作流解决方案(六)--微服务消息处理模型之与Abp集成

身份认证传递 对于Abp比较熟悉的朋友应该对他里面的用户身份认证比较熟悉,他是通过实现微软提供的权限认证方式实现的,用户登录身份信息存储在System.Security.Claims.ClaimsPrincipal里面,但是用户的身份信息如何在不同的服务之间传递呢,不可能每一个服务都必须实现这套身份认证吧?比如我们请求调用过程如下: Portal站点获取用户信息没有问题,但如何传递到调用的其他微服务呢? 这在之前文章中提到的传输Header就起到了作用,有两种方式可以处理,第一种我们可以直接把a

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

服务端启动 服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器. JsonRpcServerModule代码如下,见备注说明 [DependsOn(typeof(AbpKernelModule))] public class JsonRpcServerModule : AbpModule { public override vo

企业级工作流解决方案(二)--微服务总体介绍

微服务好处和概念性的东西就不介绍了,对于微服务,个人认为并不是越复杂就越好,相反要结合自己公司的现状,适当的做一些裁剪,比如对于规模和业务量不是特别大的企业,就没有必要把服务总线,服务健康检查,服务路由选择,熔断等等加进来,相反,这一部分职责可以通过配置文件,nginx代理,api网关等等外围的技术来控制,当企业达到一定的规模之后,再来完善这部分内容,但是对于微服务处理过程来说,没有任何影响. 我这里根据json-rpc 2.0标准,结合网上一些开源的微服务架构思想,采用dotnetty技术,搭

企业级工作流解决方案(一)--总体介绍

引言:国内对于流程引擎的介绍非常的少,但是不能否认流程引擎的重要性,流程引擎在各个行业都有应用,OA管理的请假流程.出差流程,项目管理上的合同审批流程.验收流程.启动流程,Erp中的采购流程.入库出库流程,政府里面的招标流程.结算流程等,都有流程引擎的身影,当然这里说的流程指审批意义的流程,还有一些不用人为参与的生产作业生产过程也是可以用工作流来解决的. 工作了多年,是时候把工作中的一些东西沉淀下来,准备写一系列文章,系统的介绍企业级工作流管理平台的搭建以及设计思路,希望能对其他人有所帮助.这一

微服务操作模型

这里并不是介绍微服务概念,如需要了解微服务,可以阅读Fowler-Microservices文章.本博客假定我们已开始使用微服务解耦单体应用,用来提升可部署性和可扩展性. 当我们在系统范围内部署大量的微服务时,一个新的挑战产生了,单体应用部署时不会发生.这篇文章将针对这些新的挑战,在系统范围内部署大量微服务时定义一套操作模型(operations model). 这篇文章分为如下几个部分: 前提条件: 扩展: 问题: 需要的组件: 参考模型: 下一步: 1. 前提条件 当在系统范围内需要部署大量

企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理

客户端启动 客户端启动主要做三件事情,1. 从配置文件读取服务调用配置,存储到全局对象中.2. 指定客户端编解码器工厂.3. 预连接,即预先建立与服务端的通信Chanel. [DependsOn(typeof(AbpKernelModule))] public class JsonRpcClientModule : AbpModule { public override void PreInitialize() { // 注册客户端配置,固定从Xml文件读取 SocketClientConfig

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空. /// <summary>