介绍开源的.net通信框架NetworkComms框架 源码分析(二十三 )TCPConnection

原文网址: http://www.cnblogs.com/csdev

Networkcomms 是一款C# 语言编写的TCP/UDP通信框架  作者是英国人  以前是收费的 目前作者已经开源  许可是:Apache License v2

开源地址是:https://github.com/MarcFletcher/NetworkComms.Net

/// <summary>
    /// A connection object which utilises <see href="http://en.wikipedia.org/wiki/Transmission_Control_Protocol">TCP</see> to communicate between peers.
    /// Tcp连接对象   通过TCP协议在端点之间通信
    /// </summary>
    public sealed partial class TCPConnection : IPConnection
    {
#if WINDOWS_PHONE || NETFX_CORE
        /// <summary>
        /// The windows phone socket corresponding to this connection.
        /// </summary>
        StreamSocket socket;
#else
        /// <summary>
        /// The TcpClient corresponding to this connection.
        /// 与此连接相关的TcpClient对象
        /// </summary>
        TcpClient tcpClient;

        /// <summary>
        /// The networkstream associated with the tcpClient.
        /// 与tcpClient相对应额networkstream对象
        /// </summary>
        Stream connectionStream;

        /// <summary>
        /// The SSL options associated with this connection.
        /// SSL(安全套接层) 相关的参数设置
        /// </summary>
        public SSLOptions SSLOptions { get; private set; }
#endif

        /// <summary>
        /// The current incoming data buffer
        ///  进入的数据的缓冲区
        /// </summary>
        byte[] dataBuffer;

        /// <summary>
        /// TCP connection constructor
        /// TCP连接构造器
        /// </summary>
#if WINDOWS_PHONE || NETFX_CORE
        private TCPConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, StreamSocket socket)
#else
        private TCPConnection(ConnectionInfo connectionInfo, SendReceiveOptions defaultSendReceiveOptions, TcpClient tcpClient, SSLOptions sslOptions)
