企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

  Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging

  Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空。

/// <summary>
    /// 传输消息模型。
    /// </summary>
    public class TransportMessage
    {

        public TransportMessage()
        {
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public TransportMessage(object content,object headers)
        {
            if (content == null)
                throw new ArgumentNullException(nameof(content));

            Content = content;
            Headers = headers;
            ContentType = content.GetType().FullName;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public TransportMessage(object content, object headers, string fullName)
        {
            if (content == null)
                throw new ArgumentNullException(nameof(content));

            Headers = headers;
            Content = content;
            ContentType = fullName;
        }

        /// <summary>
        /// 消息Id。
        /// </summary>
        public string Id { get; set; }

        /// <summary>
        /// 消息内容。
        /// </summary>
        public object Content { get; set; }

        /// <summary>
        /// 消息传输Header
        /// </summary>
        public object Headers { get; set; }

        /// <summary>
        /// 内容类型。
        /// </summary>
        public string ContentType { get; set; }

        /// <summary>
        /// 是否调用消息。
        /// </summary>
        /// <returns>如果是则返回true,否则返回false。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool IsInvokeMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
        }

        /// <summary>
        /// 是否是调用结果消息。
        /// </summary>
        /// <returns>如果是则返回true,否则返回false。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool IsInvokeResultMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
        }

        /// <summary>
        /// 获取内容。
        /// </summary>
        /// <typeparam name="T">内容类型。</typeparam>
        /// <returns>内容实例。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public T GetContent<T>()
        {
            return (T)Content;
        }

        /// <summary>
        /// 获取Header。
        /// </summary>
        /// <typeparam name="T">Header类型。</typeparam>
        /// <returns>Header实例。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public T GetHeaders<T>()
        {
            return (T)Headers;
        }

        /// <summary>
        /// 创建一个调用传输消息。
        /// </summary>
        /// <param name="invokeMessage">调用实例。</param>
        /// <returns>调用传输消息。</returns>
        public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection)
        {
            return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName)
            {
                Id = Guid.NewGuid().ToString("N")
            };
        }

        /// <summary>
        /// 创建一个调用结果传输消息。
        /// </summary>
        /// <param name="id">消息Id。</param>
        /// <param name="invokeResultMessage">调用结果实例。</param>
        /// <returns>调用结果传输消息。</returns>
        public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection)
        {
            return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName)
            {
                Id = id
            };
        }
    }

  TransportMessage需要在dotnetty中传输,则需要对TransportMessage进行编码解码

消息编解码器

public interface ITransportMessageEncoder
    {
        byte[] Encode(TransportMessage message);
}
public interface ITransportMessageDecoder
    {
        TransportMessage Decode(byte[] data);
    }

Json编解码

  平时编码中经常用的方式

public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder
    {
        #region Implementation of ITransportMessageEncoder

        public byte[] Encode(TransportMessage message)
        {
            var content = JsonConvert.SerializeObject(message);
            return Encoding.UTF8.GetBytes(content);
        }

        #endregion Implementation of ITransportMessageEncoder
}

public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder
    {
        #region Implementation of ITransportMessageDecoder

        public TransportMessage Decode(byte[] data)
        {
            var content = Encoding.UTF8.GetString(data);
            var message = JsonConvert.DeserializeObject<TransportMessage>(content);
            if (message.IsInvokeMessage())
            {
                message.Content = JsonConvert.DeserializeObject<JsonRequest>(message.Content.ToString());
            }
            if (message.IsInvokeResultMessage())
            {
                message.Content = JsonConvert.DeserializeObject<JsonResponse>(message.Content.ToString());
            }
            return message;
        }

        #endregion Implementation of ITransportMessageDecoder
    }

MessagePack

  官网地址:https://msgpack.org/

  贴出代码,不过多的解释

