C#高性能Socket服务器的实现(IOCP)
https://www.jianshu.com/p/c65c0eb59f22
引言
我一直在探寻一个高性能的Socket客户端代码。以前,我使用Socket类写了一些基于传统异步编程模型的代码(BeginSend、BeginReceive,等等)也看过很多博客的知识,在linux中有poll和epoll来实现,在windows下面
微软MSDN中也提供了SocketAsyncEventArgs这个类来实现IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
NET Framework中的APM也称为Begin/End模式。这是因为会调用Begin方法来启动异步操作,然后返回一个IAsyncResult 对象。可以选择将一个代理作为参数提供给Begin方法,异步操作完成时会调用该方法。或者,一个线程可以等待 IAsyncResult.AsyncWaitHandle。当回调被调用或发出等待信号时,就会调用End方法来获取异步操作的结果。这种模式很灵活,使用相对简单,在 .NET Framework 中非常常见。
但是,您必须注意,如果进行大量异步套接字操作,是要付出代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响性能。为了解决这个问题,新版本提供了另一个使用套接字上执行异步I/O的方法模式。这种新模式并不要求为每个套接字操作分配操作上下文对象。
代码下载:http://download.csdn.net/detail/zhujunxxxxx/8431289这里的代码优化了的
目标
在上面微软提供的例子我觉得不是很完整,没有具体一个流程,只是受到客户端消息后发送相同内容给客户端,初学者不容易看懂流程,因为我花了一天的时间来实现一个功能齐全的IOCP服务器,
效果如下
代码
首先是ICOPServer.cs 这个类是IOCP服务器的核心类,目前这个类是网络上比较全的代码,MSDN上面的例子都没有我的全
using?System;??
using?System.Collections.Generic;??
using?System.Linq;??
using?System.Text;??
using?System.Net.Sockets;??
using?System.Net;??
using?System.Threading;??
namespace?ServerTest??
{??
///???
///?IOCP?SOCKET服务器??
///???
public?class?IOCPServer?:?IDisposable??
????{??
const?int?opsToPreAlloc?=?2;??
????????#region?Fields??
///???
///?服务器程序允许的最大客户端连接数??
///???
private?int?_maxClient;??
///???
///?监听Socket,用于接受客户端的连接请求??
///???
private?Socket?_serverSock;??
///???
///?当前的连接的客户端数??
///???
private?int?_clientCount;??
///???
///?用于每个I/O?Socket操作的缓冲区大小??
///???
private?int?_bufferSize?=?1024;??
///???
///?信号量??
///???
????????Semaphore?_maxAcceptedClients;??
///???
///?缓冲区管理??
///???
????????BufferManager?_bufferManager;??
///???
///?对象池??
///???
????????SocketAsyncEventArgsPool?_objectPool;??
private?bool?disposed?=?false;??
????????#endregion??
????????#region?Properties??
///???
///?服务器是否正在运行??
///???
public?bool?IsRunning?{?get;?private?set;?}??
///???
///?监听的IP地址??
///???
public?IPAddress?Address?{?get;?private?set;?}??
///???
///?监听的端口??
///???
public?int?Port?{?get;?private?set;?}??
///???
///?通信使用的编码??
///???
public?Encoding?Encoding?{?get;?set;?}??
????????#endregion??
????????#region?Ctors??
///???
///?异步IOCP?SOCKET服务器??
///???
///?监听的端口??
///?最大的客户端数量??
public?IOCPServer(int?listenPort,int?maxClient)??
:this(IPAddress.Any,?listenPort,?maxClient)??
????????{??
????????}??
///???
///?异步Socket?TCP服务器??
///???
///?监听的终结点??
///?最大客户端数量??
public?IOCPServer(IPEndPoint?localEP,?int?maxClient)??
:this(localEP.Address,?localEP.Port,maxClient)??
????????{??
????????}??
///???
///?异步Socket?TCP服务器??
///???
///?监听的IP地址??
///?监听的端口??
///?最大客户端数量??
public?IOCPServer(IPAddress?localIPAddress,?int?listenPort,?int?maxClient)??
????????{??
this.Address?=?localIPAddress;??
this.Port?=?listenPort;??
this.Encoding?=?Encoding.Default;??
????????????_maxClient?=?maxClient;??
_serverSock?=new?Socket(localIPAddress.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);??
_bufferManager?=new?BufferManager(_bufferSize?*?_maxClient?*?opsToPreAlloc,_bufferSize);??
_objectPool?=new?SocketAsyncEventArgsPool(_maxClient);??
_maxAcceptedClients?=new?Semaphore(_maxClient,?_maxClient);???
????????}??
????????#endregion??
????????#region?初始化??
///???
///?初始化函数??
///???
public?void?Init()??
????????{??
//?Allocates?one?large?byte?buffer?which?all?I/O?operations?use?a?piece?of.??This?gaurds???
//?against?memory?fragmentation??
????????????_bufferManager.InitBuffer();??
//?preallocate?pool?of?SocketAsyncEventArgs?objects??
????????????SocketAsyncEventArgs?readWriteEventArg;??
for?(int?i?=?0;?i?<?_maxClient;?i++)??
????????????{??
//Pre-allocate?a?set?of?reusable?SocketAsyncEventArgs??
readWriteEventArg?=new?SocketAsyncEventArgs();??
readWriteEventArg.Completed?+=new?EventHandler(OnIOCompleted);??
readWriteEventArg.UserToken?=null;??
//?assign?a?byte?buffer?from?the?buffer?pool?to?the?SocketAsyncEventArg?object??
????????????????_bufferManager.SetBuffer(readWriteEventArg);??
//?add?SocketAsyncEventArg?to?the?pool??
????????????????_objectPool.Push(readWriteEventArg);??
????????????}??
????????}??
????????#endregion??
????????#region?Start??
///???
///?启动??
///???
public?void?Start()??
????????{??
if?(!IsRunning)??
????????????{??
????????????????Init();??
IsRunning?=true;??
IPEndPoint?localEndPoint?=new?IPEndPoint(Address,?Port);??
//?创建监听socket??
_serverSock?=new?Socket(localEndPoint.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);??
//_serverSock.ReceiveBufferSize?=?_bufferSize;??
//_serverSock.SendBufferSize?=?_bufferSize;??
if?(localEndPoint.AddressFamily?==?AddressFamily.InterNetworkV6)??
????????????????{??
//?配置监听socket为?dual-mode?(IPv4?&?IPv6)???
//?27?is?equivalent?to?IPV6_V6ONLY?socket?option?in?the?winsock?snippet?below,??
_serverSock.SetSocketOption(SocketOptionLevel.IPv6,?(SocketOptionName)27,false);??
_serverSock.Bind(new?IPEndPoint(IPAddress.IPv6Any,?localEndPoint.Port));??
????????????????}??
else??
????????????????{??
????????????????????_serverSock.Bind(localEndPoint);??
????????????????}??
//?开始监听??
_serverSock.Listen(this._maxClient);??
//?在监听Socket上投递一个接受请求。??
StartAccept(null);??
????????????}??
????????}??
????????#endregion??
????????#region?Stop??
///???
///?停止服务??
///???
public?void?Stop()??
????????{??
if?(IsRunning)??
????????????{??
IsRunning?=false;??
????????????????_serverSock.Close();??
//TODO?关闭对所有客户端的连接??
????????????}??
????????}??
????????#endregion??
????????#region?Accept??
///???
///?从客户端开始接受一个连接操作??
///???
private?void?StartAccept(SocketAsyncEventArgs?asyniar)??
????????{??
if?(asyniar?==?null)??
????????????{??
asyniar?=new?SocketAsyncEventArgs();??
asyniar.Completed?+=new?EventHandler(OnAcceptCompleted);??
????????????}??
else??
????????????{??
//socket?must?be?cleared?since?the?context?object?is?being?reused??
asyniar.AcceptSocket?=null;??
????????????}??
????????????_maxAcceptedClients.WaitOne();??
if?(!_serverSock.AcceptAsync(asyniar))??
????????????{??
????????????????ProcessAccept(asyniar);??
//如果I/O挂起等待异步则触发AcceptAsyn_Asyn_Completed事件??
//此时I/O操作同步完成,不会触发Asyn_Completed事件,所以指定BeginAccept()方法??
????????????}??
????????}??
///???
///?accept?操作完成时回调函数??
///???
///?Object?who?raised?the?event.??
///?SocketAsyncEventArg?associated?with?the?completed?accept?operation.??
private?void?OnAcceptCompleted(object?sender,?SocketAsyncEventArgs?e)??
????????{??
????????????ProcessAccept(e);??
????????}??
///???
///?监听Socket接受处理??
///???
///?SocketAsyncEventArg?associated?with?the?completed?accept?operation.??
private?void?ProcessAccept(SocketAsyncEventArgs?e)??
????????{??
if?(e.SocketError?==?SocketError.Success)??
????????????{??
Socket?s?=?e.AcceptSocket;//和客户端关联的socket??
if?(s.Connected)??
????????????????{??
try??
????????????????????{??
Interlocked.Increment(ref?_clientCount);//原子操作加1??
????????????????????????SocketAsyncEventArgs?asyniar?=?_objectPool.Pop();??
????????????????????????asyniar.UserToken?=?s;??
Log4Debug(String.Format("客户?{0}?连入,?共有?{1}?个连接。",?s.RemoteEndPoint.ToString(),?_clientCount));??
if?(!s.ReceiveAsync(asyniar))//投递接收请求??
????????????????????????{??
????????????????????????????ProcessReceive(asyniar);??
????????????????????????}??
????????????????????}??
catch?(SocketException?ex)??
????????????????????{??
Log4Debug(String.Format("接收客户?{0}?数据出错,?异常信息:?{1}?。",?s.RemoteEndPoint,?ex.ToString()));??
//TODO?异常处理??
????????????????????}??
//投递下一个接受请求??
????????????????????StartAccept(e);??
????????????????}??
????????????}??
????????}??
????????#endregion??
????????#region?发送数据??
///???
///?异步的发送数据??
///???
///???
///???
public?void?Send(SocketAsyncEventArgs?e,?byte[]?data)??
????????{??
if?(e.SocketError?==?SocketError.Success)??
????????????{??
Socket?s?=?e.AcceptSocket;//和客户端关联的socket??
if?(s.Connected)??
????????????????{??
Array.Copy(data,?0,?e.Buffer,?0,?data.Length);//设置发送数据??
//e.SetBuffer(data,?0,?data.Length);?//设置发送数据??
if?(!s.SendAsync(e))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件??
????????????????????{??
//?同步发送时处理发送完成事件??
????????????????????????ProcessSend(e);??
????????????????????}??
else??
????????????????????{??
????????????????????????CloseClientSocket(e);??
????????????????????}??
????????????????}??
????????????}??
????????}??
///???
///?同步的使用socket发送数据??
///???
///???
///???
///???
///???
///???
public?void?Send(Socket?socket,?byte[]?buffer,?int?offset,?int?size,?int?timeout)??
????????{??
????????????socket.SendTimeout?=?0;??
int?startTickCount?=?Environment.TickCount;??
int?sent?=?0;?//?how?many?bytes?is?already?sent??
do??
????????????{??
if?(Environment.TickCount?>?startTickCount?+?timeout)??
????????????????{??
//throw?new?Exception("Timeout.");??
????????????????}??
try??
????????????????{??
????????????????????sent?+=?socket.Send(buffer,?offset?+?sent,?size?-?sent,?SocketFlags.None);??
????????????????}??
catch?(SocketException?ex)??
????????????????{??
if?(ex.SocketErrorCode?==?SocketError.WouldBlock?||??
????????????????????ex.SocketErrorCode?==?SocketError.IOPending?||??
????????????????????ex.SocketErrorCode?==?SocketError.NoBufferSpaceAvailable)??
????????????????????{??
//?socket?buffer?is?probably?full,?wait?and?try?again??
????????????????????????Thread.Sleep(30);??
????????????????????}??
else??
????????????????????{??
throw?ex;?//?any?serious?error?occurr??
????????????????????}??
????????????????}??
}while?(sent?<?size);??
????????}??
///???
///?发送完成时处理函数??
///???
///?与发送完成操作相关联的SocketAsyncEventArg对象??
private?void?ProcessSend(SocketAsyncEventArgs?e)??
????????{??
if?(e.SocketError?==?SocketError.Success)??
????????????{??
????????????????Socket?s?=?(Socket)e.UserToken;??
//TODO??
????????????}??
else??
????????????{??
????????????????CloseClientSocket(e);??
????????????}??
????????}??
????????#endregion??
????????#region?接收数据??
///???
///接收完成时处理函数??
///???
///?与接收完成操作相关联的SocketAsyncEventArg对象??
private?void?ProcessReceive(SocketAsyncEventArgs?e)??
????????{??
if?(e.SocketError?==?SocketError.Success)//if?(e.BytesTransferred?>?0?&&?e.SocketError?==?SocketError.Success)??
????????????{??
//?检查远程主机是否关闭连接??
if?(e.BytesTransferred?>?0)??
????????????????{??
????????????????????Socket?s?=?(Socket)e.UserToken;??
//判断所有需接收的数据是否已经完成??
if?(s.Available?==?0)??
????????????????????{??
//从侦听者获取接收到的消息。???
//String?received?=?Encoding.ASCII.GetString(e.Buffer,?e.Offset,?e.BytesTransferred);??
//echo?the?data?received?back?to?the?client??
//e.SetBuffer(e.Offset,?e.BytesTransferred);??
byte[]?data?=?new?byte[e.BytesTransferred];??
Array.Copy(e.Buffer,?e.Offset,?data,?0,?data.Length);//从e.Buffer块中复制数据出来,保证它可重用??
string?info=Encoding.Default.GetString(data);??
Log4Debug(String.Format("收到?{0}?数据为?{1}",s.RemoteEndPoint.ToString(),info));??
//TODO?处理数据??
//增加服务器接收的总字节数。??
????????????????????}??
if?(!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件??
????????????????????{??
//同步接收时处理接收完成事件??
????????????????????????ProcessReceive(e);??
????????????????????}??
????????????????}??
????????????}??
else??
????????????{??
????????????????CloseClientSocket(e);??
????????????}??
????????}??
????????#endregion??
????????#region?回调函数??
///???
///?当Socket上的发送或接收请求被完成时,调用此函数??
///???
///?激发事件的对象??
///?与发送或接收完成操作相关联的SocketAsyncEventArg对象??
private?void?OnIOCompleted(object?sender,?SocketAsyncEventArgs?e)??
????????{??
//?Determine?which?type?of?operation?just?completed?and?call?the?associated?handler.??
switch?(e.LastOperation)??
????????????{??
case?SocketAsyncOperation.Accept:??
????????????????????ProcessAccept(e);??
break;??
case?SocketAsyncOperation.Receive:??
????????????????????ProcessReceive(e);??
break;??
default:??
throw?new?ArgumentException("The?last?operation?completed?on?the?socket?was?not?a?receive?or?send");??
????????????}??
????????}??
????????#endregion??
????????#region?Close??
///???
///?关闭socket连接??
///???
///?SocketAsyncEventArg?associated?with?the?completed?send/receive?operation.??
private?void?CloseClientSocket(SocketAsyncEventArgs?e)??
????????{??
Log4Debug(String.Format("客户?{0}?断开连接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));??
Socket?s?=?e.UserTokenas?Socket;??
????????????CloseClientSocket(s,?e);??
????????}??
///???
///?关闭socket连接??
///???
///???
///???
private?void?CloseClientSocket(Socket?s,?SocketAsyncEventArgs?e)??
????????{??
try??
????????????{??
????????????????s.Shutdown(SocketShutdown.Send);??
????????????}??
catch?(Exception)??
????????????{??
//?Throw?if?client?has?closed,?so?it?is?not?necessary?to?catch.??
????????????}??
finally??
????????????{??
????????????????s.Close();??
????????????}??
Interlocked.Decrement(ref?_clientCount);??
????????????_maxAcceptedClients.Release();??
_objectPool.Push(e);//SocketAsyncEventArg?对象被释放,压入可重用队列。??
????????}??
????????#endregion??
????????#region?Dispose??
///???
///?Performs?application-defined?tasks?associated?with?freeing,???
///?releasing,?or?resetting?unmanaged?resources.??
///???
public?void?Dispose()??
????????{??
Dispose(true);??
GC.SuppressFinalize(this);??
????????}??
///???
///?Releases?unmanaged?and?-?optionally?-?managed?resources??
///???
///?true?to?release???
///?both?managed?and?unmanaged?resources;?false???
///?to?release?only?unmanaged?resources.??
protected?virtual?void?Dispose(bool?disposing)??
????????{??
if?(!this.disposed)??
????????????{??
if?(disposing)??
????????????????{??
try??
????????????????????{??
????????????????????????Stop();??
if?(_serverSock?!=?null)??
????????????????????????{??
_serverSock?=null;??
????????????????????????}??
????????????????????}??
catch?(SocketException?ex)??
????????????????????{??
//TODO?事件??
????????????????????}??
????????????????}??
disposed?=true;??
????????????}??
????????}??
????????#endregion??
public?void?Log4Debug(string?msg)??
????????{??
Console.WriteLine("notice:"+msg);??
????????}??
????}??
}
原文地址:https://www.cnblogs.com/Leo_wl/p/9920851.html