使用Newlife网络库管道模式解决数据粘包(二)

上一篇我们讲了 如何创建一个基本的Newlife网络服务端 这边我们来讲一下如何解决粘包的问题

在上一篇总我们注册了Newlife的管道处理器 ,我们来看看他是如何实现粘包处理的

svr.Add<ReciveFilter>();//粘包处理管道

首先看一下我们设备的上传数据协议

设备上报的数据包头包含了固定的包头包尾,整个包的数据长度,设备编号。

包头:板卡类型,帧类型 2个字节 0x01 0x70

帧长度: 为两个字节 并且数据的字节序为  高字节在前 ,C#正常默认为低字节在前。

设备号:15位的ASCII 字符串

包尾: 两个字节 0x0D 0x0A  固定

下面来解决粘包的问题

Newlife网络库提供了几种常见的封包协议来解决粘包的问题,其中有一个 LengthFieldCodec解码器 这个解码器以长度字段作为头部 恰好符合我们的需求,我们就以这个解码器稍作改造来解决我们的粘包问题吧

由于这个解码器是适用于 只包含包头和包体的数据结构,且长度为包体的长度,而我们的协议 是包含包头包体包尾,并且帧长度为整个包的长度,长度为高位在前的数据结构,所以我们需要对整个解码器稍微做一些改造来符合我们的数据结构 。

我们来看下代码 其中

        #region 属性
        /// <summary>长度所在位置</summary>
        public Int32 Offset { get; set; }=2;

        /// <summary>长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2</summary>
        public Int32 Size { get; set; } = 2;

        /// <summary>过期时间,超过该时间后按废弃数据处理,默认500ms</summary>
        public Int32 Expire { get; set; } = 500;
        #endregion

在我们的协议中可以看到 设置了数据包的长度位置,长度占据的字节数,下面我们来获取一下整个包的长度

/// <summary>从数据流中获取整帧数据长度</summary>
        /// <param name="pk"></param>
        /// <param name="offset"></param>
        /// <param name="size"></param>
        /// <returns>数据帧长度(包含头部长度位)</returns>
        protected  Int32 GetLength(Packet pk, Int32 offset, Int32 size)
        {
            if (offset < 0) return pk.Total - pk.Offset;
            // 数据不够,连长度都读取不了
            if (offset >= pk.Total) return 0;

            // 读取大小
            var len = 0;
            switch (size)
            {
                case 2:
                    var lenArry = pk.ReadBytes(offset, 2);
                    //高位在前,反转数组,获取长度
                    Array.Reverse(lenArry);
                    len = lenArry.ToUInt16();
                    break;
                default:
                    throw new NotSupportedException();
            }

            // 判断后续数据是否足够
            if (len > pk.Total) return 0;

            return len;
        }

