大致浏览了一遍,Scut 的网络模型采用的是 SAEA 模型, 它是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。
通过分析 Scut 的套接字监听控制,就能大致明白它是如何使用 SAEA 架构的。
1. 套接字缓冲区内存管理器
先来看下 Scut 对套接字缓冲区的内存管理:
class BufferManager { int capacity; byte[] bufferBlock; Stack<int> freeIndexPool; int currentIndex; int saeaSize; /* * capacity 表示为所有套接字准备的内存容量 * saeaSzie 表示单个套接字所需的内存量 */ public BufferManager(int capacity, int saeaSize) { this.capacity = capacity; this.saeaSize = saeaSize; this.freeIndexPool = new Stack<int>(); } //申请整份的内存空间 internal void InitBuffer() { this.bufferBlock = new byte[capacity]; } //为每个 SAEA 向缓存管理器申请缓存 internal bool SetBuffer(SocketAsyncEventArgs args) { if (this.freeIndexPool.Count > 0) //用一个堆栈记录非顺序释放的内存块,优先使用这些内存块作为缓存 { args.SetBuffer(this.bufferBlock, this.freeIndexPool.Pop(), this.saeaSize); } else { if ((capacity - this.saeaSize) < this.currentIndex) { return false; } args.SetBuffer(this.bufferBlock, this.currentIndex, this.saeaSize); this.currentIndex += this.saeaSize; } return true; } //为SAEA将缓存还给缓存管理器 internal void FreeBuffer(SocketAsyncEventArgs args) { this.freeIndexPool.Push(args.Offset); args.SetBuffer(null, 0, 0); } }
使用一个堆栈来管理”碎片大小相同、随时取用与释放”的内存块,这段代码算是十分高效与简介了。
2. SocketListener 的初始化
private void Init() { this.bufferManager.InitBuffer(); for (int i = 0; i < this.socketSettings.MaxAcceptOps; i++) //创建一个接受连接的SAEA池子 { this.acceptEventArgsPool.Push(CreateAcceptEventArgs());
private SocketAsyncEventArgs CreateAcceptEventArgs() { SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Accept_Completed); //这部分SAEA绑定的都是“完成连接”事件处理API return acceptEventArg; }
} SocketAsyncEventArgs ioEventArgs; for (int i = 0; i < this.socketSettings.NumOfSaeaForRecSend; i++) //创建一个处理IO的SAEA池子 { ioEventArgs = new SocketAsyncEventArgs(); this.bufferManager.SetBuffer(ioEventArgs); ioEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); //这部分SAEA绑定的都是“IO”事件处理API DataToken dataToken = new DataToken(); dataToken.bufferOffset = ioEventArgs.Offset; //每个SAEA在缓存管理器中获取的内存块的起始偏移都是唯一的,可以用来做唯一标识 ioEventArgs.UserToken = dataToken; this.ioEventArgsPool.Push(ioEventArgs); } _summaryTimer = new Timer(OnSummaryTrace, null, 600, 60000);
public class SummaryStatus //日志定时记录连接的状态 { /// <summary> /// /// </summary> public long TotalConnectCount; /// <summary> /// /// </summary> public int CurrentConnectCount; /// <summary> /// /// </summary> public int RejectedConnectCount; /// <summary> /// /// </summary> public int CloseConnectCount; }
}
3. 监听-连接-数据传输流程
那么,这么多SAEA是如何工作的呢?
public void StartListen() { listenSocket = new Socket(this.socketSettings.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); //建立TCP监听套接字 listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); //进一步设置监听套接字参数 listenSocket.Bind(this.socketSettings.LocalEndPoint); //绑定端口 listenSocket.Listen(socketSettings.Backlog); //开始监听,并设置最大排队连接数 _isStart = true; requestHandler.Bind(this); PostAccept(); }
看一下 SocketOptionLevel 的作用:
SocketOptionLevel.IP:仅适用于 IP 套接字;
SocketOptionLevel.IPv6:仅适用于 IPv6 套接字;
SocketOptionLevel.Socket:适用于所有套接字;
SocketOptionLevel.Tcp、SocketOptionLevel.Udp:适用于TCP、UDP套接字;
SocketOptionName.ReuseAddress:允许将套接字绑定到已在使用中的地址。
private void PostAccept() { try { if (!_isStart) { return; } SocketAsyncEventArgs acceptEventArgs = acceptEventArgsPool.Pop() ?? CreateAcceptEventArgs(); //从accept SAEA池中取出一个SAEA交给监听套接字去获取连接参数 bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs); if (!willRaiseEvent) //直接同步取得连接则顺序执行,异步取得则触发 Accept_Completed 事件处理函数 { ProcessAccept(acceptEventArgs); //处理连接 } } catch (Exception ex) { TraceLog.WriteError("Post accept listen error:{0}", ex); } }
我们可以看到在 Accept_Completed 中也是同样调用了 ProcessAccept;
private void Accept_Completed(object sender, SocketAsyncEventArgs acceptEventArgs) { try { ProcessAccept(acceptEventArgs); } catch (Exception ex) { ... ... } }
继续看 ProcessAccept 是如何工作的:
private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs) { try { Interlocked.Increment(ref _summaryStatus.TotalConnectCount); //向监控器提交“总连接数+1” maxConnectionsEnforcer.WaitOne(); //堵塞一个信号量,由此可知,信号量的总数控制了可并发处理的accept连接数 if (acceptEventArgs.SocketError != SocketError.Success) { Interlocked.Increment(ref _summaryStatus.RejectedConnectCount); //向监控器提交“被拒绝连接数+1” HandleBadAccept(acceptEventArgs); } else { Interlocked.Increment(ref _summaryStatus.CurrentConnectCount); //向监控器提交“当前连接数+1” SocketAsyncEventArgs ioEventArgs = this.ioEventArgsPool.Pop(); //获取IO SAEA 池中的一个SAEA ioEventArgs.AcceptSocket = acceptEventArgs.AcceptSocket; //将 accept 建立的 io 套接字交给该 SAEA var dataToken = (DataToken)ioEventArgs.UserToken; ioEventArgs.SetBuffer(dataToken.bufferOffset, socketSettings.BufferSize); //为 io SAEA 提供缓存 var exSocket = new ExSocket(ioEventArgs.AcceptSocket); // 将 io 套接字用 ExSocket 管理起来 exSocket.LastAccessTime = DateTime.Now; dataToken.Socket = exSocket; acceptEventArgs.AcceptSocket = null; //release connect when socket has be closed. ReleaseAccept(acceptEventArgs, false); //该 accept SAEA 已经完成任务,释放其资源 try { OnConnected(new ConnectionEventArgs { Socket = exSocket }); //OnConnected 是 SocketListener 的“连接事件订阅器”,成功连接时触发该订阅 } catch (Exception ex) { TraceLog.WriteError("OnConnected error:{0}", ex); } PostReceive(ioEventArgs); } } finally { PostAccept(); //处理完毕后又重新开始监听 } }
可以看到这个api 做的最重要的事情:1. 将建立连接的socket交给ioSAEA;2. ioSAEA去底层获取消息;3. 继续监听;
疑问:如果只有1个监听套接字,为什么要做一个 acceptpool?
再来看下 ioSAEA 的工作流程:
private void PostReceive(SocketAsyncEventArgs ioEventArgs) { if (ioEventArgs.AcceptSocket == null) return; bool willRaiseEvent = ioEventArgs.AcceptSocket.ReceiveAsync(ioEventArgs); //异步接收io数据 if (!willRaiseEvent) //如果同步获得直接处理,异步获得则由异步回调处理 { ProcessReceive(ioEventArgs); } }
无论哪种处理方式,都是调用 ProcessReceive,其中比较重要的部分:
bool needPostAnother = requestHandler.TryReceiveMessage(ioEventArgs, out messages, out hasHandshaked);
在 SocketListener 启动的时候我们注意到:
requestHandler.Bind(this);
监听套接字管理器自带 requestHandle,这是个什么东西?
public class RequestHandler { public RequestHandler(BaseMessageProcessor messageProcessor) { MessageProcessor = messageProcessor; } internal virtual void Bind(ISocket appServer) { AppServer = appServer; } public ISocket AppServer { get; private set; } ... ... }
protected GameSocketHost() : this(new RequestHandler(new MessageHandler())) { } protected GameWebSocketHost(bool isSecurity = false) : this(new WebSocketRequestHandler(isSecurity)) { }
从 Scut 的以上代码应该可以得知:什么类型的套接字宿主应该绑定相应类型的消息处理API。
进一步观察,websocket 与 socket 的“消息发送API”是一致的,而“消息读取”API则完全不同,有空再回来研究这块内容。
继续回到 ProcessReceive:
switch (message.OpCode) { case OpCode.Close: var statusCode = requestHandler.MessageProcessor != null ? requestHandler.MessageProcessor.GetCloseStatus(message.Data) : OpCode.Empty; if (statusCode != OpCode.Empty) { DoClosedStatus(exSocket, statusCode); } Closing(ioEventArgs, OpCode.Empty); needPostAnother = false; break; case OpCode.Ping: DoPing(new ConnectionEventArgs { Socket = exSocket, Meaage = message }); break; case OpCode.Pong: DoPong(new ConnectionEventArgs { Socket = exSocket, Meaage = message }); break; default: OnDataReceived(new ConnectionEventArgs { Socket = exSocket, Meaage = message }); break; }
如果是常规数据,则调用 OnDataReceived,这是更上一层注册的逻辑消息处理API,正常来说,到了进入“应用消息分发器”-IActionDispatcher 的节奏了。
继续往上查,果然不出意料。