c#网络通信框架networkcomms内核解析之八 数据包的核心处理器

我们先回顾一个 c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据 中,主程序把PacketBuilder 中的数据交给核心处理器处理的过程

   //创建优先级队列项目
  PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions);

   NetworkComms.CompleteIncomingItemTask(item);

上面的代码中

第一行 生成了一个优先级队列项目 (主要方便交给带有优先级的自定义线程池进行处理)

其参数包括

1:优先级 2:相关Tcp连接 (有返回数据的话,还要通过此连接返回) 3:数据包包头 4:数据包的二进制数据(内存流,还没有解析成数据包,留待下一步解析)。

自定义线程池,暂时不表,看一下核心处理器。(自定义线程池,是创建多个线程,并按照优先级对数据进行处理)

NetworkComms.CompleteIncomingItemTask(item);
   /// <summary>
        /// Once we have received all incoming data we handle it further. This is performed at the global level to help support different priorities.
        /// 数据包的核心处理器
        /// </summary>
        /// <param name="itemAsObj">Possible PriorityQueueItem. If null is provided an item will be removed from the global item queue</param>
        internal static void CompleteIncomingItemTask(object itemAsObj)
        {
            if (itemAsObj == null)
                throw new ArgumentNullException("itemAsObj", "Provided parameter itemAsObj cannot be null.");
            //优先级队列项目
            PriorityQueueItem item = null;
            try
            {
                //If the packetBytes are null we need to ask the incoming packet queue for what we should be running
                //把对象还原成优先级队列项目
                item = itemAsObj as PriorityQueueItem;

                if (item == null)
                    throw new InvalidCastException("Cast from object to PriorityQueueItem resulted in null reference, unable to continue.");

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Handling a " + item.PacketHeader.PacketType + " packet from " + item.Connection.ConnectionInfo + " with a priority of " + item.Priority.ToString() + ".");

#if !WINDOWS_PHONE
                if (Thread.CurrentThread.Priority != (ThreadPriority)item.Priority) Thread.CurrentThread.Priority = (ThreadPriority)item.Priority;
#endif

                //Check for a shutdown connection
                if (item.Connection.ConnectionInfo.ConnectionState == ConnectionState.Shutdown) return;

                //We only look at the check sum if we want to and if it has been set by the remote end
                //如果这是一个检验和消息
                if (NetworkComms.EnablePacketCheckSumValidation && item.PacketHeader.ContainsOption(PacketHeaderStringItems.CheckSumHash))
                {
                    var packetHeaderHash = item.PacketHeader.GetOption(PacketHeaderStringItems.CheckSumHash);

                    //Validate the checkSumhash of the data
                    string packetDataSectionMD5 = NetworkComms.MD5Bytes(item.DataStream);
                    if (packetHeaderHash != packetDataSectionMD5)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn(" ... corrupted packet detected, expected " + packetHeaderHash + " but received " + packetDataSectionMD5 + ".");

                        //We have corruption on a resend request, something is very wrong so we throw an exception.
                        if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend)) throw new CheckSumException("Corrupted md5CheckFailResend packet received.");

                        if (item.PacketHeader.PayloadPacketSize < NetworkComms.CheckSumMismatchSentPacketCacheMaxByteLimit)
                        {
                            //Instead of throwing an exception we can request the packet to be resent
                            Packet returnPacket = new Packet(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend), packetHeaderHash, NetworkComms.InternalFixedSendReceiveOptions);
                            item.Connection.SendPacket(returnPacket);
                            //We need to wait for the packet to be resent before going further
                            return;
                        }
                        else
                            throw new CheckSumException("Corrupted packet detected from " + item.Connection.ConnectionInfo + ", expected " + packetHeaderHash + " but received " + packetDataSectionMD5 + ".");
                    }
                }

                //Remote end may have requested packet receive confirmation so we send that now
                //如果对象发送的消息,需要确认收到,在此处进行确认
                if (item.PacketHeader.ContainsOption(PacketHeaderStringItems.ReceiveConfirmationRequired))
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... sending requested receive confirmation packet.");

                    var hash = item.PacketHeader.ContainsOption(PacketHeaderStringItems.CheckSumHash) ? item.PacketHeader.GetOption(PacketHeaderStringItems.CheckSumHash) : "";

                    Packet returnPacket = new Packet(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.Confirmation), hash, NetworkComms.InternalFixedSendReceiveOptions);
                    item.Connection.SendPacket(returnPacket);
                }

                //处理保留数据类型的数据包
                //We can now pass the data onto the correct delegate
                //First we have to check for our reserved packet types
                //The following large sections have been factored out to make reading and debugging a little easier
                if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.CheckSumFailResend))
                    item.Connection.CheckSumFailResendHandler(item.DataStream);
                else if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.ConnectionSetup))
                    item.Connection.ConnectionSetupHandler(item.DataStream);
                else if (item.PacketHeader.PacketType == Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.AliveTestPacket) &&
                    (NetworkComms.InternalFixedSendReceiveOptions.DataSerializer.DeserialiseDataObject<byte[]>(item.DataStream,
                        NetworkComms.InternalFixedSendReceiveOptions.DataProcessors,
                        NetworkComms.InternalFixedSendReceiveOptions.Options))[0] == 0)
                {
                    //If we have received a ping packet from the originating source we reply with true
                    Packet returnPacket = new Packet(Enum.GetName(typeof(ReservedPacketType), ReservedPacketType.AliveTestPacket), new byte[1] { 1 }, NetworkComms.InternalFixedSendReceiveOptions);
                    item.Connection.SendPacket(returnPacket);
                }

                //We allow users to add their own custom handlers for reserved packet types here
                //else
                if (true)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Triggering handlers for packet of type ‘" + item.PacketHeader.PacketType + "‘ from " + item.Connection.ConnectionInfo);

                    //We trigger connection specific handlers first
                    //触发连接上的消息处理器
                    bool connectionSpecificHandlersTriggered = item.Connection.TriggerSpecificPacketHandlers(item.PacketHeader, item.DataStream, item.SendReceiveOptions);

                    //We trigger global handlers second
                   //触发具体的数据包处理器
                    NetworkComms.TriggerGlobalPacketHandlers(item.PacketHeader, item.Connection, item.DataStream, item.SendReceiveOptions, connectionSpecificHandlersTriggered);

                }
            }
            catch (CommunicationException)
            {
                if (item != null)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("A communcation exception occured in CompleteIncomingPacketWorker(), connection with " + item.Connection.ConnectionInfo + " be closed.");
                    item.Connection.CloseConnection(true, 2);
                }
            }
            catch (DuplicateConnectionException ex)
            {
                if (item != null)
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn(ex.Message != null ? ex.Message : "A possible duplicate connection was detected with " + item.Connection + ". Closing connection.");
                    item.Connection.CloseConnection(true, 42);
                }
            }
            catch (Exception ex)
            {
                NetworkComms.LogError(ex, "CompleteIncomingItemTaskError");

                if (item != null)
                {
                    //If anything goes wrong here all we can really do is log the exception
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An unhandled exception occured in CompleteIncomingPacketWorker(), connection with " + item.Connection.ConnectionInfo + " be closed. See log file for more information.");
                    item.Connection.CloseConnection(true, 3);
                }
            }
            finally
            {
                //We need to dispose the data stream correctly
                if (item!=null) item.DataStream.Close();

#if !WINDOWS_PHONE
                //Ensure the thread returns to the pool with a normal priority
                if (Thread.CurrentThread.Priority != ThreadPriority.Normal) Thread.CurrentThread.Priority = ThreadPriority.Normal;
#endif
            }
        }