获取长度后我们就可以从数据流中读取一个完整的包了

        /// <summary>解码</summary>
        /// <param name="context"></param>
        /// <param name="pk"></param>
        /// <returns></returns>
        protected override IList<Packet> Decode(IHandlerContext context, Packet pk)
        {
            var ss = context.Owner as IExtend;
            var mcp = ss["CodecItem"] as CodecItem;
            if (mcp == null) ss["CodecItem"] = mcp = new CodecItem();

            var pks = ParseNew(pk, mcp, 0, ms => GetLength(ms, Offset, Size), Expire);

            // 跳过头部长度
            var len = Offset + Math.Abs(Size);
            foreach (var item in pks)
            {
                item.Set(item.Data, item.Offset + len, item.Count - len);
                //item.SetSub(len, item.Count - len);
            }

            return pks;
        }

        #region 粘包处理
        /// <summary>分析数据流,得到一帧数据</summary>
        /// <param name="pk">待分析数据包</param>
        /// <param name="codec">参数</param>
        /// <param name="getLength">获取长度</param>
        /// <param name="expire">缓存有效期</param>
        /// <returns></returns>
        protected IList<Packet> ParseNew(Packet pk, CodecItem codec, int startIndex, Func<Packet, Int32> getLength, Int32 expire = 5000)
        {
            var _ms = codec.Stream;
            var nodata = _ms == null || _ms.Position < 0 || _ms.Position >= _ms.Length;

            var list = new List<Packet>();
            // 内部缓存没有数据,直接判断输入数据流是否刚好一帧数据,快速处理,绝大多数是这种场景
            if (nodata)
            {
                if (pk == null) return list.ToArray();

                var idx = 0;
                while (idx < pk.Total)
                {
                    //var pk2 = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                    var pk2 = pk.Slice(idx);
                    var len = getLength(pk2);
                    if (len <= 0 || len > pk2.Count) break;

                    pk2.Set(pk2.Data, startIndex, len);
                    //pk2.SetSub(0, len);
                    list.Add(pk2);
                    idx += len;
                }
                // 如果没有剩余,可以返回
                if (idx == pk.Total) return list.ToArray();

                // 剩下的
                //pk = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                pk = pk.Slice(idx);
            }

            if (_ms == null) codec.Stream = _ms = new MemoryStream();

            // 加锁,避免多线程冲突
            lock (_ms)
            {
                // 超过该时间后按废弃数据处理
                var now = TimerX.Now;
                if (_ms.Length > _ms.Position && codec.Last.AddMilliseconds(expire) < now)
                {
                    _ms.SetLength(0);
                    _ms.Position = 0;
                }
                codec.Last = now;

                // 合并数据到最后面
                if (pk != null && pk.Total > 0)
                {
                    var p = _ms.Position;
                    _ms.Position = _ms.Length;
                    pk.WriteTo(_ms);
                    _ms.Position = p;
                }

                // 尝试解包
                while (_ms.Position < _ms.Length)
                {
                    //var pk2 = new Packet(_ms.GetBuffer(), (Int32)_ms.Position, (Int32)_ms.Length);
                    var pk2 = new Packet(_ms);
                    var len = getLength(pk2);

                    // 资源不足一包
                    if (len <= 0 || len > pk2.Total) break;

                    // 解包成功
                    pk2.Set(pk2.Data, startIndex, len);
                    //pk2.SetSub(0, len);
                    list.Add(pk2);

                    _ms.Seek(len, SeekOrigin.Current);
                }

                // 如果读完了数据,需要重置缓冲区
                if (_ms.Position >= _ms.Length)
                {
                    _ms.SetLength(0);
                    _ms.Position = 0;
                }

                return list;
            }
        }

粘包处理管道完成后,就可以在Recive中去处理一个完整的数据包啦,我来解析一下这个状态的数据并且来保存设备连接

首先定义一个字典项用来保存设备的连接信息.设备号,连接的SessionId

   /// <summary>
        /// newLife连接保持
        /// </summary>
        private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();