#endif
            : base(connectionInfo, defaultSendReceiveOptions)
        {
            if (connectionInfo.ConnectionType != ConnectionType.TCP)
                throw new ArgumentException("Provided connectionType must be TCP.", "connectionInfo");

            dataBuffer = new byte[NetworkComms.InitialReceiveBufferSizeBytes];

            //We don‘t guarantee that the tcpClient has been created yet
            //在构造器中  tcpClient对象可能还没有被创建
#if WINDOWS_PHONE || NETFX_CORE
            if (socket != null) this.socket = socket;
#else
            if (tcpClient != null) this.tcpClient = tcpClient;
            this.SSLOptions = sslOptions;
#endif
        }

        /// <inheritdoc />
        /// 创建Tcp连接
        protected override void EstablishConnectionSpecific()
        {
#if WINDOWS_PHONE || NETFX_CORE
            if (socket == null) ConnectSocket();

            //For the local endpoint
            //本地端点
            var localEndPoint = new IPEndPoint(IPAddress.Parse(socket.Information.LocalAddress.CanonicalName.ToString()), int.Parse(socket.Information.LocalPort));

            //We should now be able to set the connectionInfo localEndPoint
            //此处 可以设置connectionInfo对象的本地端点
            NetworkComms.UpdateConnectionReferenceByEndPoint(this, ConnectionInfo.RemoteIPEndPoint, localEndPoint);
            ConnectionInfo.UpdateLocalEndPointInfo(localEndPoint);

            //Set the outgoing buffer size
            //设置发送缓冲区大小
            socket.Control.OutboundBufferSizeInBytes = (uint)NetworkComms.SendBufferSizeBytes;
#else
            if (tcpClient == null) ConnectSocket();

            //We should now be able to set the connectionInfo localEndPoint
            //我们现在应该能够设置 connectionInfo对象的本地端点
            NetworkComms.UpdateConnectionReferenceByEndPoint(this, ConnectionInfo.RemoteIPEndPoint, (IPEndPoint)tcpClient.Client.LocalEndPoint);
            ConnectionInfo.UpdateLocalEndPointInfo((IPEndPoint)tcpClient.Client.LocalEndPoint);

            if (SSLOptions.SSLEnabled)
                ConfigureSSLStream();
            else
                //We are going to be using the networkStream quite a bit so we pull out a reference once here
                //networkStream类型的网络流 后面会有不少地方需要使用 此处做一个引用
                connectionStream = tcpClient.GetStream();

            //When we tell the socket/client to close we want it to do so immediately  当我们告诉socket/client关闭时 我们希望其立即关闭
            //this.tcpClient.LingerState = new LingerOption(false, 0);

            //We need to set the keep alive option otherwise the connection will just die at some random time should we not be using it
            //我们需要设定Keep alive选项 以防止连接断开  (后来没有使用)
            //NOTE: This did not seem to work reliably so was replaced with the keepAlive packet feature
            //注意:设定KeepAlive选项来维护Tcp连接并不可靠   所以后来我们用发送心跳包进行代替
            //this.tcpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);

            tcpClient.ReceiveBufferSize = NetworkComms.MaxReceiveBufferSizeBytes;
            tcpClient.SendBufferSize = NetworkComms.SendBufferSizeBytes;

            //This disables the ‘nagle algorithm‘ 禁用 nagle算法
            //http://msdn.microsoft.com/en-us/library/system.net.sockets.socket.nodelay.aspx
            //Basically we may want to send lots of small packets (<200 bytes) and sometimes those are time critical (e.g. when establishing a connection)
            //If we leave this enabled small packets may never be sent until a suitable send buffer length threshold is passed. i.e. BAD
            //下面的2个选项设定为true,当Tcp连接上有数据时,立即发送,我符合我们的预期。而不是等待Tcp缓冲池中有足够多的数据才一起发送
            tcpClient.NoDelay = true;
            tcpClient.Client.NoDelay = true;
#endif

            //Start listening for incoming data
            //开始监听进来的数据
            StartIncomingDataListen();

            //If the application layer protocol is enabled we handshake the connection
            //如果应用层协议启用 连接进行握手操作
            //应用层协议,系统默认为启用
            if (ConnectionInfo.ApplicationLayerProtocol == ApplicationLayerProtocolStatus.Enabled)
                ConnectionHandshake();
            else
            {
                //If there is no handshake we can now consider the connection established
                //如果此处无握手操作 我们可以考虑创建连接
                TriggerConnectionEstablishDelegates();

                //Trigger any connection setup waits
                //触发任何连接建设等待
                connectionSetupWait.Set();
            }

#if !WINDOWS_PHONE && !NETFX_CORE
            //Once the connection has been established we may want to re-enable the ‘nagle algorithm‘ used for reducing network congestion (apparently).
            //By default we leave the nagle algorithm disabled because we want the quick through put when sending small packets
            //一旦连接建立了,某些情况下我们可能要重新启用“Nagle算法”用于减少网络拥塞。
            //默认情况下我们不启用 nagle 算法,启用nagle算法将导致小的数据不能被及时的发送
            if (EnableNagleAlgorithmForNewConnections)
            {
                tcpClient.NoDelay = false;
                tcpClient.Client.NoDelay = false;
            }
#endif
        }

        /// <summary>
        /// If we were not provided with a tcpClient on creation we need to create one
        /// 如果没有tcpClient对象则创建一个
        /// </summary>
        private void ConnectSocket()
        {
            try
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Connecting TCP client with " + ConnectionInfo);

                bool connectSuccess = true;
#if WINDOWS_PHONE || NETFX_CORE
                //We now connect to our target
                //我们现在开始连接到我们的目标
                socket = new StreamSocket();
                socket.Control.NoDelay = !EnableNagleAlgorithmForNewConnections;

                CancellationTokenSource cancelAfterTimeoutToken = new CancellationTokenSource(NetworkComms.ConnectionEstablishTimeoutMS);

                try
                {
                    if (ConnectionInfo.LocalEndPoint != null && ConnectionInfo.LocalIPEndPoint.Address != IPAddress.IPv6Any && ConnectionInfo.LocalIPEndPoint.Address != IPAddress.Any)
                    {
                        var endpointPairForConnection = new Windows.Networking.EndpointPair(new Windows.Networking.HostName(ConnectionInfo.LocalIPEndPoint.Address.ToString()), ConnectionInfo.LocalIPEndPoint.Port.ToString(),
                                                        new Windows.Networking.HostName(ConnectionInfo.RemoteIPEndPoint.Address.ToString()), ConnectionInfo.RemoteIPEndPoint.Port.ToString());                        

                        var task = socket.ConnectAsync(endpointPairForConnection).AsTask(cancelAfterTimeoutToken.Token);
                        task.Wait();
                    }
                    else
                    {
                        var task = socket.ConnectAsync(new Windows.Networking.HostName(ConnectionInfo.RemoteIPEndPoint.Address.ToString()), ConnectionInfo.RemoteIPEndPoint.Port.ToString()).AsTask(cancelAfterTimeoutToken.Token);
                        task.Wait();
                    }
                }
                catch (Exception)
                {
                    socket.Dispose();
                    connectSuccess = false;
                }
#else
                //We now connect to our target
                //我们现在开始连接到我们的目标
                tcpClient = new TcpClient(ConnectionInfo.RemoteEndPoint.AddressFamily);

                //Start the connection using the async version
                //以异步方式开始连接
                //This allows us to choose our own connection establish timeout
                //允许设定连接创建超时时间
                IAsyncResult ar = tcpClient.BeginConnect(ConnectionInfo.RemoteIPEndPoint.Address, ConnectionInfo.RemoteIPEndPoint.Port, null, null);
                WaitHandle connectionWait = ar.AsyncWaitHandle;
                try
                {
                    if (!connectionWait.WaitOne(NetworkComms.ConnectionEstablishTimeoutMS, false))
                        connectSuccess = false;
                    else
                        tcpClient.EndConnect(ar);
                }
                finally
                {
                    connectionWait.Close();
                }
#endif

                if (!connectSuccess) throw new ConnectionSetupException("Timeout waiting for remoteEndPoint to accept TCP connection.");
            }
            catch (Exception ex)
            {
                CloseConnection(true, 17);
                throw new ConnectionSetupException("Error during TCP connection establish with destination (" + ConnectionInfo + "). Destination may not be listening or connect timed out. " + ex.ToString());
            }
        }

        /// <inheritdoc />
        /// 开始监听进入的数据局
        protected override void StartIncomingDataListen()
        {
            if (!NetworkComms.ConnectionExists(ConnectionInfo.RemoteIPEndPoint, ConnectionInfo.LocalIPEndPoint, ConnectionType.TCP, ConnectionInfo.ApplicationLayerProtocol))
            {
                CloseConnection(true, 18);
                throw new ConnectionSetupException("A connection reference by endPoint should exist before starting an incoming data listener.");
            }

#if WINDOWS_PHONE
            var stream = socket.InputStream.AsStreamForRead();
            stream.BeginRead(dataBuffer, 0, dataBuffer.Length, new AsyncCallback(IncomingTCPPacketHandler), stream);
#elif NETFX_CORE
            Task readTask = new Task(async () =>
            {
                var buffer = Windows.Security.Cryptography.CryptographicBuffer.CreateFromByteArray(dataBuffer);
                var readBuffer = await socket.InputStream.ReadAsync(buffer, buffer.Capacity, InputStreamOptions.Partial);
                await IncomingTCPPacketHandler(readBuffer);
            });

            readTask.Start();
#else
            lock (SyncRoot)
            {
                //以同步方式
                if (NetworkComms.ConnectionListenModeUseSync)
                {
                    if (incomingDataListenThread == null)
                    {
                        //创建一个线程
                        incomingDataListenThread = new Thread(IncomingTCPDataSyncWorker);
                        //Incoming data always gets handled in a time critical fashion
                        //设定数据处理的优先级
                        incomingDataListenThread.Priority = NetworkComms.timeCriticalThreadPriority;
                        incomingDataListenThread.Name = "UDP_IncomingDataListener";
                        incomingDataListenThread.IsBackground = true;
                        incomingDataListenThread.Start();
                    }
                }
                //以异步方式处理
                else
                {
                    if (asyncListenStarted) throw new ConnectionSetupException("Async listen already started. Why has this been called twice?.");

                    asyncListenerInRead = true;
                    //开始读取连接上的数据
                    connectionStream.BeginRead(dataBuffer, 0, dataBuffer.Length, new AsyncCallback(IncomingTCPPacketHandler), connectionStream);

                    asyncListenStarted = true;
                }
            }
#endif

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Listening for incoming data from " + ConnectionInfo);
        }

        /// <summary>
        /// Asynchronous incoming connection data delegate
        /// 以异步的方式处理连接上的数据
        /// </summary>
        /// <param name="ar">The call back state object</param> 回调状态对象
