企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理

客户端启动

  客户端启动主要做三件事情,1. 从配置文件读取服务调用配置,存储到全局对象中。2. 指定客户端编解码器工厂。3. 预连接,即预先建立与服务端的通信Chanel。

[DependsOn(typeof(AbpKernelModule))]
    public class JsonRpcClientModule : AbpModule
    {
        public override void PreInitialize()
        {
            // 注册客户端配置,固定从Xml文件读取
            SocketClientConfiguration socketClientConfiguration = XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");
            IocManager.IocContainer.Register(
                Component
                    .For<ISocketClientConfiguration>()
                    .Instance(socketClientConfiguration)
            );
            switch (socketClientConfiguration.MessageCode)
            {
                case EMessageCode.Json:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.MessagePack:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.ProtoBuffer:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
            }
        }

        public override void Initialize()
        {
            IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());
            var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(IocManager.Resolve<ITransportMessageCodecFactory>(), Logger);

            IocManager.IocContainer.Register(
                Component
                    .For<ITransportClientFactory>()
                    .Instance(dotNettyTransportClientFactory)
            );
        }

        public override void PostInitialize()
        {
            var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
            var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();
            try
            {
                foreach (var clientConnectServerInfo in socketClientConfiguration.ClientConnectServerInfos) // 预连接
                {
                    if (clientConnectServerInfo.ConnectServerType == EConnectServerType.Tcp)
                    {
                        var tcpAddress = clientConnectServerInfo.Url.Split(new char[] { ‘:‘ }, StringSplitOptions.RemoveEmptyEntries);
                        transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint());
                    }
                }
            }
            catch(Exception ex) // 预连,出错不处理
            {

            }
        }
    }

客户端全局Chanel设计

  每一个服务连接创建一个TransportClient与之对应,存储在全局变量中private readonly ConcurrentDictionary<EndPoint, Lazy<ITransportClient>> _clients = new ConcurrentDictionary<EndPoint, Lazy<ITransportClient>>();

  TransportClient即处理客户端传输消息对象,每当发起客户端调用时,创建transportClient对象,并存储到_clients集合中,下次对同一个服务端调用时,直接复用此对象,如果与服务器的通信Chanel断开,则从_clients对象中移除,达到了复用Chanel的作用。

/// <summary>
        /// 创建客户端。
        /// </summary>
        /// <param name="endPoint">终结点。</param>
        /// <returns>传输客户端实例。</returns>
        public ITransportClient CreateClient(EndPoint endPoint)
        {
            var key = endPoint;
            _logger.Debug($"准备为服务端地址:{key}创建客户端。");
            try
            {
                return _clients.GetOrAdd(key
                    , k => new Lazy<ITransportClient>(() =>
                    {
                        var bootstrap = _bootstrap;
                        var channel = bootstrap.ConnectAsync(k).Result;
                        var messageListener = new MessageListener();
                        channel.GetAttribute(messageListenerKey).Set(messageListener);
                        var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel);
                        channel.GetAttribute(messageSenderKey).Set(messageSender);
                        channel.GetAttribute(origEndPointKey).Set(k);
                        var client = new TransportClient(messageSender, messageListener, _logger);
                        return client;
                    }
                    )).Value;
            }
            catch
            {
                _clients.TryRemove(key, out var value);
                var ipEndPoint = endPoint as IPEndPoint;
                throw;
            }
        }
protected class DefaultChannelHandler : ChannelHandlerAdapter
        {

            public override void ChannelInactive(IChannelHandlerContext context)
            {
                _factory._clients.TryRemove(context.Channel.GetAttribute(origEndPointKey).Get(), out var value);
            }
        }

TransportClient

  默认的客户端传输实现,Rpc调用时,直接组装请求参数,调用SendAsync方法。注意里面的ManualResetValueTaskSource的设计。

