【转】C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装

http://blog.csdn.net/sqldebug_fan/article/details/17557341

1、SocketAsyncEventArgs介绍

SocketAsyncEventArgs是微软提供的高性能异步Socket实现类,主要为高性能网络服务器应用程序而设计,主要是为了避免在在异步套接字 I/O 量非常大时发生重复的对象分配和同步。使用此类执行异步套接字操作的模式包含以下步骤:
1.分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
2.将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。
3.调用适当的套接字方法 (xxxAsync) 以启动异步操作。
4.如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
5.如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
6.将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。

2、SocketAsyncEventArgs封装

使用SocketAsyncEventArgs之前需要先建立一个Socket监听对象,使用如下代码:

[csharp] view plain copy

  1. public void Start(IPEndPoint localEndPoint)
  2. {
  3. listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  4. listenSocket.Bind(localEndPoint);
  5. listenSocket.Listen(m_numConnections);
  6. Program.Logger.InfoFormat("Start listen socket {0} success", localEndPoint.ToString());
  7. //for (int i = 0; i < 64; i++) //不能循环投递多次AcceptAsync,会造成只接收8000连接后不接收连接了
  8. StartAccept(null);
  9. m_daemonThread = new DaemonThread(this);
  10. }

然后开始接受连接,SocketAsyncEventArgs有连接时会通过Completed事件通知外面,所以接受连接的代码如下:

[csharp] view plain copy

  1. public void StartAccept(SocketAsyncEventArgs acceptEventArgs)
  2. {
  3. if (acceptEventArgs == null)
  4. {
  5. acceptEventArgs = new SocketAsyncEventArgs();
  6. acceptEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
  7. }
  8. else
  9. {
  10. acceptEventArgs.AcceptSocket = null; //释放上次绑定的Socket,等待下一个Socket连接
  11. }
  12. m_maxNumberAcceptedClients.WaitOne(); //获取信号量
  13. bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
  14. if (!willRaiseEvent)
  15. {
  16. ProcessAccept(acceptEventArgs);
  17. }
  18. }

接受连接响应事件代码:

[csharp] view plain copy

  1. void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
  2. {
  3. try
  4. {
  5. ProcessAccept(acceptEventArgs);
  6. }
  7. catch (Exception E)
  8. {
  9. Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", acceptEventArgs.AcceptSocket, E.Message);
  10. Program.Logger.Error(E.StackTrace);
  11. }
  12. }

[csharp] view plain copy

  1. private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
  2. {
  3. Program.Logger.InfoFormat("Client connection accepted. Local Address: {0}, Remote Address: {1}",
  4. acceptEventArgs.AcceptSocket.LocalEndPoint, acceptEventArgs.AcceptSocket.RemoteEndPoint);
  5. AsyncSocketUserToken userToken = m_asyncSocketUserTokenPool.Pop();
  6. m_asyncSocketUserTokenList.Add(userToken); //添加到正在连接列表
  7. userToken.ConnectSocket = acceptEventArgs.AcceptSocket;
  8. userToken.ConnectDateTime = DateTime.Now;
  9. try
  10. {
  11. bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求
  12. if (!willRaiseEvent)
  13. {
  14. lock (userToken)
  15. {
  16. ProcessReceive(userToken.ReceiveEventArgs);
  17. }
  18. }
  19. }
  20. catch (Exception E)
  21. {
  22. Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", userToken.ConnectSocket, E.Message);
  23. Program.Logger.Error(E.StackTrace);
  24. }
  25. StartAccept(acceptEventArgs); //把当前异步事件释放,等待下次连接
  26. }

接受连接后,从当前Socket缓冲池AsyncSocketUserTokenPool中获取一个用户对象AsyncSocketUserToken,AsyncSocketUserToken包含一个接收异步事件m_receiveEventArgs,一个发送异步事件m_sendEventArgs,接收数据缓冲区m_receiveBuffer,发送数据缓冲区m_sendBuffer,协议逻辑调用对象m_asyncSocketInvokeElement,建立服务对象后,需要实现接收和发送的事件响应函数:

[csharp] view plain copy

  1. void IO_Completed(object sender, SocketAsyncEventArgs asyncEventArgs)
  2. {
  3. AsyncSocketUserToken userToken = asyncEventArgs.UserToken as AsyncSocketUserToken;
  4. userToken.ActiveDateTime = DateTime.Now;
  5. try
  6. {
  7. lock (userToken)
  8. {
  9. if (asyncEventArgs.LastOperation == SocketAsyncOperation.Receive)
  10. ProcessReceive(asyncEventArgs);
  11. else if (asyncEventArgs.LastOperation == SocketAsyncOperation.Send)
  12. ProcessSend(asyncEventArgs);
  13. else
  14. throw new ArgumentException("The last operation completed on the socket was not a receive or send");
  15. }
  16. }
  17. catch (Exception E)
  18. {
  19. Program.Logger.ErrorFormat("IO_Completed {0} error, message: {1}", userToken.ConnectSocket, E.Message);
  20. Program.Logger.Error(E.StackTrace);
  21. }
  22. }

在Completed事件中需要处理发送和接收的具体逻辑代码,其中接收的逻辑实现如下:

[csharp] view plain copy

  1. private void ProcessReceive(SocketAsyncEventArgs receiveEventArgs)
  2. {
  3. AsyncSocketUserToken userToken = receiveEventArgs.UserToken as AsyncSocketUserToken;
  4. if (userToken.ConnectSocket == null)
  5. return;
  6. userToken.ActiveDateTime = DateTime.Now;
  7. if (userToken.ReceiveEventArgs.BytesTransferred > 0 && userToken.ReceiveEventArgs.SocketError == SocketError.Success)
  8. {
  9. int offset = userToken.ReceiveEventArgs.Offset;
  10. int count = userToken.ReceiveEventArgs.BytesTransferred;
  11. if ((userToken.AsyncSocketInvokeElement == null) & (userToken.ConnectSocket != null)) //存在Socket对象,并且没有绑定协议对象,则进行协议对象绑定
  12. {
  13. BuildingSocketInvokeElement(userToken);
  14. offset = offset + 1;
  15. count = count - 1;
  16. }
  17. if (userToken.AsyncSocketInvokeElement == null) //如果没有解析对象,提示非法连接并关闭连接
  18. {
  19. Program.Logger.WarnFormat("Illegal client connection. Local Address: {0}, Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,
  20. userToken.ConnectSocket.RemoteEndPoint);
  21. CloseClientSocket(userToken);
  22. }
  23. else
  24. {
  25. if (count > 0) //处理接收数据
  26. {
  27. if (!userToken.AsyncSocketInvokeElement.ProcessReceive(userToken.ReceiveEventArgs.Buffer, offset, count))
  28. { //如果处理数据返回失败,则断开连接
  29. CloseClientSocket(userToken);
  30. }
  31. else //否则投递下次介绍数据请求
  32. {
  33. bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求
  34. if (!willRaiseEvent)
  35. ProcessReceive(userToken.ReceiveEventArgs);
  36. }
  37. }
  38. else
  39. {
  40. bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求
  41. if (!willRaiseEvent)
  42. ProcessReceive(userToken.ReceiveEventArgs);
  43. }
  44. }
  45. }
  46. else
  47. {
  48. CloseClientSocket(userToken);
  49. }
  50. }

由于我们制定的协议第一个字节是协议标识,因此在接收到第一个字节的时候需要绑定协议解析对象,具体代码实现如下:

