Winsock IO模型之IOCP模型

http://blog.csdn.net/lostyears/article/details/7436802

Windows平台上伸缩性最好的一种I/O模型莫过IOCP了,不过设计和实现起来比较复杂一些。针对成千上万个套接字的并发处理,IOCP采用了线程池+队列+重叠结构的内核机制完成任务。需要说明的是IOCP其实不仅可以接受套接字对象句柄,还可以接受文件对象句柄等。

为避免线程上下文切换,它采用了线程池。除此之外,在基于事件模型或重叠模型中不得不遇到WSAWaitForMultipleEvent的WSA_MAXIMUM_WAIT_EVENTS的限制,由此必须由自己来设计线程池来避开这种限制。而IOCP则由系统来实现线程池,对用户来说透明,由此减少了设计线程池带来的复杂性并提高了安全性。看看GetQueuedCompletionStatus这个函数,它包含了封包队列排队机制以及基于事件的重叠结构OVERLAPPED的通知,用户无需再去设计队列和OVERLAPPED结构中显示地创建事件。

IOCP一般被用于Windows平台上大规模服务器程序的设计。所以设计起来有很多地方需要注意。因为IOCP设计中需要用到很多异步操作,所以对到来的数据包的排序是需要额外处理的,简单和常用的方法是为每个封包添加一个序列号。接受方每接受一个封包就需要判断其是否为当前需要读取的下一个序列号封包,如果不是就要将之插入封包队列,如果是则返回给用户。当关闭IOCP时,我们要避免重叠操作正在进行的时候却要释放它的OVERLAPPED结构,阻止其发生的最好方法是在每个套接字句柄上调用closesocket,如此所有的未决重叠IO操作都会完成。一旦所有的套接字句柄关闭,就该终止IOCP上处理IO的工作线程了。这可以通过PostQueuedCompletionStatus来操作。在接受连接方面,投递多少连接是个需要认真考虑的问题,因为每创建一个套接字都会占用系统不小的开销,在Win2000及以后版本中我们可以通过在创建监听套接字时采用WSAEventSelect,注册FD_ACCEPT通知消息的方式,在投递的AcceptEx用完时但仍有客户连接时,事件对象将会受信,通知我们继续投递。另外,我们可以对每个客户套接字采用定时轮询的方法查询其连接时间长短,以此我们可以判断哪些客户连接有恶意的迹象(只连接服务器不发送任何数据)。

采用书上的一个例子,其中增加Keeplive机制,并修订了其中的几个错误:

