c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据

在networkcomms通信系统中,服务器端收到某连接上的数据后,数据会暂时存放在"数据包创建器"(PacketBuilder)中,PacketBuilder类似一个流动的容器,收到的数据被服务器处理完成后,相应在二进制数据,会从存储他的PacketBuilder中删除。

我们知道在networkcomms的消息体系中,传送的数据的第一个字节用来存储数据包包头长度,解析出数据包包头后,包头中包含数据包长度。所以在读入进入PacketBuilder中的数据,会根据第一个字节中存储的数据,解析出包头的长度,进一步解析出数据包的长度。如果PacketBuilder中的存储的字节大小,大于数据包的长度,那么主程序将会对PacketBuilder中的数据,进行提取,并进行处理。

此节,我们暂时不讨论PacketBuilder的细节,先看一下服务器如何对PacketBuilder中的数据进行处理

protected void IncomingPacketHandleHandOff(PacketBuilder packetBuilder)
        {
            try
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... checking for completed packet with " + packetBuilder.TotalBytesCached.ToString() + " bytes read.");

                if (packetBuilder.TotalPartialPacketCount == 0)
                    throw new Exception("Executing IncomingPacketHandleHandOff when no packets exist in packetbuilder.");

                //循环,直到我们完成对packetBuilder的处理
                //Loop until we are finished with this packetBuilder
                int loopCounter = 0;
                while (true)
                {
                    //If we have ended up with a null packet at the front, probably due to some form of concatentation we can pull it off here
                    //It is possible we have concatenation of several null packets along with real data so we loop until the firstByte is greater than 0
                    //此处,可能是处理心跳检测时对方发来的0字节数据
                    if (packetBuilder.FirstByte() == 0)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandleHandOff() from " + ConnectionInfo + ", loop index - " + loopCounter.ToString());

                        packetBuilder.ClearNTopBytes(1);

                        //Reset the expected bytes to 0 so that the next check starts from scratch
                        packetBuilder.TotalBytesExpected = 0;

                        //If we have run out of data completely then we can return immediately
                        if (packetBuilder.TotalBytesCached == 0) return;
                    }
                    else
                    {
                        //First determine the expected size of a header packet
                        //获取数据包包头长度
                        int packetHeaderSize = packetBuilder.FirstByte() + 1;

                        //Do we have enough data to build a header?
                        //如果没有足够的二级制数据来还原出一个数据包包头
                        if (packetBuilder.TotalBytesCached < packetHeaderSize)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet header.");

                            //Set the expected number of bytes and then return
                            packetBuilder.TotalBytesExpected = packetHeaderSize;
                            return;
                        }

                        //We have enough for a header
                        //有足够的数据来还原出数据包包头
                        PacketHeader topPacketHeader;
                        using(MemoryStream headerStream = packetBuilder.ReadDataSection(1, packetHeaderSize - 1))
                            //数据包包头
                            topPacketHeader = new PacketHeader(headerStream, NetworkComms.InternalFixedSendReceiveOptions);

                        //Idiot test
                        if (topPacketHeader.PacketType == null)
                            throw new SerialisationException("packetType value in packetHeader should never be null");

                        //We can now use the header to establish if we have enough payload data
                        //First case is when we have not yet received enough data
                         //如果没有足够的数据来还原出数据包
                        if (packetBuilder.TotalBytesCached < packetHeaderSize + topPacketHeader.PayloadPacketSize)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet payload. Expecting " + (packetHeaderSize + topPacketHeader.PayloadPacketSize).ToString() + " total packet bytes.");

                            //Set the expected number of bytes and then return
                            packetBuilder.TotalBytesExpected = packetHeaderSize + topPacketHeader.PayloadPacketSize;
                            return;
                        }
                        //Second case is we have enough data
                        //有足够的数据还原出数据包
                        else if (packetBuilder.TotalBytesCached >= packetHeaderSize + topPacketHeader.PayloadPacketSize)
                        {
                            //We can either have exactly the right amount or even more than we were expecting
                            //We may have too much data if we are sending high quantities and the packets have been concatenated
                            //no problem!!
                            SendReceiveOptions incomingPacketSendReceiveOptions = IncomingPacketSendReceiveOptions(topPacketHeader);
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Received packet of type ‘" + topPacketHeader.PacketType + "‘ from " + ConnectionInfo + ", containing " + packetHeaderSize.ToString() + " header bytes and " + topPacketHeader.PayloadPacketSize.ToString() + " payload bytes.");

                            //If this is a reserved packetType we call the method inline so that it gets dealt with immediately
                            //判断是否为保留类型,保留类型会优先处理
                            //保留类型包括心跳检测、连接建立等
                            bool isReservedType = false;
                            foreach (var tName in NetworkComms.reservedPacketTypeNames)
                            {
                                //isReservedType |= topPacketHeader.PacketType == tName;
                                if (topPacketHeader.PacketType == tName)
                                {
                                    isReservedType = true;
                                    break;
                                }
                            }

                            //Only reserved packet types get completed inline
                            //如果数据包类型为保留类型  设定为高优先级
                            if (isReservedType)
                            {
#if WINDOWS_PHONE
                                var priority = QueueItemPriority.Normal;
#else
                                var priority = (QueueItemPriority)Thread.CurrentThread.Priority;
#endif
                                //创建优先级队列项目
                                PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);
                                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type ‘" + topPacketHeader.PacketType + "‘ inline. Loop index - " + loopCounter.ToString());
                                NetworkComms.CompleteIncomingItemTask(item);
                            }
                            else
                            {
                                //创建优先级队列项目
                                QueueItemPriority itemPriority = (incomingPacketSendReceiveOptions.Options.ContainsKey("ReceiveHandlePriority") ? (QueueItemPriority)Enum.Parse(typeof(QueueItemPriority), incomingPacketSendReceiveOptions.Options["ReceiveHandlePriority"]) : QueueItemPriority.Normal);
                                //把数据包包头,和数据包数据 赋值给优先级队列项
                                PriorityQueueItem item = new PriorityQueueItem(itemPriority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);

                                //QueueItemPriority.Highest is the only priority that is executed inline
#if !WINDOWS_PHONE
                                //如果是最高优先级,当前线程处理
                                if (itemPriority == QueueItemPriority.Highest)
                                {
                                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type ‘" + topPacketHeader.PacketType + "‘ with priority HIGHEST inline. Loop index - " + loopCounter.ToString());
                                    NetworkComms.CompleteIncomingItemTask(item);
                                }
                                else
                                {
                                    //不是最高优先级 交给自定义线程池处理   此线程池支持优先级处理
                                    int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item);
                                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + ".");
                                }