[csharp] view plain copy

  1. private void BuildingSocketInvokeElement(AsyncSocketUserToken userToken)
  2. {
  3. byte flag = userToken.ReceiveEventArgs.Buffer[userToken.ReceiveEventArgs.Offset];
  4. if (flag == (byte)SocketFlag.Upload)
  5. userToken.AsyncSocketInvokeElement = new UploadSocketProtocol(this, userToken);
  6. else if (flag == (byte)SocketFlag.Download)
  7. userToken.AsyncSocketInvokeElement = new DownloadSocketProtocol(this, userToken);
  8. else if (flag == (byte)SocketFlag.RemoteStream)
  9. userToken.AsyncSocketInvokeElement = new RemoteStreamSocketProtocol(this, userToken);
  10. else if (flag == (byte)SocketFlag.Throughput)
  11. userToken.AsyncSocketInvokeElement = new ThroughputSocketProtocol(this, userToken);
  12. else if (flag == (byte)SocketFlag.Control)
  13. userToken.AsyncSocketInvokeElement = new ControlSocketProtocol(this, userToken);
  14. else if (flag == (byte)SocketFlag.LogOutput)
  15. userToken.AsyncSocketInvokeElement = new LogOutputSocketProtocol(this, userToken);
  16. if (userToken.AsyncSocketInvokeElement != null)
  17. {
  18. Program.Logger.InfoFormat("Building socket invoke element {0}.Local Address: {1}, Remote Address: {2}",
  19. userToken.AsyncSocketInvokeElement, userToken.ConnectSocket.LocalEndPoint, userToken.ConnectSocket.RemoteEndPoint);
  20. }
  21. }

发送响应函数实现需要注意,我们是把发送数据放到一个列表中,当上一个发送事件完成响应Completed事件,这时我们需要检测发送队列中是否存在未发送的数据,如果存在则继续发送。

[csharp] view plain copy

  1. private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
  2. {
  3. AsyncSocketUserToken userToken = sendEventArgs.UserToken as AsyncSocketUserToken;
  4. if (userToken.AsyncSocketInvokeElement == null)
  5. return false;
  6. userToken.ActiveDateTime = DateTime.Now;
  7. if (sendEventArgs.SocketError == SocketError.Success)
  8. return userToken.AsyncSocketInvokeElement.SendCompleted(); //调用子类回调函数
  9. else
  10. {
  11. CloseClientSocket(userToken);
  12. return false;
  13. }
  14. }

SendCompleted用于回调下次需要发送的数据,具体实现过程如下:

[csharp] view plain copy

  1. public virtual bool SendCompleted()
  2. {
  3. m_activeDT = DateTime.UtcNow;
  4. m_sendAsync = false;
  5. AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;
  6. asyncSendBufferManager.ClearFirstPacket(); //清除已发送的包
  7. int offset = 0;
  8. int count = 0;
  9. if (asyncSendBufferManager.GetFirstPacket(ref offset, ref count))
  10. {
  11. m_sendAsync = true;
  12. return m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,
  13. asyncSendBufferManager.DynamicBufferManager.Buffer, offset, count);
  14. }
  15. else
  16. return SendCallback();
  17. }
  18. //发送回调函数,用于连续下发数据
  19. public virtual bool SendCallback()
  20. {
  21. return true;
  22. }

当一个SocketAsyncEventArgs断开后,我们需要断开对应的Socket连接,并释放对应资源,具体实现函数如下:

[csharp] view plain copy

  1. public void CloseClientSocket(AsyncSocketUserToken userToken)
  2. {
  3. if (userToken.ConnectSocket == null)
  4. return;
  5. string socketInfo = string.Format("Local Address: {0} Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,
  6. userToken.ConnectSocket.RemoteEndPoint);
  7. Program.Logger.InfoFormat("Client connection disconnected. {0}", socketInfo);
  8. try
  9. {
  10. userToken.ConnectSocket.Shutdown(SocketShutdown.Both);
  11. }
  12. catch (Exception E)
  13. {
  14. Program.Logger.ErrorFormat("CloseClientSocket Disconnect client {0} error, message: {1}", socketInfo, E.Message);
  15. }
  16. userToken.ConnectSocket.Close();
  17. userToken.ConnectSocket = null; //释放引用,并清理缓存,包括释放协议对象等资源
  18. m_maxNumberAcceptedClients.Release();
  19. m_asyncSocketUserTokenPool.Push(userToken);
  20. m_asyncSocketUserTokenList.Remove(userToken);
  21. }

3、SocketAsyncEventArgs封装和MSDN的不同点