#if NETFX_CORE
        private async Task IncomingTCPPacketHandler(IBuffer buffer)
#else
        private void IncomingTCPPacketHandler(IAsyncResult ar)
#endif
        {
            //Initialised with false so that logic still works in WP8
            //初始化状态为false  所以可以在wp8系统下运行
            bool dataAvailable = false;

#if !WINDOWS_PHONE && !NETFX_CORE
            //Incoming data always gets handled in a timeCritical fashion at this point
            //Windows phone and RT platforms do not support thread priorities
            //设定数据处理的优先级
            //wp 和rt平台不支持线程优先级的设定
            Thread.CurrentThread.Priority = NetworkComms.timeCriticalThreadPriority;
#endif

            try
            {
#if WINDOWS_PHONE
                Stream stream = ar.AsyncState as Stream;
                totalBytesRead = stream.EndRead(ar) + totalBytesRead;
#elif NETFX_CORE
                buffer.CopyTo(0, dataBuffer, totalBytesRead, (int)buffer.Length);
                totalBytesRead = (int)buffer.Length + totalBytesRead;
#else
                Stream stream;
                if (SSLOptions.SSLEnabled)
                    stream = (SslStream)ar.AsyncState;
                else
                    stream = (NetworkStream)ar.AsyncState;

                if (!stream.CanRead)
                    throw new ObjectDisposedException("Unable to read from stream.");

                if (!asyncListenerInRead) throw new InvalidDataException("The asyncListenerInRead flag should be true. 1");
                totalBytesRead = stream.EndRead(ar) + totalBytesRead;
                asyncListenerInRead = false;

                if (SSLOptions.SSLEnabled)
                    //SSLstream does not have a DataAvailable property. We will just assume false.
                    //SSLstream (安全套接层)数据流    不支持DataAvailable属性
                    dataAvailable = false;
                else
                    dataAvailable = ((NetworkStream)stream).DataAvailable;
#endif
                if (totalBytesRead > 0)
                {
                    ConnectionInfo.UpdateLastTrafficTime();

                    //If we have read a single byte which is 0 and we are not expecting other data
                    //如果只读取到一个字节,内容为0 (猜测这样的单字节一般用于心跳检测)
                    if (ConnectionInfo.ApplicationLayerProtocol == ApplicationLayerProtocolStatus.Enabled &&
                        totalBytesRead == 1 && dataBuffer[0] == 0 && packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached == 0)
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandler() from " + ConnectionInfo + ". 1");
                    }
                    else
                    {
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... " + totalBytesRead.ToString() + " bytes added to packetBuilder for " + ConnectionInfo + ". Cached " + packetBuilder.TotalBytesCached.ToString() + " bytes, expecting " + packetBuilder.TotalBytesExpected.ToString() + " bytes.");

                        //If there is more data to get then add it to the packets lists;
                        //packetBuilder  数据包创建器
                        //把读取到的字节添加到"数据包创建器"中
                        //totalBytesRead  此次读取到的字节数量(int类型)   dataBuffer  (byte[] 字节数组)
                        packetBuilder.AddPartialPacket(totalBytesRead, dataBuffer);

#if !WINDOWS_PHONE && !NETFX_CORE
                        //If we have more data we might as well continue reading synchronously
                        //In order to deal with data as soon as we think we have sufficient we will leave this loop
                        //如果我们有更多的数据 继续同步读取
                        //如果接收到的数据足够多,将离开此循环
                        //根据networkcomms的机制,第一个字节为数据包包头大小,系统会根据数据包包头大小解析出数据包包头,从数据包包头中读取数据包大小
                        //这个数据包大小就是packetBuilder中totalBytesExpected,即期望的大小
                        while (dataAvailable && packetBuilder.TotalBytesCached < packetBuilder.TotalBytesExpected)
                        {
                            int bufferOffset = 0;

                            //We need a buffer for our incoming data
                            //First we try to reuse a previous buffer
                            //我们需要为进入的数据准备缓冲区
                            //首先我们尝试使用以前的缓冲区
                            if (packetBuilder.TotalPartialPacketCount > 0 && packetBuilder.NumUnusedBytesMostRecentPartialPacket() > 0)
                                dataBuffer = packetBuilder.RemoveMostRecentPartialPacket(ref bufferOffset);
                            else
                            //If we have nothing to reuse we allocate a new buffer. As we are in this loop this can only be a suplementary buffer for THIS packet.
                            //Therefore we choose a buffer size between the initial amount and the maximum amount based on the expected size
                            //如果没有可重复使用的缓冲区,我们将新分配一个新的缓冲区
                            //缓冲区的大小为  数据包创建器.期待的大小 减去  数据包创建器.已收到的大小  (即 数据包长度 减去  已经接收的长度)
                            {
                                long additionalBytesNeeded = packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached;
                                dataBuffer = new byte[Math.Max(Math.Min(additionalBytesNeeded, NetworkComms.MaxReceiveBufferSizeBytes), NetworkComms.InitialReceiveBufferSizeBytes)];
                            }

                            totalBytesRead = stream.Read(dataBuffer, bufferOffset, dataBuffer.Length - bufferOffset) + bufferOffset;

                            if (totalBytesRead > 0)
                            {
                                ConnectionInfo.UpdateLastTrafficTime();

                                //If we have read a single byte which is 0 and we are not expecting other data
                                //处理单字节数据  一般为心跳消息
                                if (ConnectionInfo.ApplicationLayerProtocol == ApplicationLayerProtocolStatus.Enabled && totalBytesRead == 1 && dataBuffer[0] == 0 && packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached == 0)
                                {
                                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet ignored in IncomingPacketHandler() from " + ConnectionInfo + ". 2");
                                    //LastTrafficTime = DateTime.Now;
                                }
                                else
                                {
                                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... " + totalBytesRead.ToString() + " bytes added to packetBuilder for " + ConnectionInfo + ". Cached " + packetBuilder.TotalBytesCached.ToString() + " bytes, expecting " + packetBuilder.TotalBytesExpected.ToString() + " bytes.");
                                    packetBuilder.AddPartialPacket(totalBytesRead, dataBuffer);

                                    if (SSLOptions.SSLEnabled)

                                        //SSLstream does not have a DataAvailable property. We will just assume false.
                                        //SSLstream 没有DataAvailable属性
                                        dataAvailable = false;
                                    else
                                        dataAvailable = ((NetworkStream)stream).DataAvailable;
                                }
                            }
                            else
                                break;
                        }
#endif
                    }
                }

                if (packetBuilder.TotalBytesCached > 0 && packetBuilder.TotalBytesCached >= packetBuilder.TotalBytesExpected)
                {
                    //Once we think we might have enough data we call the incoming packet handle hand off
                    //Should we have a complete packet this method will start the appropriate task
                    //This method will now clear byes from the incoming packets if we have received something complete.
                    //一旦我们的数据包创建器(packetBuilder)中接收到足够一个数据包的数据,我们将会对数据进行处理
                    //IncomingPacketHandleHandOff方法处理完packetBuilder中相应的数据后,会把已经处理完的数据删除掉
                    IncomingPacketHandleHandOff(packetBuilder);
                }

                if (totalBytesRead == 0 && (!dataAvailable || ConnectionInfo.ConnectionState == ConnectionState.Shutdown))
                    CloseConnection(false, -2);
                else
                {
                    //We need a buffer for our incoming data
                    //First we try to reuse a previous buffer

                    //我们需要一个缓冲区来接收数据
                    //我们先尝试使用以前的缓冲区
                    if (packetBuilder.TotalPartialPacketCount > 0 && packetBuilder.NumUnusedBytesMostRecentPartialPacket() > 0)
                        dataBuffer = packetBuilder.RemoveMostRecentPartialPacket(ref totalBytesRead);
                    else
                    {
                        //If we have nothing to reuse we allocate a new buffer
                        //If packetBuilder.TotalBytesExpected is 0 we know we‘re going to start waiting for a fresh packet. Therefore use the initial buffer size
                        //如果没有可重复使用的缓冲区,我们将新分配一个新的缓冲区
                        //如果 数据包创建器 期待的数据大小为0,我们将开始接收一个新的数据包的数据   此时 把缓冲区设置为初始大小
                        if (packetBuilder.TotalBytesExpected == 0)
                            dataBuffer = new byte[NetworkComms.InitialReceiveBufferSizeBytes];
                        else
                        //Otherwise this can only be a supplementary buffer for THIS packet. Therefore we choose a buffer size between the initial amount and the maximum amount based on the expected size
                        //如果没有可重复使用的缓冲区,我们将新分配一个新的缓冲区
                        //缓冲区的大小为  数据包创建器.期待的大小 减去  数据包创建器.已收到的大小  (即 数据包长度 减去  已经接收的长度)
                        {
                            long additionalBytesNeeded = packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached;
                            dataBuffer = new byte[Math.Max(Math.Min(additionalBytesNeeded, NetworkComms.MaxReceiveBufferSizeBytes), NetworkComms.InitialReceiveBufferSizeBytes)];
                        }

                        totalBytesRead = 0;
                    }

#if NETFX_CORE
                    IBuffer newBuffer = Windows.Security.Cryptography.CryptographicBuffer.CreateFromByteArray(dataBuffer);
                    var task = IncomingTCPPacketHandler(await socket.InputStream.ReadAsync(newBuffer, newBuffer.Capacity - (uint)totalBytesRead, InputStreamOptions.Partial));
#elif WINDOWS_PHONE
                    stream.BeginRead(dataBuffer, totalBytesRead, dataBuffer.Length - totalBytesRead, IncomingTCPPacketHandler, stream);
#else
                    if (asyncListenerInRead) throw new InvalidDataException("The asyncListenerInRead flag should be false. 2");
                    asyncListenerInRead = true;
                    stream.BeginRead(dataBuffer, totalBytesRead, dataBuffer.Length - totalBytesRead, IncomingTCPPacketHandler, stream);
#endif
                }

            }
            catch (IOException)
            {
                CloseConnection(true, 12);
            }
            catch (ObjectDisposedException)
            {
                CloseConnection(true, 13);
            }
            catch (SocketException)
            {
                CloseConnection(true, 14);
            }
            catch (InvalidOperationException)
            {
                CloseConnection(true, 15);
            }
            catch (Exception ex)
            {
                LogTools.LogException(ex, "Error_TCPConnectionIncomingPacketHandler");
                CloseConnection(true, 31);
            }

