c#网络通信框架networkcomms内核解析之七 数据包创建器(PacketBuilder)

PacketBuilder 数据包创建器,用于辅助创建数据包。

程序把Tcp连接上收到的二进制数据暂时存储在 packetBuilder中,如果收到的数据足够多,程序会把数据包包头解析出来,并根据数据包包头中的数据,解析出数据包大小,根据数据包大小,从PacketBuilder中截取相应的二进制数据,把这部分数据以内存流(MemoryStream)的形式,加上数据包包头一起交给NetworkComms.CompleteIncomingItemTask()方法进行处理。

PacketBuilder 中的数据被提取过的数据,会被删除。Tcp连接上进入的数据,又会加入进来,好似一个流动的容器。

 /// <summary>
    /// Packet data is generally broken into multiple variable sized byte chunks or ‘partial packets‘. This class provides features to effortlessly rebuild whole packets.
    /// 数据包创建器,用来临时存储接收到的二进制数据,如果收到的二级制数据足够合成一个数据包,主程序会介入处理这个部分数据,处理完成后,会从PacketBuilder中删除已经处理的数据
    /// </summary>
    public class PacketBuilder
    {
        //存储的二级制数据
        List<byte[]> packets = new List<byte[]>();
        //存放二进制数据对应的大小
        List<int> packetActualBytes = new List<int>();

        /// <summary>
        /// Locker object used for performing thread safe operations over this packet builder
        /// </summary>
        public object Locker { get; private set; }

        int totalBytesCached = 0;
        int totalBytesExpected = 0;

        /// <summary>
        /// Create a new instance of the ConnectionPacketBuilder class
        /// </summary>
        public PacketBuilder()
        {
            Locker = new object();
        }

        /// <summary>
        /// The total number of cached bytes. This is the sum of all bytes across all cached partial packets. See <see cref="TotalPartialPacketCount"/>.
        /// 已经存储的数据的带下
        /// </summary>
        public int TotalBytesCached
        {
            get { return totalBytesCached; }
        }

        /// <summary>
        /// The total number of cached partial packets. This is different from <see cref="TotalBytesCached"/> because each partial packet may contain a variable number of bytes.
       /// </summary>
        public int TotalPartialPacketCount
        {
            get { lock (Locker) return packets.Count; }
        }

        /// <summary>
        /// The total number of bytes required to rebuild the next whole packet.
        /// 期待的数据大小,也就是根据数据包包头解析出的数据包大小
        /// </summary>
        public int TotalBytesExpected
        {
            get { lock (Locker) return totalBytesExpected; }
            set { lock (Locker) totalBytesExpected = value; }
        }

        /// <summary>
        /// Clear N bytes from cache, starting with oldest bytes first.
        /// 此方法用于,当此部分数据被主程序处理完成后,删除此部分数据
        /// </summary>
        /// <param name="numBytesToRemove">The total number of bytes to be removed.</param>
        public void ClearNTopBytes(int numBytesToRemove)
        {
            lock (Locker)
            {
                if (numBytesToRemove > 0)
                {
                    if (numBytesToRemove > totalBytesCached)
                        throw new CommunicationException("Attempting to remove " + numBytesToRemove.ToString() + " bytes when ConnectionPacketBuilder only contains " + totalBytesCached.ToString());

                    int bytesRemoved = 0;

                    //We will always remove bytes in order of the entries
                    for (int i = 0; i < packets.Count; i++)
                    {
                        if (packetActualBytes[i] > numBytesToRemove - bytesRemoved)
                        {
                            //Remove the necessary bytes from this packet and rebuild
                            //New array length is the original length minus the amount we need to remove
                            byte[] newPacketByteArray = new byte[packetActualBytes[i] - (numBytesToRemove - bytesRemoved)];
                            Buffer.BlockCopy(packets[i], numBytesToRemove - bytesRemoved, newPacketByteArray, 0, newPacketByteArray.Length);

                            bytesRemoved += packetActualBytes[i] - newPacketByteArray.Length;
                            packets[i] = newPacketByteArray;
                            packetActualBytes[i] = newPacketByteArray.Length;

                            //Stop removing data here
                            break;
                        }
                        else if (i > packets.Count - 1)
                        {
                            //When i == (packet.Count - 1) I would expect the above if condition to always be true
                            throw new CommunicationException("This should be impossible.");
                        }
                        else
                        {
                            //If we want to remove this entire packet we can just set the list reference to null
                            bytesRemoved += packetActualBytes[i];
                            packets[i] = null;
                            packetActualBytes[i] = -1;
                        }
                    }

                    if (bytesRemoved != numBytesToRemove)
                        throw new CommunicationException("bytesRemoved should really equal the requested numBytesToRemove");

                    //Reset the totalBytesRead
                    totalBytesCached -= bytesRemoved;

                    //Get rid of any null packets
                    List<byte[]> newPackets = new List<byte[]>(packets.Count);
                    for (int i = 0; i < packets.Count; i++)
                    {
                        if (packets[i] != null)
                            newPackets.Add(packets[i]);
                    }
                    packets = newPackets;

                    //Remove any -1 entries
                    List<int> newPacketActualBytes = new List<int>(packetActualBytes.Count);
                    for (int i = 0; i < packetActualBytes.Count; i++)
                    {
                        if (packetActualBytes[i] > -1)
                            newPacketActualBytes.Add(packetActualBytes[i]);
                    }
                    packetActualBytes = newPacketActualBytes;

                    //This is a really bad place to put a garbage collection as it hammers the CPU
                    //GC.Collect();
                }
            }
        }

        /// <summary>
        /// Add a partial packet to the end of the cache by reference.
        /// 添加接收的数据
        /// </summary>
        /// <param name="packetBytes">The number of valid bytes in the provided partial packet</param>
        /// <param name="partialPacket">A buffer which may or may not be full with valid bytes</param>
        public void AddPartialPacket(int packetBytes, byte[] partialPacket)
        {
            lock (Locker)
            {
                totalBytesCached += packetBytes;

                packets.Add(partialPacket);
                packetActualBytes.Add(packetBytes);

                if (NetworkComms.LoggingEnabled)
                {
                    if (TotalBytesExpected == 0 && totalBytesCached > (10 * 1024 * 1024))
                        NetworkComms.Logger.Warn("Packet builder cache contains " + (totalBytesCached / 1024.0).ToString("0.0") + "KB when 0KB are currently expected.");
                    else if (TotalBytesExpected > 0 && totalBytesCached > totalBytesExpected * 2)
                        NetworkComms.Logger.Warn("Packet builder cache contains " + (totalBytesCached / 1024.0).ToString("0.0") + "KB when only " + (TotalBytesExpected / 1024.0).ToString("0.0") + "KB were expected.");
                }
            }
        }

        /// <summary>
        /// Returns the most recently cached partial packet and removes it from the cache.
        /// Used to more efficiently utilise allocated memory space.
        /// </summary>
        /// <param name="lastPacketBytesRead">The number of valid bytes in the last partial packet added</param>
        /// <returns>A byte[] corresponding with the last added partial packet</returns>
        public byte[] RemoveMostRecentPartialPacket(ref int lastPacketBytesRead)
        {
            lock (Locker)
            {
                if (packets.Count > 0)
                {
                    int lastPacketIndex = packets.Count - 1;

                    lastPacketBytesRead = packetActualBytes[lastPacketIndex];
                    byte[] returnArray = packets[lastPacketIndex];

                    totalBytesCached -= packetActualBytes[lastPacketIndex];

                    packets.RemoveAt(lastPacketIndex);
                    packetActualBytes.RemoveAt(lastPacketIndex);

                    return returnArray;
                }
                else
                    throw new Exception("Unable to remove most recent packet as packet list is empty.");
            }
        }

        /// <summary>
        /// Returns the number of unused bytes in the most recently cached partial packet.
        /// </summary>
        /// <returns>The number of unused bytes in the most recently cached partial packet.</returns>
        public int NumUnusedBytesMostRecentPartialPacket()
        {
            lock (Locker)
            {
                if (packets.Count > 0)
                {
                    int lastPacketIndex = packets.Count - 1;
                    return packets[lastPacketIndex].Length - packetActualBytes[lastPacketIndex];
                }
                else
                    throw new Exception("Unable to return requested size as packet list is empty.");
            }
        }

        /// <summary>
        /// Returns the value of the first cached byte.
        /// 获取第一个字节中的数据
        /// </summary>
        /// <returns>The value of the first cached byte.</returns>
        public byte FirstByte()
        {
            lock (Locker)
                return packets[0][0];
        }

        /// <summary>
        /// Copies all cached bytes into a single array and returns. Original data is left unchanged.
        /// </summary>
        /// <returns>All cached data as a single byte[]</returns>
        public byte[] GetAllData()
        {
            lock (Locker)
            {
                byte[] returnArray = new byte[totalBytesCached];

                int currentStart = 0;
                for (int i = 0; i < packets.Count; i++)
                {
                    Buffer.BlockCopy(packets[i], 0, returnArray, currentStart, packetActualBytes[i]);
                    currentStart += packetActualBytes[i];
                }

                return returnArray;
            }
        }

        /// <summary>
        /// Copies the requested cached bytes into a single array and returns. Original data is left unchanged.
        /// 拷贝需要的数据到一个字节数组中,并以内存流的形式返回给主程序,主程序会把“数据包包头”和此部分数据一起交给NetworkComms.CompleteIncomingItemTask()方法进行处理
        ///
        /// </summary>
        /// <param name="startIndex">The inclusive byte index to use as the starting position.</param>
        /// <param name="length">The total number of desired bytes.</param>
        /// <returns>The requested bytes as a single array.</returns>
        public MemoryStream ReadDataSection(int startIndex, int length)
        {
            lock (Locker)
            {
                byte[] returnArray = new byte[length];
                int runningTotal = 0, writeTotal = 0;
                int startingPacketIndex;

                int firstPacketStartIndex = 0;
                //First find the correct starting packet
                for (startingPacketIndex = 0; startingPacketIndex < packets.Count; startingPacketIndex++)
                {
                    if (startIndex - runningTotal <= packetActualBytes[startingPacketIndex])
                    {
                        firstPacketStartIndex = startIndex - runningTotal;
                        break;
                    }
                    else
                        runningTotal += packetActualBytes[startingPacketIndex];
                }

                //Copy the bytes of interest
                for (int i = startingPacketIndex; i < packets.Count; i++)
                {
                    if (i == startingPacketIndex)
                    {
                        if (length > packetActualBytes[i] - firstPacketStartIndex)
                            //If we want from some starting point to the end of the packet
                            Buffer.BlockCopy(packets[i], firstPacketStartIndex, returnArray, writeTotal, packetActualBytes[i] - firstPacketStartIndex);
                        else
                        {
                            //We only want part of the packet
                            Buffer.BlockCopy(packets[i], firstPacketStartIndex, returnArray, writeTotal, length);
                            writeTotal += length;
                            break;
                        }

                        writeTotal = packetActualBytes[i] - firstPacketStartIndex;
                    }
                    else
                    {
                        //We are no longer on the first packet
                        if (packetActualBytes[i] + writeTotal >= length)
                        {
                            //We have reached the last packet of interest
                            Buffer.BlockCopy(packets[i], 0, returnArray, writeTotal, length - writeTotal);
                            writeTotal += length - writeTotal;
                            break;
                        }
                        else
                        {
                            Buffer.BlockCopy(packets[i], 0, returnArray, writeTotal, packetActualBytes[i]);
                            writeTotal += packetActualBytes[i];
                        }
                    }
                }

                if (writeTotal != length) throw new Exception("Not enough data available in packetBuilder to complete request. Requested " + length.ToString() + " bytes but only " + writeTotal.ToString() + " bytes were copied.");

                return new MemoryStream(returnArray, 0, returnArray.Length, false, true);
            }
        }
    }

