企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

服务端启动

  服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器。

  JsonRpcServerModule代码如下,见备注说明

[DependsOn(typeof(AbpKernelModule))]
    public class JsonRpcServerModule : AbpModule
    {
        public override void PreInitialize()
        {
            // 注册客户端配置,固定从Xml文件读取
            SocketServiceConfiguration socketServiceConfiguration = XmlConfigProvider.GetConfig<SocketServiceConfiguration>("SocketServiceConfiguration.xml");
            IocManager.IocContainer.Register(
                Component
                    .For<ISocketServiceConfiguration>()
                    .Instance(socketServiceConfiguration)
            );
            IocManager.Register<IServiceExecutor, DefaultServiceExecutor>(Dependency.DependencyLifeStyle.Singleton);
        }

        public override void Initialize()
        {
            IocManager.RegisterAssemblyByConvention(typeof(JsonRpcServerModule).GetAssembly());
            var socketServiceConfiguration = Configuration.Modules.RpcServiceConfig();
            switch (socketServiceConfiguration.MessageCode) //  根据配置文件,编解码配置选择
            {
                case EMessageCode.Json:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.MessagePack:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.ProtoBuffer:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                default:
                    break;
            }

            RegisterDefaultProtocol();
        }

        public override void PostInitialize()
        {
            var socketServiceConfiguration = IocManager.Resolve<ISocketServiceConfiguration>();
            // 方法里面调用ServiceHost构造函数传入的委托,启动dotnetty监听
            IocManager.Resolve<IServiceHost>().StartAsync(new IpAddressModel("0.0.0.0", socketServiceConfiguration.Port).CreateEndPoint());

            // 从配置文件读取json-rpc服务配置,解析消息处理模型
            JsonRpcRegister.LoadFromConfig(IocManager);
        }

        private void RegisterDefaultProtocol()
        {
            var dotNettyServerMessageListener = new DotNettyServerMessageListener(Logger,
                       IocManager.Resolve<ITransportMessageCodecFactory>(), IocManager.Resolve<ISocketServiceConfiguration>());

            IocManager.IocContainer.Register(
                Component
                    .For<IMessageListener>()
                    .Instance(dotNettyServerMessageListener)
            );

            var serviceExecutor = IocManager.Resolve<IServiceExecutor>();

            // 新建一个ServiceHost对象,放入容器,这个时候dotnetty还未启动,只是定义了执行方法。
            var serverHost = new DefaultServiceHost(async endPoint =>
            {
                await dotNettyServerMessageListener.StartAsync(endPoint); // 启动dotnetty监听
                return dotNettyServerMessageListener;
            }, serviceExecutor);

            IocManager.IocContainer.Register(
                Component
                    .For<IServiceHost>()
                    .Instance(serverHost)
            );
        }
}

  Dotnetty启动监听代码,参考dotnetty提供的实例代码,ServerHandler为自定义消息处理Chanel

/// <summary>
        /// 触发接收到消息事件。
        /// </summary>
        /// <param name="sender">消息发送者。</param>
        /// <param name="message">接收到的消息。</param>
        /// <returns>一个任务。</returns>
        public async Task OnReceived(IMessageSender sender, TransportMessage message)
        {
            if (Received == null)
                return;
            await Received(sender, message);
        }
public async Task StartAsync(EndPoint endPoint)
        {
            _logger.Debug($"准备启动服务主机,监听地址:{endPoint}。");

            IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1);
            IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();//Default eventLoopCount is Environment.ProcessorCount * 2
            var bootstrap = new ServerBootstrap();

            bossGroup = new MultithreadEventLoopGroup(1);
            workerGroup = new MultithreadEventLoopGroup();
            bootstrap.Channel<TcpServerSocketChannel>();
            bootstrap
            .Option(ChannelOption.SoBacklog, _socketServiceConfiguration.Backlog)
            .ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
            .Group(bossGroup, workerGroup)
            .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
            {
                var pipeline = channel.Pipeline;
                pipeline.AddLast(new LengthFieldPrepender(4));
                pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
                pipeline.AddLast(new ServerHandler(async (contenxt, message) =>
                {
                    var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt);
                    await OnReceived(sender, message);
                }, _logger));
            }));
            try
            {
                _channel = await bootstrap.BindAsync(endPoint);
                _logger.Debug($"服务主机启动成功,监听地址:{endPoint}。");
            }
            catch
            {
                _logger.Error($"服务主机启动失败,监听地址:{endPoint}。 ");
            }
        }

  消息最终经过解码处理之后,会落到DefaultServiceExecutor类进行处理,在这里调用JsonRpcProcessor静态类的Process方法,处理Json-Rpc请求,并构造答复消息,答复客户端。