上面的方法中,我们看到TriggerGlobalPacketHandlers方法,此方法把数据包,和我们在服务器端自定义的数据包处理器关联起来,并用数据包处理器对数据包进行处理

   //触发具体的数据包处理器
      NetworkComms.TriggerGlobalPacketHandlers(item.PacketHeader, item.Connection, item.DataStream, item.SendReceiveOptions, connectionSpecificHandlersTriggered);

我们看一下此方法的具体内容:

     internal static void TriggerGlobalPacketHandlers(PacketHeader packetHeader, Connection connection, MemoryStream incomingDataStream, SendReceiveOptions options, bool ignoreUnknownPacketTypeOverride = false)
        {
            try
            {
                if (options == null) throw new PacketHandlerException("Provided sendReceiveOptions should not be null for packetType " + packetHeader.PacketType);

                //We take a copy of the handlers list incase it is modified outside of the lock
                List<IPacketTypeHandlerDelegateWrapper> handlersCopy = null;
                lock (globalDictAndDelegateLocker)
                    if (globalIncomingPacketHandlers.ContainsKey(packetHeader.PacketType))
                        //根据数据包包头中的消息类型,获取相关的自定义处理器,处理器可以一个,可以多个,当然一般是一个:)
                        handlersCopy = new List<IPacketTypeHandlerDelegateWrapper>(globalIncomingPacketHandlers[packetHeader.PacketType]);

                if (handlersCopy == null && !IgnoreUnknownPacketTypes && !ignoreUnknownPacketTypeOverride)
                {
                    //We may get here if we have not added any custom delegates for reserved packet types
                    bool isReservedType = false;

                    for (int i = 0; i < reservedPacketTypeNames.Length; i++)
                    {
                        if (reservedPacketTypeNames[i] == packetHeader.PacketType)
                        {
                            isReservedType = true;
                            break;
                        }
                    }

                    if (!isReservedType)
                    {
                        //Change this to just a log because generally a packet of the wrong type is nothing to really worry about
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Warn("The received packet type ‘" + packetHeader.PacketType + "‘ has no configured handler and network comms is not set to ignore unknown packet types. Set NetworkComms.IgnoreUnknownPacketTypes=true to prevent this error.");
                        LogError(new UnexpectedPacketTypeException("The received packet type ‘" + packetHeader.PacketType + "‘ has no configured handler and network comms is not set to ignore unknown packet types. Set NetworkComms.IgnoreUnknownPacketTypes=true to prevent this error."), "PacketHandlerErrorGlobal_" + packetHeader.PacketType);
                    }

                    return;
                }
                else if (handlersCopy == null && (IgnoreUnknownPacketTypes || ignoreUnknownPacketTypeOverride))
                    //If we have received and unknown packet type and we are choosing to ignore them we just finish here
                    return;
                else
                {
                    //Idiot check
                    if (handlersCopy.Count == 0)
                        throw new PacketHandlerException("An entry exists in the packetHandlers list but it contains no elements. This should not be possible.");

                    //Deserialise the object only once
                    //把数据包解析处理
                    object returnObject = handlersCopy[0].DeSerialize(incomingDataStream, options);

                    //Pass the data onto the handler and move on.
                    if (LoggingEnabled) logger.Trace(" ... passing completed data packet of type ‘" + packetHeader.PacketType + "‘ to " + handlersCopy.Count.ToString() + " selected global handlers.");

                    //Pass the object to all necessary delgates
                    //We need to use a copy because we may modify the original delegate list during processing
                    foreach (IPacketTypeHandlerDelegateWrapper wrapper in handlersCopy)
                    {
                        try
                        {
                            //用数据包数据里处理数据包
                            //这里的处理器,就是我们自定义的处理器了
                            wrapper.Process(packetHeader, connection, returnObject);
                        }
                        catch (Exception ex)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An unhandled exception was caught while processing a packet handler for a packet type ‘" + packetHeader.PacketType + "‘. Make sure to catch errors in packet handlers. See error log file for more information.");
                            NetworkComms.LogError(ex, "PacketHandlerErrorGlobal_" + packetHeader.PacketType);
                        }
                    }

                    if (LoggingEnabled) logger.Trace(" ... all handlers for packet of type ‘" + packetHeader.PacketType + "‘ completed.");
                }
            }
            catch (Exception ex)
            {
                //If anything goes wrong here all we can really do is log the exception
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("An exception occured in TriggerPacketHandler() for a packet type ‘" + packetHeader.PacketType + "‘. See error log file for more information.");
                NetworkComms.LogError(ex, "PacketHandlerErrorGlobal_" + packetHeader.PacketType);
            }
        }