#if !WINDOWS_PHONE && !NETFX_CORE
            Thread.CurrentThread.Priority = ThreadPriority.Normal;
#endif
        }

#if !WINDOWS_PHONE && !NETFX_CORE
        /// <summary>
        /// Synchronous incoming connection data worker
        /// 同步处理连接上进入的数据的处理器
        /// </summary>
        private void IncomingTCPDataSyncWorker()
        {
            bool dataAvailable = false;

            try
            {
                while (true)
                {
                    if (ConnectionInfo.ConnectionState == ConnectionState.Shutdown)
                        break;

                    int bufferOffset = 0;

                    //We need a buffer for our incoming data
                    //First we try to reuse a previous buffer
                    //我们需要为进入的数据准备缓冲区
                    //首先我们尝试使用以前的缓冲区
                    if (packetBuilder.TotalPartialPacketCount > 0 && packetBuilder.NumUnusedBytesMostRecentPartialPacket() > 0)
                        dataBuffer = packetBuilder.RemoveMostRecentPartialPacket(ref bufferOffset);
                    else
                    {
                        //If we have nothing to reuse we allocate a new buffer
                        //If packetBuilder.TotalBytesExpected is 0 we know we‘re going to start waiting for a fresh packet. Therefore use the initial buffer size
                        //如果没有可重复使用的缓冲区,我们将新分配一个新的缓冲区
                        //如果 数据包创建器 期待的数据大小为0,我们将开始接收一个新的数据包的数据   此时 把缓冲区设置为初始大小
                        if (packetBuilder.TotalBytesExpected == 0)
                            dataBuffer = new byte[NetworkComms.InitialReceiveBufferSizeBytes];
                        else
                        //Otherwise this can only be a supplementary buffer for THIS packet. Therefore we choose a buffer size between the initial amount and the maximum amount based on the expected size
                        //如果没有可重复使用的缓冲区,我们将新分配一个新的缓冲区
                        //缓冲区的大小为  数据包创建器.期待的大小 减去  数据包创建器.已收到的大小  (即 数据包长度 减去  已经接收的长度)
                        {
                            long additionalBytesNeeded = packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached;
                            dataBuffer = new byte[Math.Max(Math.Min(additionalBytesNeeded, NetworkComms.MaxReceiveBufferSizeBytes), NetworkComms.InitialReceiveBufferSizeBytes)];
                        }
                    }

                    //We block here until there is data to read
                    //When we read data we read until method returns or we fill the buffer length
                    //程序将会阻塞  直到读取到数据
                    //我们读取数据 直到方法返回或者读取到缓冲区大小的数据
                    totalBytesRead = connectionStream.Read(dataBuffer, bufferOffset, dataBuffer.Length - bufferOffset) + bufferOffset;

                    //Check to see if there is more data ready to be read
                    //检查是否还有数据需要读取
                    if (SSLOptions.SSLEnabled)
                        //SSLstream does not have a DataAvailable property. We will just assume false.
                        //SSLStream没有DataAvailable属性
                        dataAvailable = false;
                    else
                        dataAvailable = ((NetworkStream)connectionStream).DataAvailable;

                    //If we read any data it gets handed off to the packetBuilder
                    //如果我们读取到数据,交给packetBuilder处理
                    if (totalBytesRead > 0)
                    {
                        ConnectionInfo.UpdateLastTrafficTime();

                        //If we have read a single byte which is 0 and we are not expecting other data
                        //处理单字节 内容为0 的数据  (一般为心跳检测)
                        if (ConnectionInfo.ApplicationLayerProtocol == ApplicationLayerProtocolStatus.Enabled && totalBytesRead == 1 && dataBuffer[0] == 0 && packetBuilder.TotalBytesExpected - packetBuilder.TotalBytesCached == 0)
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingDataSyncWorker() from "+ConnectionInfo+".");
                        }
                        else
                        {
                            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... " + totalBytesRead.ToString() + " bytes added to packetBuilder for " + ConnectionInfo + ". Cached " + packetBuilder.TotalBytesCached.ToString() + " bytes, expecting " + packetBuilder.TotalBytesExpected.ToString() + " bytes.");

                            //添加数据到 数据包创建器中
                            packetBuilder.AddPartialPacket(totalBytesRead, dataBuffer);
                        }
                    }
                    else if (totalBytesRead == 0 && (!dataAvailable || ConnectionInfo.ConnectionState == ConnectionState.Shutdown))
                    {
                        //If we read 0 bytes and there is no data available we should be shutting down
                        //如果我们没有读取到数据 应该关闭连接
                        CloseConnection(false, -10);
                        break;
                    }

                    //If we have read some data and we have more or equal what was expected we attempt a data hand off
                    //如果我们读取到足够的数据(可以解析出一个数据包),对数据进行处理
                    if (packetBuilder.TotalBytesCached > 0 && packetBuilder.TotalBytesCached >= packetBuilder.TotalBytesExpected)
                        IncomingPacketHandleHandOff(packetBuilder);
                }
            }
            //On any error here we close the connection
            //捕捉异常  关闭连接
            catch (NullReferenceException)
            {
                CloseConnection(true, 7);
            }
            catch (IOException)
            {
                CloseConnection(true, 8);
            }
            catch (ObjectDisposedException)
            {
                CloseConnection(true, 9);
            }
            catch (SocketException)
            {
                CloseConnection(true, 10);
            }
            catch (InvalidOperationException)
            {
                CloseConnection(true, 11);
            }
            catch (Exception ex)
            {
                LogTools.LogException(ex, "Error_TCPConnectionIncomingPacketHandler");
                CloseConnection(true, 39);
            }

            //Clear the listen thread object because the thread is about to end
            //清除监听线程 因为线程将关闭
            incomingDataListenThread = null;

            if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Incoming data listen thread ending for " + ConnectionInfo);
        }

        /// <summary>
        /// Configure the SSL stream from this connection
        /// 配置当期连接上 SSL 数据流
        /// </summary>
        private void ConfigureSSLStream()
        {
            try
            {
                if (ConnectionInfo.ServerSide)
                {
                    connectionStream = new SslStream(tcpClient.GetStream(), false,
                        new RemoteCertificateValidationCallback(CertificateValidationCallback),
                        new LocalCertificateSelectionCallback(CertificateSelectionCallback));

                    ((SslStream)connectionStream).AuthenticateAsServer(SSLOptions.Certificate, SSLOptions.RequireMutualAuthentication, SslProtocols.Default, false);
                }
                else
                {
                    X509CertificateCollection certs = new X509CertificateCollection();

                    if (SSLOptions.Certificate != null) certs.Add(SSLOptions.Certificate);

                    //If we have a certificate set we use that to authenticate
                    connectionStream = new SslStream(tcpClient.GetStream(), false,
                        new RemoteCertificateValidationCallback(CertificateValidationCallback),
                        new LocalCertificateSelectionCallback(CertificateSelectionCallback));

                    ((SslStream)connectionStream).AuthenticateAsClient(SSLOptions.CertificateName, certs, SslProtocols.Default, false);

                }
            }
            catch (AuthenticationException ex)
            {
                throw new ConnectionSetupException("SSL authentication failed. Please check configuration and try again.", ex);
            }

            SSLOptions.Authenticated = true;
        }

        /// <summary>
        /// Callback used to determine if the provided certificate should be accepted
        /// 回调方法 用于确定是否应该接受所提供的证书
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="certificate"></param>
        /// <param name="chain"></param>
        /// <param name="sslPolicyErrors"></param>
        /// <returns></returns>
        private bool CertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            if (sslPolicyErrors == SslPolicyErrors.None)
                return true;
            else if (sslPolicyErrors == SslPolicyErrors.RemoteCertificateNotAvailable && ConnectionInfo.ServerSide)
                //If the client did not provide a remote certificate it may well be because
                //we were not requesting one
                //如果客户没有提供远程认证它很可能是因为我们没有要求
                return !SSLOptions.RequireMutualAuthentication;
            else if (SSLOptions.AllowSelfSignedCertificate && //If we allows self signed certificates we make sure the errors are correct  如果我们允许自签名认证 我们确定这些错误是正确的
                chain.ChainStatus.Length == 1 && //Only a single chain error 只是一个单一的链错误
                sslPolicyErrors == SslPolicyErrors.RemoteCertificateChainErrors &&
                chain.ChainStatus[0].Status == X509ChainStatusFlags.UntrustedRoot)
            {
                //If we have a local certificate we compare them  如果我们有一个本地的证书 我们比较他们
                if (SSLOptions.Certificate != null)
                    return certificate.Equals(SSLOptions.Certificate);
                else
                    return true;
            }
            else
                return false;
        }

        /// <summary>
        /// Certificate selection callback
        /// 证书选择 回调方法
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="targetHost"></param>
        /// <param name="localCertificates"></param>
        /// <param name="remoteCertificate"></param>
        /// <param name="acceptableIssuers"></param>
        /// <returns></returns>
        private X509Certificate CertificateSelectionCallback(object sender, string targetHost, X509CertificateCollection localCertificates,
            X509Certificate remoteCertificate, string[] acceptableIssuers)
        {
            return SSLOptions.Certificate;
        }