[cpp] view plaincopyprint?

  1. ////////////////////////////////////////
  2. // IOCP.h文件
  3. #ifndef __IOCP_H__
  4. #define __IOCP_H__
  5. #include <winsock2.h>
  6. #include <windows.h>
  7. #include <Mswsock.h>
  8. #define BUFFER_SIZE 1024*4  // I/O请求的缓冲区大小
  9. #define MAX_THREAD  2       // I/O服务线程的数量
  10. // 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
  11. struct CIOCPBuffer
  12. {
  13. WSAOVERLAPPED ol;
  14. SOCKET sClient;         // AcceptEx接收的客户方套节字
  15. char *buff;             // I/O操作使用的缓冲区
  16. int nLen;               // buff缓冲区(使用的)大小
  17. ULONG nSequenceNumber;  // 此I/O的序列号
  18. int nOperation;         // 操作类型
  19. #define OP_ACCEPT   1
  20. #define OP_WRITE    2
  21. #define OP_READ     3
  22. CIOCPBuffer *pNext;
  23. };
  24. // 这是per-Handle数据。它包含了一个套节字的信息
  25. struct CIOCPContext
  26. {
  27. SOCKET s;                       // 套节字句柄
  28. SOCKADDR_IN addrLocal;          // 连接的本地地址
  29. SOCKADDR_IN addrRemote;         // 连接的远程地址
  30. BOOL bClosing;                  // 套节字是否关闭
  31. int nOutstandingRecv;           // 此套节字上抛出的重叠操作的数量
  32. int nOutstandingSend;
  33. ULONG nReadSequence;            // 安排给接收的下一个序列号
  34. ULONG nCurrentReadSequence;     // 当前要读的序列号
  35. CIOCPBuffer *pOutOfOrderReads;  // 记录没有按顺序完成的读I/O
  36. CRITICAL_SECTION Lock;          // 保护这个结构
  37. bool bNotifyCloseOrError;       // [2009.8.22 add Lostyears][当套接字关闭或出错时是否已通知过]
  38. CIOCPContext *pNext;
  39. };
  40. class CIOCPServer   // 处理线程
  41. {
  42. public:
  43. CIOCPServer();
  44. ~CIOCPServer();
  45. // 开始服务
  46. BOOL Start(int nPort = 4567, int nMaxConnections = 2000,
  47. int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
  48. // 停止服务
  49. void Shutdown();
  50. // 关闭一个连接和关闭所有连接
  51. void CloseAConnection(CIOCPContext *pContext);
  52. void CloseAllConnections();
  53. // 取得当前的连接数量
  54. ULONG GetCurrentConnection() { return m_nCurrentConnection; }
  55. // 向指定客户发送文本
  56. BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);
  57. protected:
  58. // 申请和释放缓冲区对象
  59. CIOCPBuffer *AllocateBuffer(int nLen);
  60. void ReleaseBuffer(CIOCPBuffer *pBuffer);
  61. // 申请和释放套节字上下文
  62. CIOCPContext *AllocateContext(SOCKET s);
  63. void ReleaseContext(CIOCPContext *pContext);
  64. // 释放空闲缓冲区对象列表和空闲上下文对象列表
  65. void FreeBuffers();
  66. void FreeContexts();
  67. // 向连接列表中添加一个连接
  68. BOOL AddAConnection(CIOCPContext *pContext);
  69. // 插入和移除未决的接受请求
  70. BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
  71. BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);
  72. // 取得下一个要读取的
  73. CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  74. // 投递接受I/O、发送I/O、接收I/O
  75. BOOL PostAccept(CIOCPBuffer *pBuffer);
  76. BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  77. BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  78. void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);
  79. // [2009.8.22 add Lostyears]
  80. // 当套件字关闭或出错时通知
  81. void NotifyConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  82. void NotifyConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
  83. // 事件通知函数
  84. // 建立了一个新的连接
  85. virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  86. // 一个连接关闭
  87. virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  88. // 在一个连接上发生了错误
  89. virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
  90. // 一个连接上的读操作完成
  91. virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  92. // 一个连接上的写操作完成
  93. virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
  94. protected:
  95. // 记录空闲结构信息
  96. CIOCPBuffer *m_pFreeBufferList;
  97. CIOCPContext *m_pFreeContextList;
  98. int m_nFreeBufferCount;
  99. int m_nFreeContextCount;
  100. CRITICAL_SECTION m_FreeBufferListLock;
  101. CRITICAL_SECTION m_FreeContextListLock;
  102. // 记录抛出的Accept请求
  103. CIOCPBuffer *m_pPendingAccepts;   // 抛出请求列表。
  104. long m_nPendingAcceptCount;
  105. CRITICAL_SECTION m_PendingAcceptsLock;
  106. // 记录连接列表
  107. CIOCPContext *m_pConnectionList;
  108. int m_nCurrentConnection;
  109. CRITICAL_SECTION m_ConnectionListLock;
  110. // 用于投递Accept请求
  111. HANDLE m_hAcceptEvent;
  112. HANDLE m_hRepostEvent;
  113. LONG m_nRepostCount;
  114. int m_nPort;                // 服务器监听的端口
  115. int m_nInitialAccepts;      // 开始时抛出的异步接收投递数
  116. int m_nInitialReads;
  117. int m_nMaxAccepts;          // 抛出的异步接收投递数最大值
  118. int m_nMaxSends;            // 抛出的异步发送投递数最大值(跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作)
  119. int m_nMaxFreeBuffers;      // 内存池中容纳的最大内存块数(超过该数将在物理上释放内存池)
  120. int m_nMaxFreeContexts;     // 上下文[套接字信息]内存池中容纳的最大上下文内存块数(超过该数将在物理上释放上下文内存池)
  121. int m_nMaxConnections;      // 最大连接数
  122. HANDLE m_hListenThread;         // 监听线程
  123. HANDLE m_hCompletion;           // 完成端口句柄
  124. SOCKET m_sListen;               // 监听套节字句柄
  125. LPFN_ACCEPTEX m_lpfnAcceptEx;   // AcceptEx函数地址
  126. LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址
  127. BOOL m_bShutDown;       // 用于通知监听线程退出
  128. BOOL m_bServerStarted;  // 记录服务是否启动
  129. CRITICAL_SECTION m_CloseOrErrLock;  // [2009.9.1 add Lostyears]
  130. private:    // 线程函数
  131. static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
  132. static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
  133. };
  134. #endif // __IOCP_H__
  135. //////////////////////////////////////////////////
  136. // IOCP.cpp文件
  137. #include "iocp.h"
  138. #pragma comment(lib, "WS2_32.lib")
  139. #include <stdio.h>
  140. #include <mstcpip.h>
  141. CIOCPServer::CIOCPServer()
  142. {
  143. // 列表
  144. m_pFreeBufferList = NULL;
  145. m_pFreeContextList = NULL;
  146. m_pPendingAccepts = NULL;
  147. m_pConnectionList = NULL;
  148. m_nFreeBufferCount = 0;
  149. m_nFreeContextCount = 0;
  150. m_nPendingAcceptCount = 0;
  151. m_nCurrentConnection = 0;
  152. ::InitializeCriticalSection(&m_FreeBufferListLock);
  153. ::InitializeCriticalSection(&m_FreeContextListLock);
  154. ::InitializeCriticalSection(&m_PendingAcceptsLock);
  155. ::InitializeCriticalSection(&m_ConnectionListLock);
  156. ::InitializeCriticalSection(&m_CloseOrErrLock); // [2009.9.1 add Lostyears]
  157. // Accept请求
  158. m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
  159. m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
  160. m_nRepostCount = 0;
  161. m_nPort = 4567;
  162. m_nInitialAccepts = 10;
  163. m_nInitialReads = 4;
  164. m_nMaxAccepts = 100;
  165. m_nMaxSends = 20;
  166. m_nMaxFreeBuffers = 200;
  167. m_nMaxFreeContexts = 100;
  168. m_nMaxConnections = 2000;
  169. m_hListenThread = NULL;
  170. m_hCompletion = NULL;
  171. m_sListen = INVALID_SOCKET;
  172. m_lpfnAcceptEx = NULL;
  173. m_lpfnGetAcceptExSockaddrs = NULL;
  174. m_bShutDown = FALSE;
  175. m_bServerStarted = FALSE;
  176. // 初始化WS2_32.dll
  177. WSADATA wsaData;
  178. WORD sockVersion = MAKEWORD(2, 2);
  179. ::WSAStartup(sockVersion, &wsaData);
  180. }
  181. CIOCPServer::~CIOCPServer()
  182. {
  183. Shutdown();
  184. if(m_sListen != INVALID_SOCKET)
  185. ::closesocket(m_sListen);
  186. if(m_hListenThread != NULL)
  187. ::CloseHandle(m_hListenThread);
  188. ::CloseHandle(m_hRepostEvent);
  189. ::CloseHandle(m_hAcceptEvent);
  190. ::DeleteCriticalSection(&m_FreeBufferListLock);
  191. ::DeleteCriticalSection(&m_FreeContextListLock);
  192. ::DeleteCriticalSection(&m_PendingAcceptsLock);
  193. ::DeleteCriticalSection(&m_ConnectionListLock);
  194. ::DeleteCriticalSection(&m_CloseOrErrLock); // [2009.9.1 add Lostyears]
  195. ::WSACleanup();
  196. }
  197. ///////////////////////////////////
  198. // 自定义帮助函数
  199. CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
  200. {
  201. CIOCPBuffer *pBuffer = NULL;
  202. if(nLen > BUFFER_SIZE)
  203. return NULL;
  204. // 为缓冲区对象申请内存
  205. ::EnterCriticalSection(&m_FreeBufferListLock);
  206. if(m_pFreeBufferList == NULL)  // 内存池为空,申请新的内存
  207. {
  208. pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(),
  209. HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
  210. }
  211. else    // 从内存池中取一块来使用
  212. {
  213. pBuffer = m_pFreeBufferList;
  214. m_pFreeBufferList = m_pFreeBufferList->pNext;
  215. pBuffer->pNext = NULL;
  216. m_nFreeBufferCount --;
  217. }
  218. ::LeaveCriticalSection(&m_FreeBufferListLock);
  219. // 初始化新的缓冲区对象
  220. if(pBuffer != NULL)
  221. {
  222. pBuffer->buff = (char*)(pBuffer + 1);
  223. pBuffer->nLen = nLen;
  224. //::ZeroMemory(pBuffer->buff, pBuffer->nLen);
  225. }
  226. return pBuffer;
  227. }
  228. void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
  229. {
  230. ::EnterCriticalSection(&m_FreeBufferListLock);
  231. if(m_nFreeBufferCount < m_nMaxFreeBuffers)   // 将要释放的内存添加到空闲列表中 [2010.5.15 mod Lostyears]old:m_nFreeBufferCount <= m_nMaxFreeBuffers
  232. {
  233. memset(pBuffer, 0, sizeof(CIOCPBuffer) + BUFFER_SIZE);
  234. pBuffer->pNext = m_pFreeBufferList;
  235. m_pFreeBufferList = pBuffer;
  236. m_nFreeBufferCount ++ ;
  237. }
  238. else            // 已经达到最大值,真正的释放内存
  239. {
  240. ::HeapFree(::GetProcessHeap(), 0, pBuffer);
  241. }
  242. ::LeaveCriticalSection(&m_FreeBufferListLock);
  243. }
  244. CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
  245. {
  246. CIOCPContext *pContext;
  247. // 申请一个CIOCPContext对象
  248. ::EnterCriticalSection(&m_FreeContextListLock);
  249. if(m_pFreeContextList == NULL)
  250. {
  251. pContext = (CIOCPContext *)
  252. ::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext));
  253. ::InitializeCriticalSection(&pContext->Lock);
  254. }
  255. else
  256. {
  257. // 在空闲列表中申请
  258. pContext = m_pFreeContextList;
  259. m_pFreeContextList = m_pFreeContextList->pNext;
  260. pContext->pNext = NULL;
  261. m_nFreeContextCount --; // [2009.8.9 mod Lostyears][old: m_nFreeBufferCount--]
  262. }
  263. ::LeaveCriticalSection(&m_FreeContextListLock);
  264. // 初始化对象成员
  265. if(pContext != NULL)
  266. {
  267. pContext->s = s;
  268. // [2009.8.22 add Lostyears]
  269. pContext->bNotifyCloseOrError = false;
  270. }
  271. return pContext;
  272. }
  273. void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
  274. {
  275. if(pContext->s != INVALID_SOCKET)
  276. ::closesocket(pContext->s);
  277. // 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
  278. CIOCPBuffer *pNext;
  279. while(pContext->pOutOfOrderReads != NULL)
  280. {
  281. pNext = pContext->pOutOfOrderReads->pNext;
  282. ReleaseBuffer(pContext->pOutOfOrderReads);
  283. pContext->pOutOfOrderReads = pNext;
  284. }
  285. ::EnterCriticalSection(&m_FreeContextListLock);
  286. if(m_nFreeContextCount < m_nMaxFreeContexts) // 添加到空闲列表 [2010.4.10 mod Lostyears][old: m_nFreeContextCount <= m_nMaxFreeContexts]如果m_nFreeContextCount==m_nMaxFreeContexts时,会在下一次导致m_nFreeContextCount>m_nMaxFreeContexts
  287. {
  288. // 先将关键代码段变量保存到一个临时变量中
  289. CRITICAL_SECTION cstmp = pContext->Lock;
  290. // 将要释放的上下文对象初始化为0
  291. memset(pContext, 0, sizeof(CIOCPContext));
  292. // 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
  293. pContext->Lock = cstmp;
  294. pContext->pNext = m_pFreeContextList;
  295. m_pFreeContextList = pContext;
  296. // 更新计数
  297. m_nFreeContextCount ++;
  298. }
  299. else // 已经达到最大值,真正地释放
  300. {
  301. ::DeleteCriticalSection(&pContext->Lock);
  302. ::HeapFree(::GetProcessHeap(), 0, pContext);
  303. pContext = NULL;
  304. }
  305. ::LeaveCriticalSection(&m_FreeContextListLock);
  306. }
  307. void CIOCPServer::FreeBuffers()
  308. {
  309. // 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
  310. ::EnterCriticalSection(&m_FreeBufferListLock);
  311. CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
  312. CIOCPBuffer *pNextBuffer;
  313. while(pFreeBuffer != NULL)
  314. {
  315. pNextBuffer = pFreeBuffer->pNext;
  316. if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
  317. {
  318. #ifdef _DEBUG
  319. ::OutputDebugString("  FreeBuffers释放内存出错!");
  320. #endif // _DEBUG
  321. break;
  322. }
  323. pFreeBuffer = pNextBuffer;
  324. }
  325. m_pFreeBufferList = NULL;
  326. m_nFreeBufferCount = 0;
  327. ::LeaveCriticalSection(&m_FreeBufferListLock);
  328. }
  329. void CIOCPServer::FreeContexts()
  330. {
  331. // 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
  332. ::EnterCriticalSection(&m_FreeContextListLock);
  333. CIOCPContext *pFreeContext = m_pFreeContextList;
  334. CIOCPContext *pNextContext;
  335. while(pFreeContext != NULL)
  336. {
  337. pNextContext = pFreeContext->pNext;
  338. ::DeleteCriticalSection(&pFreeContext->Lock);
  339. if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
  340. {
  341. #ifdef _DEBUG
  342. ::OutputDebugString("  FreeBuffers释放内存出错!");
  343. #endif // _DEBUG
  344. break;
  345. }
  346. pFreeContext = pNextContext;
  347. }
  348. m_pFreeContextList = NULL;
  349. m_nFreeContextCount = 0;
  350. ::LeaveCriticalSection(&m_FreeContextListLock);
  351. }
  352. BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
  353. {
  354. // 向客户连接列表添加一个CIOCPContext对象
  355. ::EnterCriticalSection(&m_ConnectionListLock);
  356. if(m_nCurrentConnection < m_nMaxConnections)
  357. {
  358. // 添加到表头
  359. pContext->pNext = m_pConnectionList;
  360. m_pConnectionList = pContext;
  361. // 更新计数
  362. m_nCurrentConnection ++;
  363. ::LeaveCriticalSection(&m_ConnectionListLock);
  364. return TRUE;
  365. }
  366. ::LeaveCriticalSection(&m_ConnectionListLock);
  367. return FALSE;
  368. }
  369. void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
  370. {
  371. // 首先从列表中移除要关闭的连接
  372. ::EnterCriticalSection(&m_ConnectionListLock);
  373. CIOCPContext* pTest = m_pConnectionList;
  374. if(pTest == pContext)
  375. {
  376. m_pConnectionList =  pTest->pNext; // [2009.8.9 mod Lostyears][old: m_pConnectionList =  pContext->pNext]
  377. m_nCurrentConnection --;
  378. }
  379. else
  380. {
  381. while(pTest != NULL && pTest->pNext !=  pContext)
  382. pTest = pTest->pNext;
  383. if(pTest != NULL)
  384. {
  385. pTest->pNext =  pContext->pNext;
  386. m_nCurrentConnection --;
  387. }
  388. }
  389. ::LeaveCriticalSection(&m_ConnectionListLock);
  390. // 然后关闭客户套节字
  391. ::EnterCriticalSection(&pContext->Lock);
  392. if(pContext->s != INVALID_SOCKET)
  393. {
  394. ::closesocket(pContext->s);
  395. pContext->s = INVALID_SOCKET;
  396. }
  397. pContext->bClosing = TRUE;
  398. ::LeaveCriticalSection(&pContext->Lock);
  399. }
  400. void CIOCPServer::CloseAllConnections()
  401. {
  402. // 遍历整个连接列表,关闭所有的客户套节字
  403. ::EnterCriticalSection(&m_ConnectionListLock);
  404. CIOCPContext *pContext = m_pConnectionList;
  405. while(pContext != NULL)
  406. {
  407. ::EnterCriticalSection(&pContext->Lock);
  408. if(pContext->s != INVALID_SOCKET)
  409. {
  410. ::closesocket(pContext->s);
  411. pContext->s = INVALID_SOCKET;
  412. }
  413. pContext->bClosing = TRUE;
  414. ::LeaveCriticalSection(&pContext->Lock);
  415. pContext = pContext->pNext;
  416. }
  417. m_pConnectionList = NULL;
  418. m_nCurrentConnection = 0;
  419. ::LeaveCriticalSection(&m_ConnectionListLock);
  420. }
  421. BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
  422. {
  423. // 将一个I/O缓冲区对象插入到m_pPendingAccepts表中
  424. ::EnterCriticalSection(&m_PendingAcceptsLock);
  425. if(m_pPendingAccepts == NULL)
  426. m_pPendingAccepts = pBuffer;
  427. else
  428. {
  429. pBuffer->pNext = m_pPendingAccepts;
  430. m_pPendingAccepts = pBuffer;
  431. }
  432. m_nPendingAcceptCount ++;
  433. ::LeaveCriticalSection(&m_PendingAcceptsLock);
  434. return TRUE;
  435. }
  436. BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
  437. {
  438. BOOL bResult = FALSE;
  439. // 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
  440. ::EnterCriticalSection(&m_PendingAcceptsLock);
  441. CIOCPBuffer *pTest = m_pPendingAccepts;
  442. if(pTest == pBuffer)    // 如果是表头元素
  443. {
  444. m_pPendingAccepts = pTest->pNext; // [2009.8.9 mod Lostyears][old: m_pPendingAccepts = pBuffer->pNext]
  445. bResult = TRUE;
  446. }
  447. else                    // 不是表头元素的话,就要遍历这个表来查找了
  448. {
  449. while(pTest != NULL && pTest->pNext != pBuffer)
  450. pTest = pTest->pNext;
  451. if(pTest != NULL)
  452. {
  453. pTest->pNext = pBuffer->pNext;
  454. bResult = TRUE;
  455. }
  456. }
  457. // 更新计数
  458. if(bResult)
  459. m_nPendingAcceptCount --;
  460. ::LeaveCriticalSection(&m_PendingAcceptsLock);
  461. return  bResult;
  462. }
  463. CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  464. {
  465. if(pBuffer != NULL)
  466. {
  467. // 如果与要读的下一个序列号相等,则读这块缓冲区
  468. if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
  469. {
  470. return pBuffer;
  471. }
  472. // 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中
  473. // 列表中的缓冲区是按照其序列号从小到大的顺序排列的
  474. pBuffer->pNext = NULL;
  475. CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
  476. CIOCPBuffer *pPre = NULL;
  477. while(ptr != NULL)
  478. {
  479. if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)
  480. break;
  481. pPre = ptr;
  482. ptr = ptr->pNext;
  483. }
  484. if(pPre == NULL) // 应该插入到表头
  485. {
  486. pBuffer->pNext = pContext->pOutOfOrderReads;
  487. pContext->pOutOfOrderReads = pBuffer;
  488. }
  489. else            // 应该插入到表的中间
  490. {
  491. pBuffer->pNext = pPre->pNext;
  492. pPre->pNext = pBuffer; // [2009.8.9 mod Lostyears][old: pPre->pNext = pBuffer->pNext]
  493. }
  494. }
  495. // 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
  496. CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
  497. if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))
  498. {
  499. pContext->pOutOfOrderReads = ptr->pNext;
  500. return ptr;
  501. }
  502. return NULL;
  503. }
  504. BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)  // 在监听套节字上投递Accept请求
  505. {
  506. // 设置I/O类型
  507. pBuffer->nOperation = OP_ACCEPT;
  508. // 投递此重叠I/O
  509. DWORD dwBytes;
  510. pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  511. BOOL b = m_lpfnAcceptEx(m_sListen,
  512. pBuffer->sClient,
  513. pBuffer->buff,
  514. pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2), // [2010.5.16 bak Lostyears]如果这里为0, 表示不等待接收数据而通知, 如果这里改为0, 则GetAcceptExSockaddrs函数中的相应参数也得相应改
  515. sizeof(sockaddr_in) + 16,
  516. sizeof(sockaddr_in) + 16,
  517. &dwBytes,
  518. &pBuffer->ol);
  519. if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
  520. {
  521. return FALSE;
  522. }
  523. return TRUE;
  524. }
  525. BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  526. {
  527. // 设置I/O类型
  528. pBuffer->nOperation = OP_READ;
  529. ::EnterCriticalSection(&pContext->Lock);
  530. // 设置序列号
  531. pBuffer->nSequenceNumber = pContext->nReadSequence;
  532. // 投递此重叠I/O
  533. DWORD dwBytes;
  534. DWORD dwFlags = 0;
  535. WSABUF buf;
  536. buf.buf = pBuffer->buff;
  537. buf.len = pBuffer->nLen;
  538. if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
  539. {
  540. if(::WSAGetLastError() != WSA_IO_PENDING)
  541. {
  542. ::LeaveCriticalSection(&pContext->Lock);
  543. return FALSE;
  544. }
  545. }
  546. // 增加套节字上的重叠I/O计数和读序列号计数
  547. pContext->nOutstandingRecv ++;
  548. pContext->nReadSequence ++;
  549. ::LeaveCriticalSection(&pContext->Lock);
  550. return TRUE;
  551. }
  552. BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  553. {
  554. // 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
  555. if(pContext->nOutstandingSend > m_nMaxSends)
  556. return FALSE;
  557. // 设置I/O类型,增加套节字上的重叠I/O计数
  558. pBuffer->nOperation = OP_WRITE;
  559. // 投递此重叠I/O
  560. DWORD dwBytes;
  561. DWORD dwFlags = 0;
  562. WSABUF buf;
  563. buf.buf = pBuffer->buff;
  564. buf.len = pBuffer->nLen;
  565. if(::WSASend(pContext->s,
  566. &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
  567. {
  568. if(::WSAGetLastError() != WSA_IO_PENDING)
  569. return FALSE;
  570. }
  571. // 增加套节字上的重叠I/O计数
  572. ::EnterCriticalSection(&pContext->Lock);
  573. pContext->nOutstandingSend ++;
  574. ::LeaveCriticalSection(&pContext->Lock);
  575. return TRUE;
  576. }
  577. BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
  578. int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
  579. {
  580. // 检查服务是否已经启动
  581. if(m_bServerStarted)
  582. return FALSE;
  583. // 保存用户参数
  584. m_nPort = nPort;
  585. m_nMaxConnections = nMaxConnections;
  586. m_nMaxFreeBuffers = nMaxFreeBuffers;
  587. m_nMaxFreeContexts = nMaxFreeContexts;
  588. m_nInitialReads = nInitialReads;
  589. // 初始化状态变量
  590. m_bShutDown = FALSE;
  591. m_bServerStarted = TRUE;
  592. // 创建监听套节字,绑定到本地端口,进入监听模式
  593. m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  594. SOCKADDR_IN si;
  595. si.sin_family = AF_INET;
  596. si.sin_port = ::ntohs(m_nPort);
  597. si.sin_addr.S_un.S_addr = INADDR_ANY;
  598. if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
  599. {
  600. m_bServerStarted = FALSE;
  601. return FALSE;
  602. }
  603. ::listen(m_sListen, 200);
  604. // 创建完成端口对象
  605. m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  606. // 加载扩展函数AcceptEx
  607. GUID GuidAcceptEx = WSAID_ACCEPTEX;
  608. DWORD dwBytes;
  609. ::WSAIoctl(m_sListen,
  610. SIO_GET_EXTENSION_FUNCTION_POINTER,
  611. &GuidAcceptEx,
  612. sizeof(GuidAcceptEx),
  613. &m_lpfnAcceptEx,
  614. sizeof(m_lpfnAcceptEx),
  615. &dwBytes,
  616. NULL,
  617. NULL);
  618. // 加载扩展函数GetAcceptExSockaddrs
  619. GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
  620. ::WSAIoctl(m_sListen,
  621. SIO_GET_EXTENSION_FUNCTION_POINTER,
  622. &GuidGetAcceptExSockaddrs,
  623. sizeof(GuidGetAcceptExSockaddrs),
  624. &m_lpfnGetAcceptExSockaddrs,
  625. sizeof(m_lpfnGetAcceptExSockaddrs),
  626. &dwBytes,
  627. NULL,
  628. NULL
  629. );
  630. // 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
  631. ::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);
  632. // 注册FD_ACCEPT事件。
  633. // 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
  634. WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);
  635. // 创建监听线程
  636. m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);
  637. return TRUE;
  638. }
  639. void CIOCPServer::Shutdown()
  640. {
  641. if(!m_bServerStarted)
  642. return;
  643. // 通知监听线程,马上停止服务
  644. m_bShutDown = TRUE;
  645. ::SetEvent(m_hAcceptEvent);
  646. // 等待监听线程退出
  647. ::WaitForSingleObject(m_hListenThread, INFINITE);
  648. ::CloseHandle(m_hListenThread);
  649. m_hListenThread = NULL;
  650. m_bServerStarted = FALSE;
  651. }
  652. DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
  653. {
  654. CIOCPServer *pThis = (CIOCPServer*)lpParam;
  655. // 先在监听套节字上投递几个Accept I/O
  656. CIOCPBuffer *pBuffer;
  657. for(int i=0; i<pThis->m_nInitialAccepts; i++)
  658. {
  659. pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
  660. if(pBuffer == NULL)
  661. return -1;
  662. pThis->InsertPendingAccept(pBuffer);
  663. pThis->PostAccept(pBuffer);
  664. }
  665. // 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
  666. HANDLE hWaitEvents[2 + MAX_THREAD];
  667. int nEventCount = 0;
  668. hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
  669. hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;
  670. // 创建指定数量的工作线程在完成端口上处理I/O
  671. for(int i=0; i<MAX_THREAD; i++)
  672. {
  673. hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
  674. }
  675. // 下面进入无限循环,处理事件对象数组中的事件
  676. while(TRUE)
  677. {
  678. int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);
  679. // 首先检查是否要停止服务
  680. if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
  681. {
  682. // 关闭所有连接
  683. pThis->CloseAllConnections();
  684. ::Sleep(0);     // 给I/O工作线程一个执行的机会
  685. // 关闭监听套节字
  686. ::closesocket(pThis->m_sListen);
  687. pThis->m_sListen = INVALID_SOCKET;
  688. ::Sleep(0);     // 给I/O工作线程一个执行的机会
  689. // 通知所有I/O处理线程退出
  690. for(int i=2; i<MAX_THREAD + 2; i++)
  691. {
  692. ::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
  693. }
  694. // 等待I/O处理线程退出
  695. ::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);
  696. for(int i=2; i<MAX_THREAD + 2; i++)
  697. {
  698. ::CloseHandle(hWaitEvents[i]);
  699. }
  700. ::CloseHandle(pThis->m_hCompletion);
  701. pThis->FreeBuffers();
  702. pThis->FreeContexts();
  703. ::ExitThread(0);
  704. }
  705. // 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
  706. if(nIndex == WSA_WAIT_TIMEOUT)
  707. {
  708. pBuffer = pThis->m_pPendingAccepts;
  709. while(pBuffer != NULL)
  710. {
  711. int nSeconds;
  712. int nLen = sizeof(nSeconds);
  713. // 取得连接建立的时间
  714. ::getsockopt(pBuffer->sClient,
  715. SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);
  716. // 如果超过2分钟客户还不发送初始数据,就让这个客户go away
  717. if(nSeconds != -1 && nSeconds > 2*60)
  718. {
  719. closesocket(pBuffer->sClient);
  720. pBuffer->sClient = INVALID_SOCKET;
  721. }
  722. pBuffer = pBuffer->pNext;
  723. }
  724. }
  725. else
  726. {
  727. nIndex = nIndex - WAIT_OBJECT_0;
  728. WSANETWORKEVENTS ne;
  729. int nLimit=0;
  730. if(nIndex == 0)         // 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
  731. {
  732. ::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
  733. if(ne.lNetworkEvents & FD_ACCEPT)
  734. {
  735. nLimit = 50;  // 增加的个数,这里设为50个
  736. }
  737. }
  738. else if(nIndex == 1)    // 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
  739. {
  740. nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
  741. }
  742. else if(nIndex > 1)      // I/O服务线程退出,说明有错误发生,关闭服务器
  743. {
  744. pThis->m_bShutDown = TRUE;
  745. continue;
  746. }
  747. // 投递nLimit个AcceptEx I/O请求
  748. int i = 0;
  749. while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
  750. {
  751. pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
  752. if(pBuffer != NULL)
  753. {
  754. pThis->InsertPendingAccept(pBuffer);
  755. pThis->PostAccept(pBuffer);
  756. }
  757. }
  758. }
  759. }
  760. return 0;
  761. }
  762. DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
  763. {
  764. #ifdef _DEBUG
  765. ::OutputDebugString("   WorkerThread 启动... \n");
  766. #endif // _DEBUG
  767. CIOCPServer *pThis = (CIOCPServer*)lpParam;
  768. CIOCPBuffer *pBuffer;
  769. DWORD dwKey;
  770. DWORD dwTrans;
  771. LPOVERLAPPED lpol;
  772. while(TRUE)
  773. {
  774. // 在关联到此完成端口的所有套节字上等待I/O完成
  775. BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
  776. &dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);
  777. if(dwTrans == -1) // 用户通知退出
  778. {
  779. #ifdef _DEBUG
  780. ::OutputDebugString("   WorkerThread 退出 \n");
  781. #endif // _DEBUG
  782. ::ExitThread(0);
  783. }
  784. pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol); // [2009.8.9 bak Lostyears][lpol作为CIOCPBuffer的ol成员,由其地址取CIOCPBuffer实例首地址]
  785. int nError = NO_ERROR;
  786. if(!bOK)                        // 在此套节字上有错误发生
  787. {
  788. SOCKET s;
  789. if(pBuffer->nOperation == OP_ACCEPT)
  790. {
  791. s = pThis->m_sListen;
  792. }
  793. else
  794. {
  795. if(dwKey == 0)
  796. break;
  797. s = ((CIOCPContext*)dwKey)->s;
  798. }
  799. DWORD dwFlags = 0;
  800. if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
  801. {
  802. nError = ::WSAGetLastError();
  803. }
  804. }
  805. pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
  806. }
  807. #ifdef _DEBUG
  808. ::OutputDebugString("   WorkerThread 退出 \n");
  809. #endif // _DEBUG
  810. return 0;
  811. }
  812. void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
  813. {
  814. CIOCPContext *pContext = (CIOCPContext *)dwKey;
  815. #ifdef _DEBUG
  816. ::OutputDebugString("   HandleIO... \n");
  817. #endif // _DEBUG
  818. // 1)首先减少套节字上的未决I/O计数
  819. if(pContext != NULL)
  820. {
  821. ::EnterCriticalSection(&pContext->Lock);
  822. if(pBuffer->nOperation == OP_READ)
  823. pContext->nOutstandingRecv --;
  824. else if(pBuffer->nOperation == OP_WRITE)
  825. pContext->nOutstandingSend --;
  826. ::LeaveCriticalSection(&pContext->Lock);
  827. // 2)检查套节字是否已经被我们关闭 [2009.8.9 bak Lostyears][如果关闭则释放剩下的未决IO]
  828. if(pContext->bClosing)
  829. {
  830. #ifdef _DEBUG
  831. ::OutputDebugString("   检查到套节字已经被我们关闭 \n");
  832. #endif // _DEBUG
  833. if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
  834. {
  835. ReleaseContext(pContext);
  836. }
  837. // 释放已关闭套节字的未决I/O
  838. ReleaseBuffer(pBuffer);
  839. return;
  840. }
  841. }
  842. else
  843. {
  844. RemovePendingAccept(pBuffer); // [2009.8.9 bak Lostyears][sListen关联了iocp, 关联时dwKey为0, 所以当有新连接发送数据时会执行到此]
  845. }
  846. // 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
  847. if(nError != NO_ERROR)
  848. {
  849. if(pBuffer->nOperation != OP_ACCEPT)
  850. {
  851. NotifyConnectionError(pContext, pBuffer, nError);
  852. CloseAConnection(pContext);
  853. if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
  854. {
  855. ReleaseContext(pContext);
  856. }
  857. #ifdef _DEBUG
  858. ::OutputDebugString("   检查到客户套节字上发生错误 \n");
  859. #endif // _DEBUG
  860. }
  861. else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
  862. {
  863. // 客户端出错,释放I/O缓冲区
  864. if(pBuffer->sClient != INVALID_SOCKET)
  865. {
  866. ::closesocket(pBuffer->sClient);
  867. pBuffer->sClient = INVALID_SOCKET;
  868. }
  869. #ifdef _DEBUG
  870. ::OutputDebugString("   检查到监听套节字上发生错误 \n");
  871. #endif // _DEBUG
  872. }
  873. ReleaseBuffer(pBuffer);
  874. return;
  875. }
  876. // 开始处理
  877. if(pBuffer->nOperation == OP_ACCEPT)
  878. {
  879. if(dwTrans == 0) // [2010.5.16 bak Lostyears]如果AcceptEx的数据接收缓冲区设为0, 一连接上就会执行到这
  880. {
  881. #ifdef _DEBUG
  882. ::OutputDebugString("   监听套节字上客户端关闭 \n");
  883. #endif // _DEBUG
  884. if(pBuffer->sClient != INVALID_SOCKET)
  885. {
  886. ::closesocket(pBuffer->sClient);
  887. pBuffer->sClient = INVALID_SOCKET;
  888. }
  889. }
  890. else
  891. {
  892. // 为新接受的连接申请客户上下文对象
  893. CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
  894. if(pClient != NULL)
  895. {
  896. if(AddAConnection(pClient))
  897. {
  898. // 取得客户地址
  899. int nLocalLen, nRmoteLen;
  900. LPSOCKADDR pLocalAddr, pRemoteAddr;
  901. m_lpfnGetAcceptExSockaddrs(
  902. pBuffer->buff,
  903. pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2), // [2010.5.16 bak Lostyears]和AcceptEx相应参数对应
  904. sizeof(sockaddr_in) + 16,
  905. sizeof(sockaddr_in) + 16,
  906. (SOCKADDR **)&pLocalAddr,
  907. &nLocalLen,
  908. (SOCKADDR **)&pRemoteAddr,
  909. &nRmoteLen);
  910. memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
  911. memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);
  912. // [2010.1.15 add Lostyears][加入KeepAlive机制]
  913. BOOL bKeepAlive = TRUE;
  914. int nRet = ::setsockopt(pClient->s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeepAlive, sizeof(bKeepAlive));
  915. if (nRet == SOCKET_ERROR)
  916. {
  917. CloseAConnection(pClient);
  918. }
  919. else
  920. {
  921. // 设置KeepAlive参数
  922. tcp_keepalive alive_in  = {0};
  923. tcp_keepalive alive_out = {0};
  924. alive_in.keepalivetime      = 5000; // 开始首次KeepAlive探测前的TCP空闲时间
  925. alive_in.keepaliveinterval  = 1000; // 两次KeepAlive探测间的时间间隔
  926. alive_in.onoff  = TRUE;
  927. unsigned long ulBytesReturn = 0;
  928. nRet = ::WSAIoctl(pClient->s, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),
  929. &alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);
  930. if (nRet == SOCKET_ERROR)
  931. {
  932. CloseAConnection(pClient);
  933. }
  934. else
  935. {
  936. // 关联新连接到完成端口对象
  937. ::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 2);
  938. // 通知用户
  939. pBuffer->nLen = dwTrans;
  940. OnConnectionEstablished(pClient, pBuffer);
  941. // 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放
  942. for(int i=0; i<m_nInitialReads; i++) // [2009.8.21 mod Lostyears][将常量值改为m_nInitialReads]
  943. {
  944. CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
  945. if(p != NULL)
  946. {
  947. if(!PostRecv(pClient, p))
  948. {
  949. CloseAConnection(pClient);
  950. break;
  951. }
  952. }
  953. }
  954. }
  955. }
  956. //// 关联新连接到完成端口对象
  957. //::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);
  958. //
  959. //// 通知用户
  960. //pBuffer->nLen = dwTrans;
  961. //OnConnectionEstablished(pClient, pBuffer);
  962. //
  963. //// 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放
  964. //for(int i=0; i<m_nInitialReads; i++) // [2009.8.22 mod Lostyears][old: i<5]
  965. //{
  966. //  CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
  967. //  if(p != NULL)
  968. //  {
  969. //      if(!PostRecv(pClient, p))
  970. //      {
  971. //          CloseAConnection(pClient);
  972. //          break;
  973. //      }
  974. //  }
  975. //}
  976. }
  977. else    // 连接数量已满,关闭连接
  978. {
  979. CloseAConnection(pClient);
  980. ReleaseContext(pClient);
  981. }
  982. }
  983. else
  984. {
  985. // 资源不足,关闭与客户的连接即可
  986. ::closesocket(pBuffer->sClient);
  987. pBuffer->sClient = INVALID_SOCKET;
  988. }
  989. }
  990. // Accept请求完成,释放I/O缓冲区
  991. ReleaseBuffer(pBuffer);
  992. // 通知监听线程继续再投递一个Accept请求
  993. ::InterlockedIncrement(&m_nRepostCount);
  994. ::SetEvent(m_hRepostEvent);
  995. }
  996. else if(pBuffer->nOperation == OP_READ)
  997. {
  998. if(dwTrans == 0)    // 对方关闭套节字
  999. {
  1000. // 先通知用户
  1001. pBuffer->nLen = 0;
  1002. NotifyConnectionClosing(pContext, pBuffer);
  1003. // 再关闭连接
  1004. CloseAConnection(pContext);
  1005. // 释放客户上下文和缓冲区对象
  1006. if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
  1007. {
  1008. ReleaseContext(pContext);
  1009. }
  1010. ReleaseBuffer(pBuffer);
  1011. }
  1012. else
  1013. {
  1014. pBuffer->nLen = dwTrans;
  1015. // 按照I/O投递的顺序读取接收到的数据
  1016. CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
  1017. while(p != NULL)
  1018. {
  1019. // 通知用户
  1020. OnReadCompleted(pContext, p);
  1021. // 增加要读的序列号的值
  1022. ::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
  1023. // 释放这个已完成的I/O
  1024. ReleaseBuffer(p);
  1025. p = GetNextReadBuffer(pContext, NULL);
  1026. }
  1027. // 继续投递一个新的接收请求
  1028. pBuffer = AllocateBuffer(BUFFER_SIZE);
  1029. if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
  1030. {
  1031. CloseAConnection(pContext);
  1032. }
  1033. }
  1034. }
  1035. else if(pBuffer->nOperation == OP_WRITE)
  1036. {
  1037. if(dwTrans == 0)    // 对方关闭套节字
  1038. {
  1039. // 先通知用户
  1040. pBuffer->nLen = 0;
  1041. NotifyConnectionClosing(pContext, pBuffer);
  1042. // 再关闭连接
  1043. CloseAConnection(pContext);
  1044. // 释放客户上下文和缓冲区对象
  1045. if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
  1046. {
  1047. ReleaseContext(pContext);
  1048. }
  1049. ReleaseBuffer(pBuffer);
  1050. }
  1051. else
  1052. {
  1053. // 写操作完成,通知用户
  1054. pBuffer->nLen = dwTrans;
  1055. OnWriteCompleted(pContext, pBuffer);
  1056. // 释放SendText函数申请的缓冲区
  1057. ReleaseBuffer(pBuffer);
  1058. }
  1059. }
  1060. }
  1061. // 当套件字关闭或出错时通知
  1062. void CIOCPServer::NotifyConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1063. {
  1064. ::EnterCriticalSection(&m_CloseOrErrLock);
  1065. if (!pContext->bNotifyCloseOrError)
  1066. {
  1067. pContext->bNotifyCloseOrError = true;
  1068. OnConnectionClosing(pContext, pBuffer);
  1069. }
  1070. ::LeaveCriticalSection(&m_CloseOrErrLock);
  1071. }
  1072. void CIOCPServer::NotifyConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
  1073. {
  1074. ::EnterCriticalSection(&m_CloseOrErrLock);
  1075. if (!pContext->bNotifyCloseOrError)
  1076. {
  1077. pContext->bNotifyCloseOrError = true;
  1078. OnConnectionError(pContext, pBuffer, nError);
  1079. }
  1080. ::LeaveCriticalSection(&m_CloseOrErrLock);
  1081. }
  1082. BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
  1083. {
  1084. CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
  1085. if(pBuffer != NULL)
  1086. {
  1087. memcpy(pBuffer->buff, pszText, nLen);
  1088. return PostSend(pContext, pBuffer);
  1089. }
  1090. return FALSE;
  1091. }
  1092. void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1093. {
  1094. }
  1095. void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1096. {
  1097. }
  1098. void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1099. {
  1100. }
  1101. void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1102. {
  1103. }
  1104. void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
  1105. {
  1106. }
  1107. ////////////////////////////////////////////////
  1108. // iocpserver.cpp文件
  1109. // CIOCPServer类的测试程序
  1110. #include "iocp.h"
  1111. #include <stdio.h>
  1112. #include <windows.h>
  1113. class CMyServer : public CIOCPServer
  1114. {
  1115. public:
  1116. void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1117. {
  1118. printf("接收到一个新的连接(%d): %s\n",
  1119. GetCurrentConnection(), ::inet_ntoa(pContext->addrRemote.sin_addr));
  1120. printf("接受到一个数据包, 其大小为: %d字节\n", pBuffer->nLen);
  1121. SendText(pContext, pBuffer->buff, pBuffer->nLen);
  1122. }
  1123. void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1124. {
  1125. printf("一个连接关闭\n");
  1126. }
  1127. void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
  1128. {
  1129. printf("一个连接发生错误: %d\n", nError);
  1130. }
  1131. void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1132. {
  1133. printf("接受到一个数据包, 其大小为: %d字节\n", pBuffer->nLen);
  1134. SendText(pContext, pBuffer->buff, pBuffer->nLen);
  1135. }
  1136. void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
  1137. {
  1138. printf("一个数据包发送成功, 其大小为: %d字节\n ", pBuffer->nLen);
  1139. }
  1140. };
  1141. void main()
  1142. {
  1143. CMyServer *pServer = new CMyServer;
  1144. // 开启服务
  1145. if(pServer->Start())
  1146. {
  1147. printf("服务器开启成功...\n");
  1148. }
  1149. else
  1150. {
  1151. printf("服务器开启失败!\n");
  1152. return;
  1153. }
  1154. // 创建事件对象,让ServerShutdown程序能够关闭自己
  1155. HANDLE hEvent = ::CreateEvent(NULL, FALSE, FALSE, "ShutdownEvent");
  1156. ::WaitForSingleObject(hEvent, INFINITE);
  1157. ::CloseHandle(hEvent);
  1158. // 关闭服务
  1159. pServer->Shutdown();
  1160. delete pServer;
  1161. printf("服务器关闭\n ");
  1162. }