什么是自定义的数据包处理器呢?

举个例子

客户端我们这么写:

  ResMsgContract resMsg = newTcpConnection.SendReceiveObject<ResMsgContract>("ReqLogin", "ResLogin", 5000, contract);

服务器端我们就要写一个对应的自定义数据包处理器,写法如下:

首先要注册自定义的数据包处理器:

  NetworkComms.AppendGlobalIncomingPacketHandler<LoginContract>("ReqLogin", IncomingLoginRequest);

   private void IncomingLoginRequest(PacketHeader header, Connection connection, LoginContract loginContract)
        {
            try
            {
                string resMsg="";
                //为了简单,这里不调用数据库,而是模拟一下登录
                if (loginContract.UserID == "1000" && loginContract.PassWord == "123")

                    resMsg = "登录成功";

                else

                    resMsg = "用户名密码错误";
                //把返回结果写入到契约类中,后面返回给客户端
                ResMsgContract contract = new ResMsgContract();
                contract.Message = resMsg;

                connection.SendObject("ResLogin", contract);

            }
            catch (Exception ex)
            {

            }
        }

IncomingLoginRequest

通过上面的注册语句,Netowrkcomms中就把消息类型“ReqLogin"与他对应的处理方法关联起来,当以后服务器收到数据包包头中消息类型为"ReqLogin"类型的消息,就知道调用哪个方法来进行处理.