#endif

        /// <inheritdoc />
        protected override void CloseConnectionSpecific(bool closeDueToError, int logLocation = 0)
        {
#if WINDOWS_PHONE || NETFX_CORE
            //Try to close the socket
            try
            {
                socket.Dispose();
            }
            catch (Exception)
            {
            }
#else
            //The following attempts to correctly close the connection
            //Try to close the networkStream first
            //以下试图正确的关闭连接
            //首先尝试关闭网络流
            try
            {
                if (connectionStream != null) connectionStream.Close();
            }
            catch (Exception)
            {
            }
            finally
            {
                connectionStream = null;
            }

            //Try to close the tcpClient
            //尝试关闭tcpClient对象
            try
            {
                if (tcpClient.Client!=null)
                {
                    tcpClient.Client.Disconnect(false);

#if !ANDROID
                    //Throws uncatchable exception in android
                    //在安卓系统中抛出不可捕捉的异常
                    tcpClient.Client.Close();
#endif
                }
            }
            catch (Exception)
            {
            }

            //Try to close the tcpClient
            //尝试关闭连接
            try
            {
#if !ANDROID
                //Throws uncatchable exception in android
                //在安卓系统中抛出不可捕捉的异常
                tcpClient.Close();
#endif
            }
            catch (Exception)
            {
            }
#endif
        }

        /// <inheritdoc />
        protected override double[] SendStreams(StreamTools.StreamSendWrapper[] streamsToSend, double maxSendTimePerKB, long totalBytesToSend)
        {
            double[] timings = new double[streamsToSend.Length];

            Stream sendingStream;
#if WINDOWS_PHONE || NETFX_CORE
            sendingStream = socket.OutputStream.AsStreamForWrite();
#else
            sendingStream = connectionStream;
#endif

            for(int i=0; i<streamsToSend.Length; i++)
            {
                if (streamsToSend[i].Length > 0)
                {
                    //Write each stream
                    //写入数据流
                    timings[i] = streamsToSend[i].ThreadSafeStream.CopyTo(sendingStream, streamsToSend[i].Start, streamsToSend[i].Length, NetworkComms.SendBufferSizeBytes, maxSendTimePerKB, MinSendTimeoutMS);
                    streamsToSend[i].ThreadSafeStream.Dispose();
                }
                else
                    timings[i] = 0;
            }

#if WINDOWS_PHONE || NETFX_CORE
            sendingStream.Flush();
#endif

#if !WINDOWS_PHONE && !NETFX_CORE
            if (!tcpClient.Connected)
            {
                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Error("TCPClient is not marked as connected after write to networkStream. Possibly indicates a dropped connection.");

                throw new CommunicationException("TCPClient is not marked as connected after write to networkStream. Possibly indicates a dropped connection.");
            }
#endif

            return timings;
        }
    }
