/// <summary>
/// Receives length-prefixed messages from the message dispatch server on a background
/// thread and hands every decoded MessageCollection to the relay engine.
/// </summary>
/// <remarks>
/// Wire format, as implemented below: after connecting, the client sends the session id as a
/// 2-byte big-endian length followed by its ASCII bytes. Every incoming message is a 4-byte
/// big-endian payload length followed by the payload, which is decoded with
/// CompressHelper.FromByteArray.
/// </remarks>
class MessageReceiver
{
    /// <summary>Size of the message length prefix, in bytes.</summary>
    private const int HeaderLength = 4;

    /// <summary>Size of the reusable payload buffer; larger payloads get a one-off buffer.</summary>
    private const int DefaultBufferLength = 1024 * 64;

    /// <summary>Pause between two connection attempts after a failed connect/handshake.</summary>
    private const int ReconnectDelayMilliseconds = 1000;

    /// <summary>Granularity at which the reconnect pause re-checks the stop flag.</summary>
    private const int StopPollIntervalMilliseconds = 100;

    private RelayEngine<MessageCollection> _MessageRelayEngine;
    private string _Hostname;
    private int _MessageDispatchServerPort;
    private string _SessionId;
    private TcpClient _Client;

    // Written by Start/Stop on the caller's thread and read in the worker loop,
    // so it must not be cached by the reading thread.
    private volatile bool _Stopped = false;

    private Thread _Worker = null;
    private ulong _LastMessageSequence = 0;

    /// <summary>Creates a receiver that forwards decoded messages to the given engine.</summary>
    /// <param name="messageRelayEngine">Engine whose AddItem is called for every decoded message.</param>
    internal MessageReceiver(RelayEngine<MessageCollection> messageRelayEngine)
    {
        this._MessageRelayEngine = messageRelayEngine;
    }

    /// <summary>
    /// Stops any running worker, stores the connection parameters and starts a new
    /// background worker thread that connects and receives messages.
    /// </summary>
    /// <param name="hostname">Host name of the message dispatch server.</param>
    /// <param name="messageDispatchServerPort">TCP port of the message dispatch server.</param>
    /// <param name="sessionId">Session id sent to the server right after connecting.</param>
    internal void Start(string hostname, int messageDispatchServerPort, string sessionId)
    {
        this.Stop();

        this._Stopped = false;
        this._Hostname = hostname;
        this._MessageDispatchServerPort = messageDispatchServerPort;
        this._SessionId = sessionId;

        this._Worker = new Thread(ConnectServerAndReceiveMessage);
        this._Worker.IsBackground = true;
        this._Worker.Start();
    }

    /// <summary>Replaces the session id that is sent on the next (re)connect.</summary>
    /// <param name="newSessionId">The new session id.</param>
    internal void SetSessionId(string newSessionId)
    {
        this._SessionId = newSessionId;
    }

    /// <summary>
    /// Worker thread entry point: connects, receives messages until the connection fails,
    /// then reconnects, until <see cref="Stop"/> is called.
    /// </summary>
    /// <param name="state">Unused; present so the method matches ParameterizedThreadStart.</param>
    private void ConnectServerAndReceiveMessage(object state)
    {
        byte[] header = new byte[HeaderLength];
        byte[] defaultBuffer = new byte[DefaultBufferLength];

        while (!this._Stopped)
        {
            NetworkStream stream = this.ConnectAndSendSessionId();
            if (stream == null)
            {
                // Do not hammer an unreachable server (and the log) in a tight loop.
                this.WaitBeforeReconnect();
                continue;
            }

            // Returns once the connection is unusable; the outer loop then reconnects.
            this.ReceiveMessages(stream, header, defaultBuffer);
        }
    }

    /// <summary>
    /// Opens a new connection to the server and sends the length-prefixed session id.
    /// </summary>
    /// <returns>The connected stream, or null when connecting failed or a stop was requested.</returns>
    private NetworkStream ConnectAndSendSessionId()
    {
        try
        {
            Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage to connect {0}:{1}", _Hostname, _MessageDispatchServerPort);

            TcpClient client = new TcpClient();

            // Publish the client before connecting so Stop() can close it and abort the attempt.
            Interlocked.Exchange(ref this._Client, client);

            // Stop() may have run between the loop check and the line above, in which case it
            // found nothing to close; re-check so that no connection outlives the stop request.
            if (this._Stopped)
            {
                this.CloseConectionSilently();
                return null;
            }

            client.Connect(this._Hostname, this._MessageDispatchServerPort);
            NetworkStream stream = client.GetStream();

            byte[] sessionData = Encoding.ASCII.GetBytes(this._SessionId);
            // 2-byte big-endian length prefix.
            byte[] sessionDataLen = new byte[2] { (byte)(sessionData.Length >> 8), (byte)sessionData.Length };
            stream.Write(sessionDataLen, 0, sessionDataLen.Length);
            stream.Write(sessionData, 0, sessionData.Length);

            return stream;
        }
        catch (Exception ex)
        {
            Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage error:\r\n{0}", ex);
            this.CloseConectionSilently();
            return null;
        }
    }