下一篇分析一下,自定义数据包处理器的底层运行机制

www.cnblogs.com/networkcomms

www.networkcomms.cn

时间: 2024-10-21 01:26:14

c#网络通信框架networkcomms内核解析之八 数据包的核心处理器的相关文章

c#网络通信框架networkcomms内核解析之七 数据包创建器(PacketBuilder)

PacketBuilder 数据包创建器,用于辅助创建数据包. 程序把Tcp连接上收到的二进制数据暂时存储在 packetBuilder中,如果收到的数据足够多,程序会把数据包包头解析出来,并根据数据包包头中的数据,解析出数据包大小,根据数据包大小,从PacketBuilder中截取相应的二进制数据,把这部分数据以内存流(MemoryStream)的形式,加上数据包包头一起交给NetworkComms.CompleteIncomingItemTask()方法进行处理. PacketBuilder

c#网络通信框架networkcomms内核解析之五 数据监听

在networkcomms中,服务器端可以同步监听数据,也可以异步监听数据. 以开源的networkcomms.2.31为例 服务器端监听代码: protected override void StartIncomingDataListen() { if (!NetworkComms.ConnectionExists(ConnectionInfo.RemoteEndPoint, ConnectionType.TCP)) { CloseConnection(true, 18); throw new

c#网络通信框架networkcomms内核解析 序言

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

c#网络通信框架networkcomms内核解析

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据

在networkcomms通信系统中,服务器端收到某连接上的数据后,数据会暂时存放在"数据包创建器"(PacketBuilder)中,PacketBuilder类似一个流动的容器,收到的数据被服务器处理完成后,相应在二进制数据,会从存储他的PacketBuilder中删除. 我们知道在networkcomms的消息体系中,传送的数据的第一个字节用来存储数据包包头长度,解析出数据包包头后,包头中包含数据包长度.所以在读入进入PacketBuilder中的数据,会根据第一个字节中存储的数据

c#网络通信框架networkcomms内核解析之一 消息传送

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn 在网络通信程序中,本地的类或者对象,要传输到通信的另一端,在网络上传输的时候是二进制流的形式. 那么在发送消息的时候要把对象序列化为二进制流 对方接收到二进制数据流要还原成对象. 我们知道使用Tcp协议传输消息的时候有消息边界问题,要解决这个问题,方法有很多,比如: (1)固定尺寸的消息 (2) 使用消息尺寸信息 (3) 使用消息标记 我们

c#网络通信框架networkcomms内核解析之十一 TCP连接与UDP连接

连接是通信的核心 客户端一般只会有一个连接 服务器端会维护成千上万的连接 在服务器端连接的维护工作是由NetworkComms静态类来完成的,当有新的客户端请求,服务器上会创建相应的连接,并把连接注册到NetworkComms静态类中.当连接断开后,NetworkComms通信框架会自动把相应连接的引用从NetworkComms静态类中删除. 连接的类图: 在V3以上版本中,数据监听部分已从Connnection类中提取出去成为一个单独的类: TCPConnectionListener   ,使

c#网络通信框架networkcomms内核解析之三 消息同步调用

networkcomms.net 来自英国的网络通信框架 官方网址 www.networkcomms.net 中文网址www.networkcomms.cn 客户端发送消息给服务器,服务器计算结果返回给客户端,这是网络通信应用程序中常见的使用情境. 拿用户登录举例子,客户端把包含用户名和密码的契约类发送给服务器,服务器从数据获取数据后与收到的契约类数据进行对比,如果一致,返回登录成功的信息,如果不一致,返回登陆不成功的信息 NetworkComms框架支持消息的同步调用,就像调用本地方法一般.