////////////////////////////////////////
// IOCP.h文件

#ifndef __IOCP_H__
#define __IOCP_H__

#include <winsock2.h>
#include <windows.h>
#include <Mswsock.h>

#define BUFFER_SIZE 1024*4	// I/O请求的缓冲区大小
#define MAX_THREAD	2		// I/O服务线程的数量

// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
struct CIOCPBuffer
{
	WSAOVERLAPPED ol;

	SOCKET sClient;			// AcceptEx接收的客户方套节字

	char *buff;				// I/O操作使用的缓冲区
	int nLen;				// buff缓冲区(使用的)大小

	ULONG nSequenceNumber;	// 此I/O的序列号

	int nOperation;			// 操作类型
#define OP_ACCEPT	1
#define OP_WRITE	2
#define OP_READ		3

	CIOCPBuffer *pNext;
};

// 这是per-Handle数据。它包含了一个套节字的信息
struct CIOCPContext
{
	SOCKET s;						// 套节字句柄

	SOCKADDR_IN addrLocal;			// 连接的本地地址
	SOCKADDR_IN addrRemote;			// 连接的远程地址

	BOOL bClosing;					// 套节字是否关闭

	int nOutstandingRecv;			// 此套节字上抛出的重叠操作的数量
	int nOutstandingSend;