#else
                                int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item);
                                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + ".");
#endif
                            }

                            //从PacketBuilder中删除已经处理过的数据
                            //We clear the bytes we have just handed off
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Removing " + (packetHeaderSize + topPacketHeader.PayloadPacketSize).ToString() + " bytes from incoming packet buffer from connection with " + ConnectionInfo +".");
                            packetBuilder.ClearNTopBytes(packetHeaderSize + topPacketHeader.PayloadPacketSize);

                            //Reset the expected bytes to 0 so that the next check starts from scratch
                            packetBuilder.TotalBytesExpected = 0;

                            //If we have run out of data completely then we can return immediately
                            if (packetBuilder.TotalBytesCached == 0) return;
                        }
                        else
                            throw new CommunicationException("This should be impossible!");
                    }

                    loopCounter++;
                }
            }
            catch (Exception ex)
            {
                //Any error, throw an exception.
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("A fatal exception occured in IncomingPacketHandleHandOff(), connection with " + ConnectionInfo + " be closed. See log file for more information.");

                NetworkComms.LogError(ex, "CommsError");
                CloseConnection(true, 45);
            }
        }

在上面的数据处理过程中,不知道您是否注意到一个重要的概念,即“自定义线程池”,

CommsThreadPool

此线程池,可以说是Networkcomms通信框架中一颗璀璨的明珠,由他负责处理接收到的所有数据,此线程池支持优先级,即高优先级的数据会被优先处理

www.networkcomms.cnwww.cnblogs.com/networkcomms
时间: 2024-10-12 10:48:59

c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据的相关文章

c#网络通信框架networkcomms内核解析 序言

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

c#网络通信框架networkcomms内核解析之八 数据包的核心处理器

我们先回顾一个 c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据 中,主程序把PacketBuilder 中的数据交给核心处理器处理的过程 //创建优先级队列项目 PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPac

c#网络通信框架networkcomms内核解析

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

c#网络通信框架networkcomms内核解析之七 数据包创建器(PacketBuilder)

PacketBuilder 数据包创建器,用于辅助创建数据包. 程序把Tcp连接上收到的二进制数据暂时存储在 packetBuilder中,如果收到的数据足够多,程序会把数据包包头解析出来,并根据数据包包头中的数据,解析出数据包大小,根据数据包大小,从PacketBuilder中截取相应的二进制数据,把这部分数据以内存流(MemoryStream)的形式,加上数据包包头一起交给NetworkComms.CompleteIncomingItemTask()方法进行处理. PacketBuilder

c#网络通信框架networkcomms内核解析之一 消息传送

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn 在网络通信程序中,本地的类或者对象,要传输到通信的另一端,在网络上传输的时候是二进制流的形式. 那么在发送消息的时候要把对象序列化为二进制流 对方接收到二进制数据流要还原成对象. 我们知道使用Tcp协议传输消息的时候有消息边界问题,要解决这个问题,方法有很多,比如: (1)固定尺寸的消息 (2) 使用消息尺寸信息 (3) 使用消息标记 我们

c#网络通信框架networkcomms内核解析之三 消息同步调用

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn 客户端发送消息给服务器,服务器计算结果返回给客户端,这是网络通信应用程序中常见的使用情境. 拿用户登录举例子,客户端把包含用户名和密码的契约类发送给服务器,服务器从数据获取数据后与收到的契约类数据进行对比,如果一致,返回登录成功的信息,如果不一致,返回登陆不成功的信息 NetworkComms框架支持消息的同步调用,就像调用本地方法一般.

c#网络通信框架networkcomms内核解析之二 消息处理流程

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn   发送端发送消息给接收端 ,接收端进行处理    举例:客户端把某用户信息(用户ID,用户密码)传输给服务器,服务器存储到数据库中. 发送方 1.契约类(用户信息类) [ProtoContract] public class UserInfo { [ProtoMember(1)] public string UserID; [ProtoM

c#网络通信框架networkcomms内核解析之十一 TCP连接与UDP连接

连接是通信的核心 客户端一般只会有一个连接 服务器端会维护成千上万的连接 在服务器端连接的维护工作是由NetworkComms静态类来完成的,当有新的客户端请求,服务器上会创建相应的连接,并把连接注册到NetworkComms静态类中.当连接断开后,NetworkComms通信框架会自动把相应连接的引用从NetworkComms静态类中删除. 连接的类图: 在V3以上版本中,数据监听部分已从Connnection类中提取出去成为一个单独的类: TCPConnectionListener   ,使