[MessagePackObject]
    public class MessagePackTransportMessage
    {
        public MessagePackTransportMessage(TransportMessage transportMessage)
        {
            Id = transportMessage.Id;
            ContentType = transportMessage.ContentType;

            object contentObject;
            if (transportMessage.IsInvokeMessage())
            {
                contentObject = new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>());
            }
            else if (transportMessage.IsInvokeResultMessage())
            {
                contentObject = new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());
            }
            else
            {
                throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
            }

            Content = SerializerUtilitys.Serialize(contentObject);

            var headersObject = transportMessage.GetHeaders<NameValueCollection>();

            Headers = SerializerUtilitys.Serialize(JsonConvert.SerializeObject(MessagePackTransportMessageType.NvcToDictionary(headersObject)));
        }

        public MessagePackTransportMessage()
        {
        }

        [Key(0)]
        public string Id { get; set; }

        [Key(1)]
        public byte[] Content { get; set; }

        [Key(2)]
        public byte[] Headers { get; set; }

        [Key(3)]
        public string ContentType { get; set; }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool IsInvokeMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool IsInvokeResultMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
        }

        public TransportMessage GetTransportMessage()
        {
            var message = new TransportMessage
            {
                ContentType = ContentType,
                Id = Id,
                Content = null,
                Headers = null,
            };

            object contentObject;
            if (IsInvokeMessage())
            {
                contentObject =
                    SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest();
            }
            else if (IsInvokeResultMessage())
            {
                contentObject =
                    SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content)
                        .GetJsonResponse();
            }
            else
            {
                throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
            }
            message.Content = contentObject;
            var headers = SerializerUtilitys.Deserialize<string>(Headers);
            message.Headers = JsonConvert.DeserializeObject(headers);
            return message;
        }
}

public sealed class MessagePackTransportMessageEncoder:ITransportMessageEncoder
    {
        #region Implementation of ITransportMessageEncoder

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte[] Encode(TransportMessage message)
        {
            var transportMessage = new MessagePackTransportMessage(message)
            {
                Id = message.Id,
                ContentType = message.ContentType,
            };
            return SerializerUtilitys.Serialize(transportMessage);
        }
        #endregion Implementation of ITransportMessageEncoder
    }

public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder
    {
        #region Implementation of ITransportMessageDecoder

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public TransportMessage Decode(byte[] data)
        {
            var message = SerializerUtilitys.Deserialize<MessagePackTransportMessage>(data);
            return message.GetTransportMessage();
        }

        #endregion Implementation of ITransportMessageDecoder
    }

ProtoBuffer

  这个应该听得比较多

[ProtoContract]
    public class ProtoBufferTransportMessage
    {
        public ProtoBufferTransportMessage(TransportMessage transportMessage)
        {
            Id = transportMessage.Id;
            ContentType = transportMessage.ContentType;

            object contentObject;
            if (transportMessage.IsInvokeMessage())
            {
                contentObject = new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>());
            }
            else if (transportMessage.IsInvokeResultMessage())
            {
                contentObject = new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());
            }
            else
            {
                throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
            }

            Content = SerializerUtilitys.Serialize(contentObject);
            Headers = SerializerUtilitys.Serialize(transportMessage.GetHeaders<NameValueCollection>());
        }

        public ProtoBufferTransportMessage()
        {
        }

        [ProtoMember(1)]
        public string Id { get; set; }

        [ProtoMember(2)]
        public byte[] Content { get; set; }

        [ProtoMember(3)]
        public byte[] Headers { get; set; }

        [ProtoMember(4)]
        public string ContentType { get; set; }

        public bool IsInvokeMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
        }

        public bool IsInvokeResultMessage()
        {
            return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
        }

        public TransportMessage GetTransportMessage()
        {
            var message = new TransportMessage
            {
                ContentType = ContentType,
                Id = Id,
                Content = null,
                Headers = null,
            };

            object contentObject;
            if (IsInvokeMessage())
            {
                contentObject =
                    SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest();
            }
            else if (IsInvokeResultMessage())
            {
                contentObject =
                    SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content)
                        .GetJsonResponse();
            }
            else
            {
                throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
            }

            message.Content = contentObject;
            message.Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers);

            return message;
        }
}

public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder
    {
        #region Implementation of ITransportMessageEncoder

        public byte[] Encode(TransportMessage message)
        {
            var transportMessage = new ProtoBufferTransportMessage(message)
            {
                Id = message.Id,
                ContentType = message.ContentType,
            };

            return SerializerUtilitys.Serialize(transportMessage);
        }

        #endregion Implementation of ITransportMessageEncoder
    }

public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder
    {
        #region Implementation of ITransportMessageDecoder

        public TransportMessage Decode(byte[] data)
        {
            var message = SerializerUtilitys.Deserialize<ProtoBufferTransportMessage>(data);
            return message.GetTransportMessage();
        }

        #endregion Implementation of ITransportMessageDecoder
    }