	ULONG nReadSequence;			// 安排给接收的下一个序列号
	ULONG nCurrentReadSequence;		// 当前要读的序列号
	CIOCPBuffer *pOutOfOrderReads;	// 记录没有按顺序完成的读I/O

	CRITICAL_SECTION Lock;			// 保护这个结构

	bool bNotifyCloseOrError;		// [2009.8.22 add Lostyears][当套接字关闭或出错时是否已通知过]

	CIOCPContext *pNext;
};

class CIOCPServer   // 处理线程
{
public:
	CIOCPServer();
	~CIOCPServer();

	// 开始服务
	BOOL Start(int nPort = 4567, int nMaxConnections = 2000,
			int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
	// 停止服务
	void Shutdown();

	// 关闭一个连接和关闭所有连接
	void CloseAConnection(CIOCPContext *pContext);
	void CloseAllConnections();	

	// 取得当前的连接数量
	ULONG GetCurrentConnection() { return m_nCurrentConnection; }

	// 向指定客户发送文本
	BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen); 

protected:

	// 申请和释放缓冲区对象
	CIOCPBuffer *AllocateBuffer(int nLen);
	void ReleaseBuffer(CIOCPBuffer *pBuffer);

	// 申请和释放套节字上下文
	CIOCPContext *AllocateContext(SOCKET s);
	void ReleaseContext(CIOCPContext *pContext);