/// <summary>
    /// 一个默认的传输客户端实现。
    /// </summary>
    public class TransportClient : ITransportClient, IDisposable
    {
        #region Field

        private readonly IMessageSender _messageSender;
        private readonly IMessageListener _messageListener;
        private readonly ILogger _logger;

        private readonly ConcurrentDictionary<string, ManualResetValueTaskSource<TransportMessage>> _resultDictionary =
            new ConcurrentDictionary<string, ManualResetValueTaskSource<TransportMessage>>();

        #endregion Field

        #region Constructor

        public TransportClient(IMessageSender messageSender, IMessageListener messageListener, ILogger logger)
        {
            _messageSender = messageSender;
            _messageListener = messageListener;
            _logger = logger;
            messageListener.Received += MessageListener_Received;
        }

        #endregion Constructor

        #region Implementation of ITransportClient

        /// <summary>
        /// 发送消息。
        /// </summary>
        /// <param name="message">远程调用消息模型。</param>
        /// <returns>远程调用消息的传输消息。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public async Task<JsonResponse> SendAsync(JsonRequest message, NameValueCollection contextNameValueCollection, CancellationToken cancellationToken)
        {
            try
            {
                _logger.Debug("准备发送消息。");

                var transportMessage = TransportMessage.CreateInvokeMessage(message, contextNameValueCollection);

                //注册结果回调
                var callbackTask = RegisterResultCallbackAsync(transportMessage.Id, cancellationToken);

                try
                {
                    //发送
                    await _messageSender.SendAndFlushAsync(transportMessage);
                }
                catch (Exception exception)
                {
                    throw new CommunicationException("与服务端通讯时发生了异常。", exception);
                }

                _logger.Debug("消息发送成功。");

                return await callbackTask;
            }
            catch (Exception exception)
            {
                _logger.Error("消息发送失败。");
                throw;
            }
        }

        #endregion Implementation of ITransportClient

        #region Implementation of IDisposable

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            (_messageSender as IDisposable)?.Dispose();
            (_messageListener as IDisposable)?.Dispose();
            foreach (var taskCompletionSource in _resultDictionary.Values)
            {
                taskCompletionSource.SetCanceled();
            }
        }

        #endregion Implementation of IDisposable

        #region Private Method

        /// <summary>
        /// 注册指定消息的回调任务。
        /// </summary>
        /// <param name="id">消息Id。</param>
        /// <returns>远程调用结果消息模型。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private async Task<JsonResponse> RegisterResultCallbackAsync(string id, CancellationToken cancellationToken)
        {
            _logger.Debug($"准备获取Id为:{id}的响应内容。");

            var task = new ManualResetValueTaskSource<TransportMessage>();
            _resultDictionary.TryAdd(id, task);
            try
            {
                var result = await task.AwaitValue(cancellationToken);
                return result.GetContent<JsonResponse>();
            }
            finally
            {
                //删除回调任务
                ManualResetValueTaskSource<TransportMessage> value;
                _resultDictionary.TryRemove(id, out value);
                value.SetCanceled();
            }
        }

        private async Task MessageListener_Received(IMessageSender sender, TransportMessage message)
        {
            _logger.Debug("服务消费者接收到消息。");

            ManualResetValueTaskSource<TransportMessage> task;
            if (!_resultDictionary.TryGetValue(message.Id, out task))
                return;

            if (message.IsInvokeResultMessage())
            {
                var content = message.GetContent<JsonResponse>();
                if (content.Error != null)
                {
                    task.SetException(content.Error);
                }
                else
                {
                    task.SetResult(message);
                }
            }
        }

        #endregion Private Method
    }

原文地址:https://www.cnblogs.com/spritekuang/p/10805780.html

时间: 2024-10-12 11:03:12

企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理的相关文章

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空. /// <summary>

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

服务端启动 服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器. JsonRpcServerModule代码如下,见备注说明 [DependsOn(typeof(AbpKernelModule))] public class JsonRpcServerModule : AbpModule { public override vo

企业级工作流解决方案(一)--总体介绍

引言:国内对于流程引擎的介绍非常的少,但是不能否认流程引擎的重要性,流程引擎在各个行业都有应用,OA管理的请假流程.出差流程,项目管理上的合同审批流程.验收流程.启动流程,Erp中的采购流程.入库出库流程,政府里面的招标流程.结算流程等,都有流程引擎的身影,当然这里说的流程指审批意义的流程,还有一些不用人为参与的生产作业生产过程也是可以用工作流来解决的. 工作了多年,是时候把工作中的一些东西沉淀下来,准备写一系列文章,系统的介绍企业级工作流管理平台的搭建以及设计思路,希望能对其他人有所帮助.这一

微服务生态的四层模型

微服务生态的四层模型 第1层:硬件层 硬件层是微服务生态的底层.这一层是服务器物理机所在的层,它们是所有微服务运行的基础.这些服务器被放置在数据中心的机架上,由供电系统供给电力,使用着昂贵的冷却系统.它们有些是某些公司私有的,有些是从所谓的"云服务提供商"那里租来的,比如 AWS EC2.GCP.阿里云等. 是自己购买服务器还是选择云服务器并不容易选择,它需要考虑购买成本.可用性.可靠性和运营成本. 管理服务器是硬件层的职责之一.每台服务器都需要安装标准的操作系统.使用哪种操作系统并没

WCF 客户端与服务端消息传输

WCF很多需要认证信息,保证服务的安全,可以使用消息来实现 WCF 实现消息的方式: WCF中有两个接口: IClientMessageInspector [定义一个消息检查器对象,该对象可以添加到 System.ServiceModel.Dispatcher.ClientRuntime.MessageInspectors集合来查看或修改消息] IDispatchMessageInspector  [定义一些方法,通过这些方法,可以在服务应用程序中对入站和出站应用程序消息进行自定义检查或修改.]

.net core ——微服务内通信Thrift和Http客户端响应比较

原文:.net core --微服务内通信Thrift和Http客户端响应比较 目录 1.Benchmark介绍 2.测试下微服务访问效率 3.结果 引用链接 1.Benchmark介绍 wiki中有定义:基准测试是运行计算机程序,一组程序或其他操作的行为,以便评估对象的相对性能,通常是通过对其运行许多标准测试和试验. 目前许多成熟的github开源项目,均采用Benchmark测试结果作为性能依据.在 .net 代码世界中,当然是使用 BenchmarkDotNet类库. 其支持 : 编程环境

企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式. LocalInvoker 本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说. HttpInvoker 即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务

企业级工作流解决方案(二)--微服务总体介绍

微服务好处和概念性的东西就不介绍了,对于微服务,个人认为并不是越复杂就越好,相反要结合自己公司的现状,适当的做一些裁剪,比如对于规模和业务量不是特别大的企业,就没有必要把服务总线,服务健康检查,服务路由选择,熔断等等加进来,相反,这一部分职责可以通过配置文件,nginx代理,api网关等等外围的技术来控制,当企业达到一定的规模之后,再来完善这部分内容,但是对于微服务处理过程来说,没有任何影响. 我这里根据json-rpc 2.0标准,结合网上一些开源的微服务架构思想,采用dotnetty技术,搭

企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务? 这个就涉及到服务发现.服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,