原文地址:https://www.cnblogs.com/spritekuang/p/10805754.html

时间: 2024-09-28 11:19:08

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码的相关文章

企业级工作流解决方案(一)--总体介绍

引言:国内对于流程引擎的介绍非常的少,但是不能否认流程引擎的重要性,流程引擎在各个行业都有应用,OA管理的请假流程.出差流程,项目管理上的合同审批流程.验收流程.启动流程,Erp中的采购流程.入库出库流程,政府里面的招标流程.结算流程等,都有流程引擎的身影,当然这里说的流程指审批意义的流程,还有一些不用人为参与的生产作业生产过程也是可以用工作流来解决的. 工作了多年,是时候把工作中的一些东西沉淀下来,准备写一系列文章,系统的介绍企业级工作流管理平台的搭建以及设计思路,希望能对其他人有所帮助.这一

微服务生态的四层模型

微服务生态的四层模型 第1层:硬件层 硬件层是微服务生态的底层.这一层是服务器物理机所在的层,它们是所有微服务运行的基础.这些服务器被放置在数据中心的机架上,由供电系统供给电力,使用着昂贵的冷却系统.它们有些是某些公司私有的,有些是从所谓的"云服务提供商"那里租来的,比如 AWS EC2.GCP.阿里云等. 是自己购买服务器还是选择云服务器并不容易选择,它需要考虑购买成本.可用性.可靠性和运营成本. 管理服务器是硬件层的职责之一.每台服务器都需要安装标准的操作系统.使用哪种操作系统并没

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

服务端启动 服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器. JsonRpcServerModule代码如下,见备注说明 [DependsOn(typeof(AbpKernelModule))] public class JsonRpcServerModule : AbpModule { public override vo

企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式. LocalInvoker 本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说. HttpInvoker 即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务

企业级工作流解决方案(二)--微服务总体介绍

微服务好处和概念性的东西就不介绍了,对于微服务,个人认为并不是越复杂就越好,相反要结合自己公司的现状,适当的做一些裁剪,比如对于规模和业务量不是特别大的企业,就没有必要把服务总线,服务健康检查,服务路由选择,熔断等等加进来,相反,这一部分职责可以通过配置文件,nginx代理,api网关等等外围的技术来控制,当企业达到一定的规模之后,再来完善这部分内容,但是对于微服务处理过程来说,没有任何影响. 我这里根据json-rpc 2.0标准,结合网上一些开源的微服务架构思想,采用dotnetty技术,搭

企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务? 这个就涉及到服务发现.服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,

企业级工作流解决方案(三)--微服务消息处理模型之服务端处理

1. Json-Rpc 2.0 参考地址:https://www.jsonrpc.org/specification JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议,它允许运行在基于socket,http等诸多不同消息传输环境的同一进程中.其使用JSON(RFC 4627)作为数据格式,它为简单而生. 请求对象 发送一个请求对象至服务端代表一个rpc调用, 一个请求对象包含下列成员: jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0” method包含所

企业级工作流解决方案(六)--微服务消息处理模型之与Abp集成

身份认证传递 对于Abp比较熟悉的朋友应该对他里面的用户身份认证比较熟悉,他是通过实现微软提供的权限认证方式实现的,用户登录身份信息存储在System.Security.Claims.ClaimsPrincipal里面,但是用户的身份信息如何在不同的服务之间传递呢,不可能每一个服务都必须实现这套身份认证吧?比如我们请求调用过程如下: Portal站点获取用户信息没有问题,但如何传递到调用的其他微服务呢? 这在之前文章中提到的传输Header就起到了作用,有两种方式可以处理,第一种我们可以直接把a

微服务实战(二):落地微服务架构到直销系统(构建消息总线框架接口)

从上一篇文章大家可以看出,实现一个自己的消息总线框架是非常重要的内容,消息总线可以将界限上下文之间进行解耦,也可以为大并发访问提供必要的支持. 消息总线的作用: 1.界限上下文解耦:在DDD第一波文章中,当更新了订单信息后,我们通过调用经销商界限上下文的领域模型和仓储,进行了经销商信息的更新,这造成了耦合.通过一个消息总线,可以在订单界限上下文的WebApi服务(来源微服务-生产者)更新了订单信息后,发布一个事件消息到消息总线的某个队列中,经销商界限上下文的WebApi服务(消费者)订阅这个事件