	// 释放空闲缓冲区对象列表和空闲上下文对象列表
	void FreeBuffers();
	void FreeContexts();

	// 向连接列表中添加一个连接
	BOOL AddAConnection(CIOCPContext *pContext);

	// 插入和移除未决的接受请求
	BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
	BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);

	// 取得下一个要读取的
	CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

	// 投递接受I/O、发送I/O、接收I/O
	BOOL PostAccept(CIOCPBuffer *pBuffer);
	BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
	BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

	void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);

	// [2009.8.22 add Lostyears]
	// 当套件字关闭或出错时通知
	void NotifyConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
	void NotifyConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);

		// 事件通知函数
	// 建立了一个新的连接
	virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
	// 一个连接关闭
	virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
	// 在一个连接上发生了错误
	virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
	// 一个连接上的读操作完成
	virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
	// 一个连接上的写操作完成
	virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

protected:

	// 记录空闲结构信息
	CIOCPBuffer *m_pFreeBufferList;
	CIOCPContext *m_pFreeContextList;
	int m_nFreeBufferCount;
	int m_nFreeContextCount;
	CRITICAL_SECTION m_FreeBufferListLock;
	CRITICAL_SECTION m_FreeContextListLock;

	// 记录抛出的Accept请求
	CIOCPBuffer *m_pPendingAccepts;   // 抛出请求列表。
	long m_nPendingAcceptCount;
	CRITICAL_SECTION m_PendingAcceptsLock;

	// 记录连接列表
	CIOCPContext *m_pConnectionList;
	int m_nCurrentConnection;
	CRITICAL_SECTION m_ConnectionListLock;

	// 用于投递Accept请求
	HANDLE m_hAcceptEvent;
	HANDLE m_hRepostEvent;
	LONG m_nRepostCount;

	int m_nPort;				// 服务器监听的端口

	int m_nInitialAccepts;		// 开始时抛出的异步接收投递数
	int m_nInitialReads;
	int m_nMaxAccepts;			// 抛出的异步接收投递数最大值
	int m_nMaxSends;			// 抛出的异步发送投递数最大值(跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作)
	int m_nMaxFreeBuffers;		// 内存池中容纳的最大内存块数(超过该数将在物理上释放内存池)
	int m_nMaxFreeContexts;		// 上下文[套接字信息]内存池中容纳的最大上下文内存块数(超过该数将在物理上释放上下文内存池)
	int m_nMaxConnections;		// 最大连接数

	HANDLE m_hListenThread;			// 监听线程
	HANDLE m_hCompletion;			// 完成端口句柄
	SOCKET m_sListen;				// 监听套节字句柄
	LPFN_ACCEPTEX m_lpfnAcceptEx;	// AcceptEx函数地址
	LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址

	BOOL m_bShutDown;		// 用于通知监听线程退出
	BOOL m_bServerStarted;	// 记录服务是否启动

	CRITICAL_SECTION m_CloseOrErrLock;	// [2009.9.1 add Lostyears]

private:	// 线程函数
	static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
	static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
};

#endif // __IOCP_H__
//////////////////////////////////////////////////
// IOCP.cpp文件

#include "iocp.h"
#pragma comment(lib, "WS2_32.lib")
#include <stdio.h>
#include <mstcpip.h>

CIOCPServer::CIOCPServer()
{
	// 列表
	m_pFreeBufferList = NULL;
	m_pFreeContextList = NULL;
	m_pPendingAccepts = NULL;
	m_pConnectionList = NULL;

	m_nFreeBufferCount = 0;
	m_nFreeContextCount = 0;
	m_nPendingAcceptCount = 0;
	m_nCurrentConnection = 0;

	::InitializeCriticalSection(&m_FreeBufferListLock);
	::InitializeCriticalSection(&m_FreeContextListLock);
	::InitializeCriticalSection(&m_PendingAcceptsLock);
	::InitializeCriticalSection(&m_ConnectionListLock);
	::InitializeCriticalSection(&m_CloseOrErrLock);	// [2009.9.1 add Lostyears]

	// Accept请求
	m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
	m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
	m_nRepostCount = 0;

	m_nPort = 4567;

	m_nInitialAccepts = 10;
	m_nInitialReads = 4;
	m_nMaxAccepts = 100;
	m_nMaxSends = 20;
	m_nMaxFreeBuffers = 200;
	m_nMaxFreeContexts = 100;
	m_nMaxConnections = 2000;

	m_hListenThread = NULL;
	m_hCompletion = NULL;
	m_sListen = INVALID_SOCKET;
	m_lpfnAcceptEx = NULL;
	m_lpfnGetAcceptExSockaddrs = NULL;

	m_bShutDown = FALSE;
	m_bServerStarted = FALSE;

	// 初始化WS2_32.dll
	WSADATA wsaData;
	WORD sockVersion = MAKEWORD(2, 2);
	::WSAStartup(sockVersion, &wsaData);
}

CIOCPServer::~CIOCPServer()
{
	Shutdown();

	if(m_sListen != INVALID_SOCKET)
		::closesocket(m_sListen);
	if(m_hListenThread != NULL)
		::CloseHandle(m_hListenThread);

	::CloseHandle(m_hRepostEvent);
	::CloseHandle(m_hAcceptEvent);

	::DeleteCriticalSection(&m_FreeBufferListLock);
	::DeleteCriticalSection(&m_FreeContextListLock);
	::DeleteCriticalSection(&m_PendingAcceptsLock);
	::DeleteCriticalSection(&m_ConnectionListLock);
	::DeleteCriticalSection(&m_CloseOrErrLock); // [2009.9.1 add Lostyears]

	::WSACleanup();
}

///////////////////////////////////
// 自定义帮助函数

CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
	CIOCPBuffer *pBuffer = NULL;
	if(nLen > BUFFER_SIZE)
		return NULL;

	// 为缓冲区对象申请内存
	::EnterCriticalSection(&m_FreeBufferListLock);
	if(m_pFreeBufferList == NULL)  // 内存池为空,申请新的内存
	{
		pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(),
						HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
	}
	else	// 从内存池中取一块来使用
	{
		pBuffer = m_pFreeBufferList;
		m_pFreeBufferList = m_pFreeBufferList->pNext;
		pBuffer->pNext = NULL;
		m_nFreeBufferCount --;
	}
	::LeaveCriticalSection(&m_FreeBufferListLock);

	// 初始化新的缓冲区对象
	if(pBuffer != NULL)
	{
		pBuffer->buff = (char*)(pBuffer + 1);
		pBuffer->nLen = nLen;
		//::ZeroMemory(pBuffer->buff, pBuffer->nLen);
	}
	return pBuffer;
}