www.cnblogs.com/networkcomms

www.networkcomms.cn

时间: 2024-08-24 20:51:51

c#网络通信框架networkcomms内核解析之七 数据包创建器(PacketBuilder)的相关文章

c#网络通信框架networkcomms内核解析之八 数据包的核心处理器

我们先回顾一个 c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据 中,主程序把PacketBuilder 中的数据交给核心处理器处理的过程 //创建优先级队列项目 PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPac

c#网络通信框架networkcomms内核解析之五 数据监听

在networkcomms中,服务器端可以同步监听数据,也可以异步监听数据. 以开源的networkcomms.2.31为例 服务器端监听代码: protected override void StartIncomingDataListen() { if (!NetworkComms.ConnectionExists(ConnectionInfo.RemoteEndPoint, ConnectionType.TCP)) { CloseConnection(true, 18); throw new

c#网络通信框架networkcomms内核解析 序言

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

c#网络通信框架networkcomms内核解析

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

c#网络通信框架networkcomms内核解析之十一 TCP连接与UDP连接

连接是通信的核心 客户端一般只会有一个连接 服务器端会维护成千上万的连接 在服务器端连接的维护工作是由NetworkComms静态类来完成的,当有新的客户端请求,服务器上会创建相应的连接,并把连接注册到NetworkComms静态类中.当连接断开后,NetworkComms通信框架会自动把相应连接的引用从NetworkComms静态类中删除. 连接的类图: 在V3以上版本中,数据监听部分已从Connnection类中提取出去成为一个单独的类: TCPConnectionListener   ,使

c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据

在networkcomms通信系统中,服务器端收到某连接上的数据后,数据会暂时存放在"数据包创建器"(PacketBuilder)中,PacketBuilder类似一个流动的容器,收到的数据被服务器处理完成后,相应在二进制数据,会从存储他的PacketBuilder中删除. 我们知道在networkcomms的消息体系中,传送的数据的第一个字节用来存储数据包包头长度,解析出数据包包头后,包头中包含数据包长度.所以在读入进入PacketBuilder中的数据,会根据第一个字节中存储的数据

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

c#网络通信框架networkcomms内核解析之一 消息传送

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn 在网络通信程序中,本地的类或者对象,要传输到通信的另一端,在网络上传输的时候是二进制流的形式. 那么在发送消息的时候要把对象序列化为二进制流 对方接收到二进制数据流要还原成对象. 我们知道使用Tcp协议传输消息的时候有消息边界问题,要解决这个问题,方法有很多,比如: (1)固定尺寸的消息 (2) 使用消息尺寸信息 (3) 使用消息标记 我们

c#网络通信框架networkcomms内核解析之三 消息同步调用

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn 客户端发送消息给服务器,服务器计算结果返回给客户端,这是网络通信应用程序中常见的使用情境. 拿用户登录举例子,客户端把包含用户名和密码的契约类发送给服务器,服务器从数据获取数据后与收到的契约类数据进行对比,如果一致,返回登录成功的信息,如果不一致,返回登陆不成功的信息 NetworkComms框架支持消息的同步调用,就像调用本地方法一般.