    /// <summary>
    /// Reads and dispatches messages from <paramref name="stream"/> until a read fails or
    /// the framing is invalid. The connection is closed before this method returns.
    /// </summary>
    /// <param name="stream">Stream of the current connection.</param>
    /// <param name="header">Scratch buffer for the length prefix.</param>
    /// <param name="defaultBuffer">Reusable payload buffer for payloads that fit into it.</param>
    private void ReceiveMessages(NetworkStream stream, byte[] header, byte[] defaultBuffer)
    {
        while (true)
        {
            Array.Clear(header, 0, header.Length);
            if (!this.ReadAll(stream, header))
            {
                Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read header data failed");
                this.CloseConectionSilently();
                return; // reconnect
            }

            // 4-byte big-endian payload length.
            int dataLength = (header[0] << 24) | (header[1] << 16) | (header[2] << 8) | header[3];
            if (dataLength < 0)
            {
                // A negative length cannot be read or skipped, so the stream position is no
                // longer trustworthy; drop the connection instead of decoding garbage.
                Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ConnectServerAndReceiveMessage got invalid message length {0}", dataLength);
                this.CloseConectionSilently();
                return; // reconnect
            }

            // NOTE(review): there is no upper bound on dataLength here; a very large value
            // allocates a buffer of that size. Confirm the protocol's maximum message size.
            byte[] buffer = dataLength <= defaultBuffer.Length ? defaultBuffer : new byte[dataLength];
            if (!this.ReadAll(stream, buffer, dataLength))
            {
                Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read message data failed");
                this.CloseConectionSilently();
                return; // reconnect
            }

            this.DispatchMessage(buffer, dataLength);
        }
    }

    /// <summary>
    /// Decodes one payload, checks its sequence number and passes it to the relay engine.
    /// Failures are logged and do not terminate the connection.
    /// </summary>
    /// <param name="buffer">Buffer holding the payload at offset 0.</param>
    /// <param name="dataLength">Number of valid payload bytes in <paramref name="buffer"/>.</param>
    private void DispatchMessage(byte[] buffer, int dataLength)
    {
        try
        {
            MessageCollection message = CompressHelper.FromByteArray<MessageCollection>(buffer, dataLength);
            if (message.Sequence != this._LastMessageSequence)
            {
                Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ConnectServerAndReceiveMessage got message with worng sequence {0}, excepted sequece is {1}", message.Sequence, this._LastMessageSequence);
            }

            // The expected sequence advances by one per decoded message, matching or not.
            this._LastMessageSequence++;
            this._MessageRelayEngine.AddItem(message);
            ConsoleClient.Instance.RefreshLastMsgTime();
        }
        catch (Exception ex)
        {
            Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage add message to engine error:\r\n{0}", ex);
        }
    }

    /// <summary>
    /// Sleeps for up to <see cref="ReconnectDelayMilliseconds"/>, waking early when a stop
    /// is requested so that <see cref="Stop"/> does not have to wait out the whole delay.
    /// </summary>
    private void WaitBeforeReconnect()
    {
        int waited = 0;
        while (!this._Stopped && waited < ReconnectDelayMilliseconds)
        {
            Thread.Sleep(StopPollIntervalMilliseconds);
            waited += StopPollIntervalMilliseconds;
        }
    }

    /// <summary>
    /// Closes the current connection, if any, and clears the field. Never throws;
    /// failures are logged.
    /// </summary>
    private void CloseConectionSilently()
    {
        try
        {
            // Take the client atomically: both the worker thread and Stop() call this method,
            // and a separate null check followed by Close() could race with the other thread.
            TcpClient client = Interlocked.Exchange(ref this._Client, null);
            if (client != null)
            {
                client.Close();
            }
        }
        catch (Exception ex)
        {
            Logger.TraceEvent(TraceEventType.Error, "MessageReceiver CloseConectionSilently error:\r\n{0}", ex);
        }
    }

    /// <summary>
    /// Reads exactly the requested number of bytes from <paramref name="stream"/> into
    /// <paramref name="buffer"/>, starting at offset 0.
    /// </summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination buffer.</param>
    /// <param name="dataLength">Number of bytes to read; the whole buffer when null.</param>
    /// <returns>
    /// true when all bytes were read; false when the stream ended first or reading threw
    /// (the exception is logged).
    /// </returns>
    private bool ReadAll(NetworkStream stream, byte[] buffer, int? dataLength = null)
    {
        try
        {
            int offset = 0;
            int remaining = dataLength.HasValue ? dataLength.Value : buffer.Length;

            while (remaining > 0)
            {
                // Blocking read. A return value of 0 means the remote side closed the
                // connection; polling DataAvailable instead would never observe that.
                int readLength = stream.Read(buffer, offset, remaining);
                if (readLength == 0)
                {
                    return false;
                }

                offset += readLength;
                // Subtract only the bytes of this read, not the cumulative offset.
                remaining -= readLength;
            }

            return true;
        }
        catch (Exception ex)
        {
            Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ReadAll error:\r\n{0}", ex.ToString());
            return false;
        }
    }

    /// <summary>
    /// Requests the worker to stop, resets the expected message sequence, closes the
    /// connection and waits up to one second for the worker thread to finish.
    /// </summary>
    internal void Stop()
    {
        this._Stopped = true;
        this._LastMessageSequence = 0;

        // Closing the client is what interrupts a worker that is connecting or reading.
        this.CloseConectionSilently();

        Thread worker = this._Worker;
        if (worker != null)
        {
            worker.Join(1000);
            this._Worker = null;
        }
    }
}
时间: 2024-10-11 23:14:23