时间: 2024-08-06 12:28:51

介绍开源的.net通信框架NetworkComms框架 源码分析(二十三 )TCPConnection的相关文章

缓存框架OSCache部分源码分析

在并发量比较大的场景,如果采用直接访问数据库的方式,将会对数据库带来巨大的压力,严重的情况下可能会导致数据库不可用状态,并且时间的消耗也是不能容忍的.在这种情况下,一般采用缓存的方式.将经常访问的热点数据提前加载到内存中,这样能够大大降低数据库的压力. OSCache是一个开源的缓存框架,虽然现在已经停止维护了,但是对于OSCache的实现还是值得学习和借鉴的.下面通过OSCache的部分源码分析OSCache的设计思想. 缓存数据结构 通常缓存都是通过<K,V>这种数据结构存储,但缓存都是应

android-----XUtils框架之HttpUtils源码分析

之前我们对Volley框架源码进行了分析,知道了他适用于频繁的网络请求,但是不太适合post较大数据以及文件的上传操作,在项目中为了弥补Volley的这个缺陷,使用了XUtils框架的HttpUtils实现了文件上传的操作,上一篇博客我们通过HttpUtils实现了照片上传的实例,见:android-----基于XUtils客户端以及服务器端实现,当然文件上传的方法类似于照片上传,有时间的话单独写一篇博客介绍,这篇博客我们从源码角度来分析HttpUtils的实现原理,希望对这几天的学习做个总结:

ABP源码分析二十六:核心框架中的一些其他功能

本文是ABP核心项目源码分析的最后一篇,介绍一些前面遗漏的功能 AbpSession AbpSession: 目前这个和CLR的Session没有什么直接的联系.当然可以自定义的去实现IAbpSession使之与CLR的Session关联 IAbpSession:定义如下图中的四个属性. NullAbpSession:IAbpSession的一个缺省实现,给每个属性都给予null值,无实际作用 ClaimsAbpSession:实现了从ClaimsPrincipal/ClaimsIdentity

KopDB 框架学习2——源码分析

我的博客:http://mrfufufu.github.io/ 上次我们主要是对 KopDB 框架的使用进行了分析,它是非常简单有用的.这次主要是对它的源码进行分析,进一步的来了解它的真面目. 点击这里去往 "KopDB 框架学习1--使用" 因为 KopDB 采用的是对象关系映射(ORM)模式,即我们使用的编程语言是面向对象语言,而我们使用的数据库则是关系型数据库,那么将面向对象的语言和面向关系的数据库之间建立一种映射关系,这就是对象关系映射了. 使用 ORM 模式,主要是因为我们平

android-----XUtils框架之BitmapUtils源码分析

上一篇使用XUtils的BitmapUtils实现了一个照片墙的功能,参见:android-----XUtils框架之BitmapUtils加载照片实现,这一篇我们从源码的角度分析下BitmapUtils到底是怎么一个执行流程的: 先来回顾下之前我们使用BitmapUtils的步骤: 很简单,就只有两步: (1)通过BitmapUtils的构造函数创建对象: (2)调用BitmapUtils对象的display方法: 好了,我们先从创建BitmapUtils对象开始分析,很自然想到了Bitmap

首选项框架PreferenceFragment部分源码分析

因为要改一些settings里面的bug以及之前在里面有做过勿扰模式,准备对勿扰模式做一个总结,那先分析一下settings的源码,里面的核心应该就是android3.0 上面的首选项框架PreferenceFragment.因为在3.0之前都是把这些东西放在PreferenceActivity的,但是3.0之后google建议把setting放在PreferenceFragment,但是PreferenceActivity也同时在用的,下面就此总结一下: PreferenceActivity的

thinkphp5 源码分析二 框架引导

框架引导文件源代码 (/thinkphp/start.php) 1. 引入基础文件(/thinkphp/base.php) // 加载基础文件 require __DIR__ . '/base.php'; 基础文件(/thinkphp/base.php) 2. 定义系统常量 1 define('THINK_VERSION', '5.0.9'); 2 define('THINK_START_TIME', microtime(true)); 3 define('THINK_START_MEM', m

Java依赖注入库框架 Dagger的源码分析(一)

1.GeneratedAdapters 对应的注释上面是这么说的: A single point for API used in common by Adapters and Adapter generators 被Adapters以及Adapter的生产者广泛使用 通过代码,可以注意到,这是一个final类,是不允许被重载的. 他的构造函数是一个空的构造函数. 同时带有下面的常量的定义: private static final String SEPARATOR = "$$"; pu

Mahout协同过滤框架Taste的源码分析

推荐过程 主要分成了如下几步来完成推荐1. 输入数据预处理2. 获取评分矩阵3. 计算物品相似度4. 矩阵乘法5. 数据过滤6. 计算推荐 测试数据 user&item 1 2 3 4 5 1 3 3 3 2 0 2 4 4 4 0 0 3 5 5 5 0 3 4 4 4 4 1 4 继续阅读 →

Django框架——CBV及源码分析

CBV (基于类的视图函数) 代码示例: urls.py url(r'^login/',views.MyLogin.as_view()) views.py from django.views import View class MyLogin(View): def get(self,request): print("from MyLogin get方法") return render(request,'login.html') def post(self,request): retur