public class DefaultServiceExecutor : IServiceExecutor
    {
        private readonly ILogger _logger;
        public DefaultServiceExecutor(ILogger logger)
        {
            _logger = logger;
        }
        public async Task ExecuteAsync(IMessageSender sender, TransportMessage message)
        {
            _logger.Debug("服务提供者接收到消息");

            if (!message.IsInvokeMessage())
                return;

            JsonRequest jsonRequest;
            try
            {
                jsonRequest = message.GetContent<JsonRequest>();
            }
            catch (Exception exception)
            {
                _logger.Error("将接收到的消息反序列化成 TransportMessage<JsonRequest> 时发送了错误。", exception);
                return;
            }

            _logger.Debug("准备执行本地逻辑。");

            var resultMessage = await LocalExecuteAsync(jsonRequest, message.Headers);

            //向客户端发送调用结果。
            await SendRemoteInvokeResult(sender, message.Id, JsonConvert.DeserializeObject<JsonResponse>(resultMessage));
        }

        private async Task<string> LocalExecuteAsync(JsonRequest jsonRequest,object headers)
        {
            return await JsonRpcProcessor.Process(JsonConvert.SerializeObject(jsonRequest), headers);
        }

        private async Task SendRemoteInvokeResult(IMessageSender sender, string messageId, JsonResponse resultMessage)
        {
            try
            {

                _logger.Debug("准备发送响应消息。");

                await sender.SendAndFlushAsync(TransportMessage.CreateInvokeResultMessage(messageId, resultMessage, new NameValueCollection()));
                _logger.Debug("响应消息发送成功。");
            }
            catch (Exception exception)
            {
                _logger.Error("发送响应消息时候发生了异常。", exception);
            }
        }
    }

这部分内容没有太多的说明,参见surging

原文地址:https://www.cnblogs.com/spritekuang/p/10805768.html

时间: 2024-08-28 21:25:29

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理的相关文章

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空. /// <summary>

企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理

客户端启动 客户端启动主要做三件事情,1. 从配置文件读取服务调用配置,存储到全局对象中.2. 指定客户端编解码器工厂.3. 预连接,即预先建立与服务端的通信Chanel. [DependsOn(typeof(AbpKernelModule))] public class JsonRpcClientModule : AbpModule { public override void PreInitialize() { // 注册客户端配置,固定从Xml文件读取 SocketClientConfig

企业级工作流解决方案(一)--总体介绍

引言:国内对于流程引擎的介绍非常的少,但是不能否认流程引擎的重要性,流程引擎在各个行业都有应用,OA管理的请假流程.出差流程,项目管理上的合同审批流程.验收流程.启动流程,Erp中的采购流程.入库出库流程,政府里面的招标流程.结算流程等,都有流程引擎的身影,当然这里说的流程指审批意义的流程,还有一些不用人为参与的生产作业生产过程也是可以用工作流来解决的. 工作了多年,是时候把工作中的一些东西沉淀下来,准备写一系列文章,系统的介绍企业级工作流管理平台的搭建以及设计思路,希望能对其他人有所帮助.这一

企业级工作流解决方案(二)--微服务总体介绍

微服务好处和概念性的东西就不介绍了,对于微服务,个人认为并不是越复杂就越好,相反要结合自己公司的现状,适当的做一些裁剪,比如对于规模和业务量不是特别大的企业,就没有必要把服务总线,服务健康检查,服务路由选择,熔断等等加进来,相反,这一部分职责可以通过配置文件,nginx代理,api网关等等外围的技术来控制,当企业达到一定的规模之后,再来完善这部分内容,但是对于微服务处理过程来说,没有任何影响. 我这里根据json-rpc 2.0标准,结合网上一些开源的微服务架构思想,采用dotnetty技术,搭

企业级工作流解决方案(三)--微服务消息处理模型之服务端处理

1. Json-Rpc 2.0 参考地址:https://www.jsonrpc.org/specification JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议,它允许运行在基于socket,http等诸多不同消息传输环境的同一进程中.其使用JSON(RFC 4627)作为数据格式,它为简单而生. 请求对象 发送一个请求对象至服务端代表一个rpc调用, 一个请求对象包含下列成员: jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0” method包含所

企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式. LocalInvoker 本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说. HttpInvoker 即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务

企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务? 这个就涉及到服务发现.服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,

企业级工作流解决方案(六)--微服务消息处理模型之与Abp集成

身份认证传递 对于Abp比较熟悉的朋友应该对他里面的用户身份认证比较熟悉,他是通过实现微软提供的权限认证方式实现的,用户登录身份信息存储在System.Security.Claims.ClaimsPrincipal里面,但是用户的身份信息如何在不同的服务之间传递呢,不可能每一个服务都必须实现这套身份认证吧?比如我们请求调用过程如下: Portal站点获取用户信息没有问题,但如何传递到调用的其他微服务呢? 这在之前文章中提到的传输Header就起到了作用,有两种方式可以处理,第一种我们可以直接把a

.Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信

RabbitMQ是什么,怎么使用我就不介绍了,大家可以到园子里搜一下教程.本篇的重点在于实现服务与服务之间的异步通信. 首先说一下为什么要使用消息队列来实现服务通信:1.提高接口并发能力.  2.保证服务各方数据最终一致.  3.解耦. 使用消息队列通信的有点就是直接调用的缺点,比如在直接调用过程中发生未知错误,很可能就会出现数据不一致的问题,这个时候就需要人工修补数据,如果有过这个经历的同学一定是可怜的,人工修补数据简直痛苦!!再比如高并发情况下接口直接挂点,这就更直白了,接口挂了,功能就挂了