void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
	::EnterCriticalSection(&m_FreeBufferListLock);

	if(m_nFreeBufferCount < m_nMaxFreeBuffers)	// 将要释放的内存添加到空闲列表中 [2010.5.15 mod Lostyears]old:m_nFreeBufferCount <= m_nMaxFreeBuffers
	{
		memset(pBuffer, 0, sizeof(CIOCPBuffer) + BUFFER_SIZE);
		pBuffer->pNext = m_pFreeBufferList;
		m_pFreeBufferList = pBuffer;

		m_nFreeBufferCount ++ ;
	}
	else			// 已经达到最大值,真正的释放内存
	{
		::HeapFree(::GetProcessHeap(), 0, pBuffer);
	}

	::LeaveCriticalSection(&m_FreeBufferListLock);
}

CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
	CIOCPContext *pContext;

	// 申请一个CIOCPContext对象
	::EnterCriticalSection(&m_FreeContextListLock);
	if(m_pFreeContextList == NULL)
	{
		pContext = (CIOCPContext *)
				::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext)); 

		::InitializeCriticalSection(&pContext->Lock);
	}
	else
	{
		// 在空闲列表中申请
		pContext = m_pFreeContextList;
		m_pFreeContextList = m_pFreeContextList->pNext;
		pContext->pNext = NULL;

		m_nFreeContextCount --; // [2009.8.9 mod Lostyears][old: m_nFreeBufferCount--]
	}

	::LeaveCriticalSection(&m_FreeContextListLock);

	// 初始化对象成员
	if(pContext != NULL)
	{
		pContext->s = s;

		// [2009.8.22 add Lostyears]
		pContext->bNotifyCloseOrError = false;
	}

	return pContext;
}

void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
	if(pContext->s != INVALID_SOCKET)
		::closesocket(pContext->s);

	// 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
	CIOCPBuffer *pNext;
	while(pContext->pOutOfOrderReads != NULL)
	{
		pNext = pContext->pOutOfOrderReads->pNext;
		ReleaseBuffer(pContext->pOutOfOrderReads);
		pContext->pOutOfOrderReads = pNext;
	}

	::EnterCriticalSection(&m_FreeContextListLock);

	if(m_nFreeContextCount < m_nMaxFreeContexts) // 添加到空闲列表 [2010.4.10 mod Lostyears][old: m_nFreeContextCount <= m_nMaxFreeContexts]如果m_nFreeContextCount==m_nMaxFreeContexts时,会在下一次导致m_nFreeContextCount>m_nMaxFreeContexts

	{
		// 先将关键代码段变量保存到一个临时变量中
		CRITICAL_SECTION cstmp = pContext->Lock;

		// 将要释放的上下文对象初始化为0
		memset(pContext, 0, sizeof(CIOCPContext));

		// 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
		pContext->Lock = cstmp;
		pContext->pNext = m_pFreeContextList;
		m_pFreeContextList = pContext;

		// 更新计数
		m_nFreeContextCount ++;
	}
	else // 已经达到最大值,真正地释放
	{
		::DeleteCriticalSection(&pContext->Lock);
		::HeapFree(::GetProcessHeap(), 0, pContext);
		pContext = NULL;
	}

	::LeaveCriticalSection(&m_FreeContextListLock);
}

void CIOCPServer::FreeBuffers()
{
	// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
	::EnterCriticalSection(&m_FreeBufferListLock);

	CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
	CIOCPBuffer *pNextBuffer;
	while(pFreeBuffer != NULL)
	{
		pNextBuffer = pFreeBuffer->pNext;
		if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
		{
#ifdef _DEBUG
			::OutputDebugString("  FreeBuffers释放内存出错!");
#endif // _DEBUG
			break;
		}
		pFreeBuffer = pNextBuffer;
	}
	m_pFreeBufferList = NULL;
	m_nFreeBufferCount = 0;

	::LeaveCriticalSection(&m_FreeBufferListLock);
}

void CIOCPServer::FreeContexts()
{
	// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
	::EnterCriticalSection(&m_FreeContextListLock);

	CIOCPContext *pFreeContext = m_pFreeContextList;
	CIOCPContext *pNextContext;
	while(pFreeContext != NULL)
	{
		pNextContext = pFreeContext->pNext;

		::DeleteCriticalSection(&pFreeContext->Lock);
		if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
		{
#ifdef _DEBUG
			::OutputDebugString("  FreeBuffers释放内存出错!");
#endif // _DEBUG
			break;
		}
		pFreeContext = pNextContext;
	}
	m_pFreeContextList = NULL;
	m_nFreeContextCount = 0;

	::LeaveCriticalSection(&m_FreeContextListLock);
}

BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
	// 向客户连接列表添加一个CIOCPContext对象

	::EnterCriticalSection(&m_ConnectionListLock);
	if(m_nCurrentConnection < m_nMaxConnections)
	{
		// 添加到表头
		pContext->pNext = m_pConnectionList;
		m_pConnectionList = pContext;
		// 更新计数
		m_nCurrentConnection ++;

		::LeaveCriticalSection(&m_ConnectionListLock);
		return TRUE;
	}
	::LeaveCriticalSection(&m_ConnectionListLock);

	return FALSE;
}

void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
{
	// 首先从列表中移除要关闭的连接
	::EnterCriticalSection(&m_ConnectionListLock);

	CIOCPContext* pTest = m_pConnectionList;
	if(pTest == pContext)
	{
		m_pConnectionList =  pTest->pNext; // [2009.8.9 mod Lostyears][old: m_pConnectionList =  pContext->pNext]
		m_nCurrentConnection --;
	}
	else
	{
		while(pTest != NULL && pTest->pNext !=  pContext)
			pTest = pTest->pNext;
		if(pTest != NULL)
		{
			pTest->pNext =  pContext->pNext;
			m_nCurrentConnection --;
		}
	}

	::LeaveCriticalSection(&m_ConnectionListLock);

	// 然后关闭客户套节字
	::EnterCriticalSection(&pContext->Lock);

	if(pContext->s != INVALID_SOCKET)
	{
		::closesocket(pContext->s);
		pContext->s = INVALID_SOCKET;
	}
	pContext->bClosing = TRUE;

	::LeaveCriticalSection(&pContext->Lock);
}

void CIOCPServer::CloseAllConnections()
{
	// 遍历整个连接列表,关闭所有的客户套节字

	::EnterCriticalSection(&m_ConnectionListLock);

	CIOCPContext *pContext = m_pConnectionList;
	while(pContext != NULL)
	{
		::EnterCriticalSection(&pContext->Lock);

		if(pContext->s != INVALID_SOCKET)
		{
			::closesocket(pContext->s);
			pContext->s = INVALID_SOCKET;
		}

		pContext->bClosing = TRUE;

		::LeaveCriticalSection(&pContext->Lock);	

		pContext = pContext->pNext;
	}

	m_pConnectionList = NULL;
	m_nCurrentConnection = 0;

	::LeaveCriticalSection(&m_ConnectionListLock);
}

BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
	// 将一个I/O缓冲区对象插入到m_pPendingAccepts表中

	::EnterCriticalSection(&m_PendingAcceptsLock);

	if(m_pPendingAccepts == NULL)
		m_pPendingAccepts = pBuffer;
	else
	{
		pBuffer->pNext = m_pPendingAccepts;
		m_pPendingAccepts = pBuffer;
	}
	m_nPendingAcceptCount ++;

	::LeaveCriticalSection(&m_PendingAcceptsLock);

	return TRUE;
}

BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
	BOOL bResult = FALSE;

	// 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
	::EnterCriticalSection(&m_PendingAcceptsLock);

	CIOCPBuffer *pTest = m_pPendingAccepts;
	if(pTest == pBuffer)	// 如果是表头元素
	{
		m_pPendingAccepts = pTest->pNext; // [2009.8.9 mod Lostyears][old: m_pPendingAccepts = pBuffer->pNext]
		bResult = TRUE;
	}
	else					// 不是表头元素的话,就要遍历这个表来查找了
	{
		while(pTest != NULL && pTest->pNext != pBuffer)
			pTest = pTest->pNext;
		if(pTest != NULL)
		{
			pTest->pNext = pBuffer->pNext;
			 bResult = TRUE;
		}
	}
	// 更新计数
	if(bResult)
		m_nPendingAcceptCount --;

	::LeaveCriticalSection(&m_PendingAcceptsLock);

	return  bResult;
}

CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	if(pBuffer != NULL)
	{
		// 如果与要读的下一个序列号相等,则读这块缓冲区
		if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
		{
			return pBuffer;
		}

		// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中

		// 列表中的缓冲区是按照其序列号从小到大的顺序排列的

		pBuffer->pNext = NULL;

		CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
		CIOCPBuffer *pPre = NULL;
		while(ptr != NULL)
		{
			if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)
				break;

			pPre = ptr;
			ptr = ptr->pNext;
		}

		if(pPre == NULL) // 应该插入到表头
		{
			pBuffer->pNext = pContext->pOutOfOrderReads;
			pContext->pOutOfOrderReads = pBuffer;
		}
		else			// 应该插入到表的中间
		{
			pBuffer->pNext = pPre->pNext;
			pPre->pNext = pBuffer; // [2009.8.9 mod Lostyears][old: pPre->pNext = pBuffer->pNext]
		}
	}

	// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
	CIOCPBuffer *ptr = pContext->pOutOfOrderReads;
	if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))
	{
		pContext->pOutOfOrderReads = ptr->pNext;
		return ptr;
	}
	return NULL;
}

BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)	// 在监听套节字上投递Accept请求
{
		// 设置I/O类型
		pBuffer->nOperation = OP_ACCEPT;

		// 投递此重叠I/O
		DWORD dwBytes;
		pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
		BOOL b = m_lpfnAcceptEx(m_sListen,
			pBuffer->sClient,
			pBuffer->buff,
			pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2), // [2010.5.16 bak Lostyears]如果这里为0, 表示不等待接收数据而通知, 如果这里改为0, 则GetAcceptExSockaddrs函数中的相应参数也得相应改
			sizeof(sockaddr_in) + 16,
			sizeof(sockaddr_in) + 16,
			&dwBytes,
			&pBuffer->ol);
		if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
		{
			return FALSE;
		}
		return TRUE;
}

BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	// 设置I/O类型
	pBuffer->nOperation = OP_READ;	

	::EnterCriticalSection(&pContext->Lock);

	// 设置序列号
	pBuffer->nSequenceNumber = pContext->nReadSequence;

	// 投递此重叠I/O
	DWORD dwBytes;
	DWORD dwFlags = 0;
	WSABUF buf;
	buf.buf = pBuffer->buff;
	buf.len = pBuffer->nLen;
	if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
	{
		if(::WSAGetLastError() != WSA_IO_PENDING)
		{
			::LeaveCriticalSection(&pContext->Lock);
			return FALSE;
		}
	}

	// 增加套节字上的重叠I/O计数和读序列号计数

	pContext->nOutstandingRecv ++;
	pContext->nReadSequence ++;

	::LeaveCriticalSection(&pContext->Lock);

	return TRUE;
}

BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	// 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
	if(pContext->nOutstandingSend > m_nMaxSends)
		return FALSE;

	// 设置I/O类型,增加套节字上的重叠I/O计数
	pBuffer->nOperation = OP_WRITE;

	// 投递此重叠I/O
	DWORD dwBytes;
	DWORD dwFlags = 0;
	WSABUF buf;
	buf.buf = pBuffer->buff;
	buf.len = pBuffer->nLen;
	if(::WSASend(pContext->s,
			&buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
	{
		if(::WSAGetLastError() != WSA_IO_PENDING)
			return FALSE;
	}	

	// 增加套节字上的重叠I/O计数
	::EnterCriticalSection(&pContext->Lock);
	pContext->nOutstandingSend ++;
	::LeaveCriticalSection(&pContext->Lock);

	return TRUE;
}

BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
			int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
{
	// 检查服务是否已经启动
	if(m_bServerStarted)
		return FALSE;

	// 保存用户参数
	m_nPort = nPort;
	m_nMaxConnections = nMaxConnections;
	m_nMaxFreeBuffers = nMaxFreeBuffers;
	m_nMaxFreeContexts = nMaxFreeContexts;
	m_nInitialReads = nInitialReads;

	// 初始化状态变量
	m_bShutDown = FALSE;
	m_bServerStarted = TRUE;

	// 创建监听套节字,绑定到本地端口,进入监听模式
	m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	SOCKADDR_IN si;
	si.sin_family = AF_INET;
	si.sin_port = ::ntohs(m_nPort);
	si.sin_addr.S_un.S_addr = INADDR_ANY;
	if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
	{
		m_bServerStarted = FALSE;
		return FALSE;
	}
	::listen(m_sListen, 200);

	// 创建完成端口对象
	m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

	// 加载扩展函数AcceptEx
	GUID GuidAcceptEx = WSAID_ACCEPTEX;
	DWORD dwBytes;
	::WSAIoctl(m_sListen,
		SIO_GET_EXTENSION_FUNCTION_POINTER,
		&GuidAcceptEx,
		sizeof(GuidAcceptEx),
		&m_lpfnAcceptEx,
		sizeof(m_lpfnAcceptEx),
		&dwBytes,
		NULL,
		NULL);

	// 加载扩展函数GetAcceptExSockaddrs
	GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
	::WSAIoctl(m_sListen,
		SIO_GET_EXTENSION_FUNCTION_POINTER,
		&GuidGetAcceptExSockaddrs,
		sizeof(GuidGetAcceptExSockaddrs),
		&m_lpfnGetAcceptExSockaddrs,
		sizeof(m_lpfnGetAcceptExSockaddrs),
		&dwBytes,
		NULL,
		NULL
		);

	// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
	::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);

	// 注册FD_ACCEPT事件。
	// 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
	WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);

	// 创建监听线程
	m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);

	return TRUE;
}

void CIOCPServer::Shutdown()
{
	if(!m_bServerStarted)
		return;

	// 通知监听线程,马上停止服务
	m_bShutDown = TRUE;
	::SetEvent(m_hAcceptEvent);
	// 等待监听线程退出
	::WaitForSingleObject(m_hListenThread, INFINITE);
	::CloseHandle(m_hListenThread);
	m_hListenThread = NULL;

	m_bServerStarted = FALSE;
}

DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
{
	CIOCPServer *pThis = (CIOCPServer*)lpParam;

	// 先在监听套节字上投递几个Accept I/O
	CIOCPBuffer *pBuffer;
	for(int i=0; i<pThis->m_nInitialAccepts; i++)
	{
		pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
		if(pBuffer == NULL)
			return -1;
		pThis->InsertPendingAccept(pBuffer);
		pThis->PostAccept(pBuffer);
	}

	// 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
	HANDLE hWaitEvents[2 + MAX_THREAD];
	int nEventCount = 0;
	hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
	hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;

	// 创建指定数量的工作线程在完成端口上处理I/O
	for(int i=0; i<MAX_THREAD; i++)
	{
		hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
	}

	// 下面进入无限循环,处理事件对象数组中的事件
	while(TRUE)
	{
		int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);

		// 首先检查是否要停止服务
		if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
		{
			// 关闭所有连接
			pThis->CloseAllConnections();
			::Sleep(0);		// 给I/O工作线程一个执行的机会
			// 关闭监听套节字
			::closesocket(pThis->m_sListen);
			pThis->m_sListen = INVALID_SOCKET;
			::Sleep(0);		// 给I/O工作线程一个执行的机会

			// 通知所有I/O处理线程退出
			for(int i=2; i<MAX_THREAD + 2; i++)
			{
				::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
			}

			// 等待I/O处理线程退出
			::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);

			for(int i=2; i<MAX_THREAD + 2; i++)
			{
				::CloseHandle(hWaitEvents[i]);
			}

			::CloseHandle(pThis->m_hCompletion);

			pThis->FreeBuffers();
			pThis->FreeContexts();
			::ExitThread(0);
		}	

		// 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
		if(nIndex == WSA_WAIT_TIMEOUT)
		{
			pBuffer = pThis->m_pPendingAccepts;
			while(pBuffer != NULL)
			{
				int nSeconds;
				int nLen = sizeof(nSeconds);
				// 取得连接建立的时间
				::getsockopt(pBuffer->sClient,
					SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);
				// 如果超过2分钟客户还不发送初始数据,就让这个客户go away
				if(nSeconds != -1 && nSeconds > 2*60)
				{
					closesocket(pBuffer->sClient);
                    pBuffer->sClient = INVALID_SOCKET;
				}

				pBuffer = pBuffer->pNext;
			}
		}
		else
		{
			nIndex = nIndex - WAIT_OBJECT_0;
			WSANETWORKEVENTS ne;
            int nLimit=0;
			if(nIndex == 0)			// 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
			{
				::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
				if(ne.lNetworkEvents & FD_ACCEPT)
				{
					nLimit = 50;  // 增加的个数,这里设为50个
				}
			}
			else if(nIndex == 1)	// 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
			{
				nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
			}
			else if(nIndex > 1)		// I/O服务线程退出,说明有错误发生,关闭服务器
			{
				pThis->m_bShutDown = TRUE;
				continue;
			}

			// 投递nLimit个AcceptEx I/O请求
			int i = 0;
			while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
			{
				pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
				if(pBuffer != NULL)
				{
					pThis->InsertPendingAccept(pBuffer);
					pThis->PostAccept(pBuffer);
				}
			}
		}
	}
	return 0;
}

DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
			::OutputDebugString("	WorkerThread 启动... \n");
#endif // _DEBUG

	CIOCPServer *pThis = (CIOCPServer*)lpParam;

	CIOCPBuffer *pBuffer;
	DWORD dwKey;
	DWORD dwTrans;
	LPOVERLAPPED lpol;
	while(TRUE)
	{
		// 在关联到此完成端口的所有套节字上等待I/O完成
		BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
					&dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);

		if(dwTrans == -1) // 用户通知退出
		{
#ifdef _DEBUG
			::OutputDebugString("	WorkerThread 退出 \n");
#endif // _DEBUG
			::ExitThread(0);
		}

		pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol); // [2009.8.9 bak Lostyears][lpol作为CIOCPBuffer的ol成员,由其地址取CIOCPBuffer实例首地址]
		int nError = NO_ERROR;
		if(!bOK)						// 在此套节字上有错误发生
		{
			SOCKET s;
			if(pBuffer->nOperation == OP_ACCEPT)
			{
				s = pThis->m_sListen;
			}
			else
			{
				if(dwKey == 0)
					break;
				s = ((CIOCPContext*)dwKey)->s;
			}
			DWORD dwFlags = 0;
			if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
			{
				nError = ::WSAGetLastError();
			}
		}
		pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
	}

#ifdef _DEBUG
			::OutputDebugString("	WorkerThread 退出 \n");
#endif // _DEBUG
	return 0;
}

void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
	CIOCPContext *pContext = (CIOCPContext *)dwKey;

#ifdef _DEBUG
			::OutputDebugString("	HandleIO... \n");
#endif // _DEBUG

	// 1)首先减少套节字上的未决I/O计数
	if(pContext != NULL)
	{
		::EnterCriticalSection(&pContext->Lock);

		if(pBuffer->nOperation == OP_READ)
			pContext->nOutstandingRecv --;
		else if(pBuffer->nOperation == OP_WRITE)
			pContext->nOutstandingSend --;

		::LeaveCriticalSection(&pContext->Lock);

		// 2)检查套节字是否已经被我们关闭 [2009.8.9 bak Lostyears][如果关闭则释放剩下的未决IO]
		if(pContext->bClosing)
		{
#ifdef _DEBUG
			::OutputDebugString("	检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{
				ReleaseContext(pContext);
			}
			// 释放已关闭套节字的未决I/O
			ReleaseBuffer(pBuffer);
			return;
		}
	}
	else
	{
		RemovePendingAccept(pBuffer); // [2009.8.9 bak Lostyears][sListen关联了iocp, 关联时dwKey为0, 所以当有新连接发送数据时会执行到此]
	}

	// 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
	if(nError != NO_ERROR)
	{
		if(pBuffer->nOperation != OP_ACCEPT)
		{
			NotifyConnectionError(pContext, pBuffer, nError);
			CloseAConnection(pContext);
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{
				ReleaseContext(pContext);
			}
#ifdef _DEBUG
			::OutputDebugString("	检查到客户套节字上发生错误 \n");
#endif // _DEBUG
		}
		else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
		{
			// 客户端出错,释放I/O缓冲区
			if(pBuffer->sClient != INVALID_SOCKET)
			{
				::closesocket(pBuffer->sClient);
				pBuffer->sClient = INVALID_SOCKET;
			}
#ifdef _DEBUG
			::OutputDebugString("	检查到监听套节字上发生错误 \n");
#endif // _DEBUG
		}

		ReleaseBuffer(pBuffer);
		return;
	}

	// 开始处理
	if(pBuffer->nOperation == OP_ACCEPT)
	{
		if(dwTrans == 0) // [2010.5.16 bak Lostyears]如果AcceptEx的数据接收缓冲区设为0, 一连接上就会执行到这
		{
#ifdef _DEBUG
			::OutputDebugString("	监听套节字上客户端关闭 \n");
#endif // _DEBUG

			if(pBuffer->sClient != INVALID_SOCKET)
			{
				::closesocket(pBuffer->sClient);
				pBuffer->sClient = INVALID_SOCKET;
			}
		}
		else
		{
			// 为新接受的连接申请客户上下文对象
		CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
			if(pClient != NULL)
			{
				if(AddAConnection(pClient))
				{
					// 取得客户地址
					int nLocalLen, nRmoteLen;
					LPSOCKADDR pLocalAddr, pRemoteAddr;
					m_lpfnGetAcceptExSockaddrs(
						pBuffer->buff,
						pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2), // [2010.5.16 bak Lostyears]和AcceptEx相应参数对应
						sizeof(sockaddr_in) + 16,
						sizeof(sockaddr_in) + 16,
						(SOCKADDR **)&pLocalAddr,
						&nLocalLen,
						(SOCKADDR **)&pRemoteAddr,
						&nRmoteLen);
					memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
					memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);

					// [2010.1.15 add Lostyears][加入KeepAlive机制]
					BOOL bKeepAlive = TRUE;
					int nRet = ::setsockopt(pClient->s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeepAlive, sizeof(bKeepAlive));
					if (nRet == SOCKET_ERROR)
					{
						CloseAConnection(pClient);
					}
					else
					{
						// 设置KeepAlive参数
						tcp_keepalive alive_in	= {0};
						tcp_keepalive alive_out	= {0};
						alive_in.keepalivetime		= 5000;	// 开始首次KeepAlive探测前的TCP空闲时间
						alive_in.keepaliveinterval	= 1000;	// 两次KeepAlive探测间的时间间隔
						alive_in.onoff	= TRUE;
						unsigned long ulBytesReturn	= 0;
						nRet = ::WSAIoctl(pClient->s, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),
							&alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);
						if (nRet == SOCKET_ERROR)
						{
							CloseAConnection(pClient);
						}
						else
						{
							// 关联新连接到完成端口对象
							::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 2);

							// 通知用户
							pBuffer->nLen = dwTrans;
							OnConnectionEstablished(pClient, pBuffer);

							// 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放
							for(int i=0; i<m_nInitialReads; i++) // [2009.8.21 mod Lostyears][将常量值改为m_nInitialReads]
							{
								CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
								if(p != NULL)
								{
									if(!PostRecv(pClient, p))
									{
										CloseAConnection(pClient);
										break;
									}
								}
							}
						}
					}

 					//// 关联新连接到完成端口对象
 					//::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);
 					//
 					//// 通知用户
 					//pBuffer->nLen = dwTrans;
 					//OnConnectionEstablished(pClient, pBuffer);
 					//
 					//// 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放
 					//for(int i=0; i<m_nInitialReads; i++) // [2009.8.22 mod Lostyears][old: i<5]
 					//{
 					//	CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
 					//	if(p != NULL)
 					//	{
 					//		if(!PostRecv(pClient, p))
 					//		{
 					//			CloseAConnection(pClient);
 					//			break;
 					//		}
 					//	}
 					//}
				}
				else	// 连接数量已满,关闭连接
				{
					CloseAConnection(pClient);
					ReleaseContext(pClient);
				}
			}
			else
			{
				// 资源不足,关闭与客户的连接即可
				::closesocket(pBuffer->sClient);
				pBuffer->sClient = INVALID_SOCKET;
			}
		}

		// Accept请求完成,释放I/O缓冲区
		ReleaseBuffer(pBuffer);	

		// 通知监听线程继续再投递一个Accept请求
		::InterlockedIncrement(&m_nRepostCount);
		::SetEvent(m_hRepostEvent);
	}
	else if(pBuffer->nOperation == OP_READ)
	{
		if(dwTrans == 0)	// 对方关闭套节字
		{
			// 先通知用户
			pBuffer->nLen = 0;
			NotifyConnectionClosing(pContext, pBuffer);	

			// 再关闭连接
			CloseAConnection(pContext);
			// 释放客户上下文和缓冲区对象
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{
				ReleaseContext(pContext);
			}
			ReleaseBuffer(pBuffer);
		}
		else
		{
			pBuffer->nLen = dwTrans;
			// 按照I/O投递的顺序读取接收到的数据
			CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
			while(p != NULL)
			{
				// 通知用户
				OnReadCompleted(pContext, p);
				// 增加要读的序列号的值
				::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
				// 释放这个已完成的I/O
				ReleaseBuffer(p);
				p = GetNextReadBuffer(pContext, NULL);
			}

			// 继续投递一个新的接收请求
			pBuffer = AllocateBuffer(BUFFER_SIZE);
			if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
			{
				CloseAConnection(pContext);
			}
		}
	}
	else if(pBuffer->nOperation == OP_WRITE)
	{

		if(dwTrans == 0)	// 对方关闭套节字
		{
			// 先通知用户
			pBuffer->nLen = 0;
			NotifyConnectionClosing(pContext, pBuffer);	

			// 再关闭连接
			CloseAConnection(pContext);

			// 释放客户上下文和缓冲区对象
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{
				ReleaseContext(pContext);
			}
			ReleaseBuffer(pBuffer);
		}
		else
		{
			// 写操作完成,通知用户
			pBuffer->nLen = dwTrans;
			OnWriteCompleted(pContext, pBuffer);
			// 释放SendText函数申请的缓冲区
			ReleaseBuffer(pBuffer);
		}
	}
}

// 当套件字关闭或出错时通知
void CIOCPServer::NotifyConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	::EnterCriticalSection(&m_CloseOrErrLock);
	if (!pContext->bNotifyCloseOrError)
	{
		pContext->bNotifyCloseOrError = true;
		OnConnectionClosing(pContext, pBuffer);
	}
	::LeaveCriticalSection(&m_CloseOrErrLock);
}

void CIOCPServer::NotifyConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
	::EnterCriticalSection(&m_CloseOrErrLock);
	if (!pContext->bNotifyCloseOrError)
	{
		pContext->bNotifyCloseOrError = true;
		OnConnectionError(pContext, pBuffer, nError);
	}
	::LeaveCriticalSection(&m_CloseOrErrLock);
}

BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
{
	CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
	if(pBuffer != NULL)
	{
		memcpy(pBuffer->buff, pszText, nLen);
		return PostSend(pContext, pBuffer);
	}
	return FALSE;
}

void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
}
////////////////////////////////////////////////
// iocpserver.cpp文件

// CIOCPServer类的测试程序

#include "iocp.h"
#include <stdio.h>
#include <windows.h>

class CMyServer : public CIOCPServer
{
public:

	void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
	{
		printf("接收到一个新的连接(%d): %s\n",
					GetCurrentConnection(), ::inet_ntoa(pContext->addrRemote.sin_addr));
		printf("接受到一个数据包, 其大小为: %d字节\n", pBuffer->nLen);

		SendText(pContext, pBuffer->buff, pBuffer->nLen);
	}

	void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
	{
		printf("一个连接关闭\n");
	}

	void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
	{
		printf("一个连接发生错误: %d\n", nError);
	}

	void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
	{
		printf("接受到一个数据包, 其大小为: %d字节\n", pBuffer->nLen);
		SendText(pContext, pBuffer->buff, pBuffer->nLen);
	}

	void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
	{
		printf("一个数据包发送成功, 其大小为: %d字节\n ", pBuffer->nLen);
	}
};

void main()
{
	CMyServer *pServer = new CMyServer;

	// 开启服务
	if(pServer->Start())
	{
		printf("服务器开启成功...\n");
	}
	else
	{
		printf("服务器开启失败!\n");
		return;
	}

	// 创建事件对象,让ServerShutdown程序能够关闭自己
	HANDLE hEvent = ::CreateEvent(NULL, FALSE, FALSE, "ShutdownEvent");
	::WaitForSingleObject(hEvent, INFINITE);
	::CloseHandle(hEvent);

	// 关闭服务
	pServer->Shutdown();
	delete pServer;

	printf("服务器关闭\n ");

}

参考:

《Windows网络与通信程序设计》 王艳平 张越

转载请注明!

时间: 2024-09-29 04:51:06

Winsock IO模型之IOCP模型的相关文章

IOCP模型与网络编

一.前言:        在老师分配任务(“尝试利用IOCP模型写出服务端和客户端的代码”)给我时,脑子一片空白,并不知道什么是IOCP模型,会不会是像软件设计模式里面的工厂模式,装饰模式之类的那些呢?嘿嘿,不过好像是一个挺好玩的东西,挺好奇是什么东西来的,又是一个新知识啦~于是,开始去寻找一大堆的资料,为这个了解做准备,只是呢,有时还是想去找一本书去系统地学习一下,毕竟网络的资料还是有点零散.话说,本人学习这个模型的基础是,写过一个简单的Socket服务器及客户端程序,外加一个简单的Socke

IOCP模型与网络编程

IOCP模型与网络编程 一.前言:        在老师分配任务("尝试利用IOCP模型写出服务端和客户端的代码")给我时,脑子一片空白,并不知道什么是IOCP模型,会不会是像软件设计模式里面的工厂模式,装饰模式之类的那些呢?嘿嘿,不过好像是一个挺好玩的东西,挺好奇是什么东西来的,又是一个新知识啦~于是,开始去寻找一大堆的资料,为这个了解做准备,只是呢,有时还是想去找一本书去系统地学习一下,毕竟网络的资料还是有点零散.话说,本人学习这个模型的基础是,写过一个简单的Socket服务器及客

IOCP模型

IOCP http://blog.csdn.net/zhongguoren666/article/details/7386592 Winsock IO模型之IOCP模型 http://blog.csdn.net/lostyears/article/details/7436802

winsock编程IOCP模型实现代码

winsock编程IOCP模型实现代码 话不多说,上代码.借鉴<windows核心编程>部分源码和CSDN小猪部分代码. stdafx.h依赖头文件: 1 #include <iostream> 2 #include <WinSock2.h> 3 #include <MSWSock.h> 4 #include <vector> 5 #include "Singleton.h" 6 #include "IOCPWrap

IOCP模型总结(总结回顾)

IOCP旧代码重提,最近一直在玩其他方面的东东,时不时回顾一下,收益多多. IOCP(I/O Completion Port,I/O完成端口)是性能最好的一种I/O模型.它是应用程序使用线程池处理异步I/O请求的一种机制.在处理多个并发的异步I/O请求时,以往的模型都是在接收请求是创建一个线程来应答请求.这样就有很多的线程并行地运行在系统中.而这些线程都是可运行的,Windows内核花费大量的时间在进行线程的上下文切换,并没有多少时间花在线程运行上.再加上创建新线程的开销比较大,所以造成了效率的

【IOCP】 IOCP模型属于一种通讯模型- 较难

http://baike.baidu.com/link?url=e9vXkKd2aHp8VDr1XTURdwQB4K85r28IYjeMwRIyuaXtsrCsXHY1eohiFgsDXRYRlj6xEQoZFzH9dgKwla2n3q IOCP(I/O Completion Port),常称I/O完成端口. IOCP模型属于一种通讯模型,适用于(能控制并发执行的)高负载服务器的一个技术. 通俗一点说,就是用于高效处理很多很多的客户端进行数据交换的一个模型.或者可以说,就是能异步I/O操作的模型

IOCP模型、EPOLL模型的比较以及游戏服务器端的一些建议

一:IOCP和Epoll之间的异同.异:1:IOCP是WINDOWS系统下使用.Epoll是Linux系统下使用.2:IOCP是IO操作完毕之后,通过Get函数获得一个完成的事件通知.Epoll是当你希望进行一个IO操作时,向Epoll查询是否可读或者可写,若处于可读或可写状态后,Epoll会通过epoll_wait进行通知.3:IOCP封装了异步的消息事件的通知机制,同时封装了部分IO操作.但Epoll仅仅封装了一个异步事件的通知机制,并不负责IO读写操作.Epoll保持了事件通知和IO操作间

对于TCP协议中IOCP模型的一些简单的理解

请不要觉得这一篇没有代码的文章没意义,对IOCP模型的代码,百度搜索可以得到很多,但是后续很多需要纠结的地方,很多人都经历过,如果你已经在尝试写IOCP服务端了,那么你很可能会对写代码之外的一些设计问题很纠结,那么本文很可能是对你有所帮助的,这一个帖子是我开的讨论帖,我不是很懂CSDN的帖子分数的意义,我觉得那对于我这种1年难得发1贴的人来说估计也没什么作用,但我很希望大家能一起参与进来讨论:http://bbs.csdn.net/topics/390890567?page=1#post-398

Nginx源码分析 - Nginx启动以及IOCP模型

Nginx 源码分析 - Nginx启动以及IOCP模型 版本及平台信息 本文档针对Nginx1.11.7版本,分析Windows下的相关代码,虽然服务器可能用linux更多,但是windows平台下的代码也基本相似 ,另外windows的IOCP完成端口,异步IO模型非常优秀,很值得一看. Nginx启动 曾经有朋友问我,面对一个大项目的源代码,应该从何读起呢?我给他举了一个例子,我们学校大一大二是在紫金港校区,到了 大三搬到玉泉校区,但是大一的时候也会有时候有事情要去玉泉办.偶尔会去玉泉,但