MSDN在http://msdn.microsoft.com/zh-cn/library/system.NET.sockets.socketasynceventargs(v=vs.110).aspx实现了示例代码,并实现了初步的池化处理,我们是在它的基础上扩展实现了接收数据缓冲,发送数据队列,并把发送SocketAsyncEventArgs和接收SocketAsyncEventArgs分开,并实现了协议解析单元,这样做的好处是方便后续逻辑实现文件的上传,下载和日志输出。

DEMO下载地址:http://download.csdn.Net/detail/sqldebug_fan/7467745
免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:[email protected]。

时间: 2024-10-13 09:33:57

【转】C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装的相关文章

C#高性能大容量SOCKET并发(转)

C#高性能大容量SOCKET并发(零):代码结构说明 C#高性能大容量SOCKET并发(一):IOCP完成端口例子介绍 C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装 C#高性能大容量SOCKET并发(三):接收.发送 C#高性能大容量SOCKET并发(四):缓存设计 C#高性能大容量SOCKET并发(五):粘包.分包.解包 C#高性能大容量SOCKET并发(六):超时Socket断开(守护线程)和心跳包 C#高性能大容量SOCKET并发(七):协议字符集

DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包

DELPHI高性能大容量SOCKET并发(四):粘包.分包.解包 粘包 使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.粘包可能由发送方造成,也可能由接收方造成.TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连.如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包. 粘包一般的解决办法是制定通讯协议,由协议来规

C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装

1.SocketAsyncEventArgs介绍 SocketAsyncEventArgs是微软提供的高性能异步Socket实现类,主要为高性能网络服务器应用程序而设计,主要是为了避免在在异步套接字 I/O 量非常大时发生重复的对象分配和同步.使用此类执行异步套接字操作的模式包含以下步骤:1.分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象.2.将该上下文对象的属性设置为要执行的操作(例如,完成回调方法.数据缓冲区.缓冲区偏移量以及要传

C#高性能大容量SOCKET并发(八):通讯协议

协议种类 开发Socket程序有两种协议类型,一种是用文本描述的,类似HTTP协议,定义字符集,好处是兼容性和调试方便,缺点是解析文本会损耗一些性能:一种是用Code加结构体,定义字节顺序,好处是性能高,缺点是兼容性和调试不方便.这个可以根据应用场景灵活选择,如果您的应用相对稳定,需求变化少,性能要求高,则可以使用Code加结构体的方式.如果您的应用需要不停的扩充功能,但是对性能要求不苛刻,则可以使用文本解析的方式.这两种协议有两个比较典型的应用场景,Code加结构体更多应用在中间件上,因为协议

java架构师、高性能、高并发、高可用、高可扩展、性能优化、集群、电商网站架构

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to

高性能、高并发网络通信系统的架构设计

1 引言 随着互联网和物联网的高速发展,使用网络的人数和电子设备的数量急剧增长,其也对互联网后台服务程序提出了更高的性能和并发要求.本文的主要目的是阐述如何进行高并发.高性能网络通信系统的架构设计,以及这样的系统的常用技术,但不对其技术细节进行讨论.本篇只起抛砖引玉的之效,如有更好的设计方案和思路,望您共分享之![注:此篇用select来讲解,如想用epoll,设计思路是一致的] 我们首先来看看课本或学习资料上关于处理并发网络编程的三种常用方案,以及对应的大体思路和优缺点,其描述如下所示:  

15套java架构师、集群、高可用、高可扩 展、高性能、高并发、性能优化大型分布 式项目实战视频教程

2017-08-09 * { font-family: "Microsoft YaHei" !important } h1 { color: #FF0 } 15套java架构师.集群.高可用.高可扩 展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布 式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.

java架构师、集群、高可用、高可扩展、高性能、高并发、性能优化

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to

优秀开源项目之三:高性能、高并发、高扩展性和可读性的网络服务器架构State Threads

译文在后面. State Threads for Internet Applications Introduction State Threads is an application library which provides a foundation for writing fast and highly scalable Internet Applications on UNIX-like platforms. It combines the simplicity of the multi