由于我们的数据中 帧类型不同的请求中帧类型是不一样的 所以解析数据需要做区分处理 我们来或者状态上传信息中的设备号并且和连接关联

 private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();
        private object _lock=new object();
        private void Recive(object sender, ReceivedEventArgs e)
        {

            INetSession session = (INetSession)sender;
            var pk = e.Message as Packet;
            if (pk.Count == 0)
            {
                XTrace.WriteLine("数据包解析错误");
                return;

            }
            try
            {
                //数据包
                var respBytes = pk.Data;
                //获取帧类型
                var dataTypeBytes = respBytes[1];

                if (dataTypeBytes == 0x70)
                {
                    //数值
                    byte[] deviceNoByte = new byte[15];
                    Buffer.BlockCopy(respBytes, 4, deviceNoByte, 0, 15); //从缓冲区里读取包头的字节
                    string deviceNo = Encoding.ASCII.GetString(deviceNoByte);
                    XTrace.WriteLine("设备编号:" + deviceNo);                    //保存连接信息                    SaveClientConnection(deviceNo, session.ID);
                    //获取设备号后保存连接信息
                } //支付宝
            }
            catch (Exception ex)
            {
                XTrace.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// 保存在线信息
        /// </summary>
        /// <param name="deviceNo"></param>
        /// <param name="sessionId"></param>
        private void SaveClientConnection(string deviceNo, int sessionId)
        {
            lock (_lock)
            {
                if (OnLineClients.ContainsKey(deviceNo))
                {
                    OnLineClients[deviceNo] = sessionId;
                }
                else
                {
                    OnLineClients.Add(deviceNo,sessionId);
                }
            }

        }

好了数据粘包问题解决啦同时保存了设备连接信息,下面来解决如何定时检查测试在线状态。

原文地址:https://www.cnblogs.com/yushuo/p/10286472.html

时间: 2024-10-10 01:56:44

使用Newlife网络库管道模式解决数据粘包(二)的相关文章

使用NewLife网络库构建可靠的自动售货机Socket服务端(一)

最近有个基于tcp socket 协议和设备交互需求,想到了新生命团队的各种组件,所以决定用NewLife网络库作为服务端来完成一系列的信息交互. 第一,首先说一下我们需要实现的功能需求吧 1,首先客户有一堆自动售货机的设备,设备连接socket服务端后 定时发送设备实时状态作为心跳信息,并且服务端需要下发信息予以确认. 2,需要知道设备的实时在线状态 3,设备需要实现微信,支付宝扫码支付需求,当客户买东西的时候选择扫码支付时,设备上报产品价格信息,支付方式,服务器下发微信或者支付宝的当面付二维

C#下利用封包、拆包原理解决Socket粘包、半包问题(新手篇)

介于网络上充斥着大量的含糊其辞的Socket初级教程,扰乱着新手的学习方向,我来扼要的教一下新手应该怎么合理的处理Socket这个玩意儿. 一般来说,教你C#下Socket编程的老师,很少会教你如何解决Socket粘包.半包问题. 更甚至,某些师德有问题的老师,根本就没跟你说过Socket的粘包.半包问题是什么玩意儿. 直到有一天,你的Socket程序在传输信息时出现了你预期之外的结果(多于的信息.不完整的信息.乱码.Bug等等). 任你喊了一万遍“我擦”,依旧是不知道问题出在哪儿! 好了,不说

Netty中LineBasedFrameDecoder解码器使用与分析:解决TCP粘包问题

[toc] Netty中LineBasedFrameDecoder解码器使用与分析:解决TCP粘包问题 上一篇文章<Netty中TCP粘包问题代码示例与分析>演示了使用了时间服务器的例子演示了TCP的粘包问题,这里使用LineBasedFrameDecoder就是用来解决这个问题的. 不过需要注意的是,LineBasedFrameDecoder见名知其义,可见其是与行相关的,而在前面演示TCP粘包问题时,作者是有意在发送的消息中都加入了换行符,目的也是为了在后面去讲解LineBasedFram

TCP网络通讯如何解决分包粘包问题(有模拟代码)

TCP作为常用的网络传输协议,数据流解析是网络应用开发人员永远绕不开的一个问题. TCP数据传输是以无边界的数据流传输形式,所谓无边界是指数据发送端发送的字节数,在数据接收端接受时并不一定等于发送的字节数,可能会出现粘包情况. 一.TCP粘包情况: 1. 发送端发送了数量比较的数据,接收端读取数据时候数据分批到达,造成一次发送多次读取:通常网络路由的缓存大小有关系,一个数据段大小超过缓存大小,那么就要拆包发送. 2. 发送端发送了几次数据,接收端一次性读取了所有数据,造成多次发送一次读取:通常是

Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器

服务端 package org.zln.netty.five.timer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; impo

python 网络编程(远程执行命令与粘包)

远程执行命令 先来学习一个新模块 , 一会用到的.. 新模块: subprocess 执行系统命令 r = subprocess.Popen('ls',shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE) subprocess.Popen(a,b,c,d) a: 要执行的系统命令(str) b: shell = True 表示确定我当前执行的命令为系统命令 c: 表示正确信息的输出管道 d: 表示错误信息的输出管道 下边直接上代码,

netty 解决TCP粘包与拆包问题(二)

TCP以流的方式进行数据传输,上层应用协议为了对消息的区分,采用了以下几种方法. 1.消息固定长度 2.第一篇讲的回车换行符形式 3.以特殊字符作为消息结束符的形式 4.通过消息头中定义长度字段来标识消息的总长度 一.采用指定分割符解决粘包与拆包问题 服务端 1 package com.ming.netty.nio.stickpack; 2 3 4 5 import java.net.InetSocketAddress; 6 7 import io.netty.bootstrap.ServerB

python全栈开发基础【补充】解决tcp粘包

一.什么是粘包 须知:只有TCP有粘包现象,UDP永远不会粘包 粘包不一定会发生 如果发生了:1.可能是在客户端已经粘了 2.客户端没有粘,可能是在服务端粘了 首先需要掌握一个socket收发消息的原理 应用程序所看到的数据是一个整体,或说是一个流(stream),一条消息有多少字节对应用程序是不可见的,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因.(因为TCP是流式协议,不知道啥时候开始,啥时候结束).而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提

使用Netty如何解决拆包粘包的问题

首先,我们通过一个DEMO来模拟TCP的拆包粘包的情况:客户端连续向服务端发送100个相同消息.服务端的代码如下: AtomicLong count = new AtomicLong(0); NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstra