现在的位置: 首页 > 综合 > 正文

[Windows]7种网络编程I/O模型代码实现实例

2018年04月02日 ⁄ 综合 ⁄ 共 116438字 ⁄ 字号 评论关闭

From:

http://blog.csdn.net/woshinia/article/details/8585930

 

部分代码参考《[WINDOWS网络与通信程序设计].王艳平》,网络中一些I/O模型的代码都没有对socket是否可写做过深入研究,我这边会提供一些解决方法。

阻塞模式下,send会发生阻塞(非阻塞模式下send返回WSAEWOULDBLOCK错误,重叠I/O下表现为投递的发送请求一直无法完成)的情况一般可以分为3种 : 

1,  服务器虽然发送了大量数据,但客户端并未调用recv函数去接。

2,网络状况不佳,发送缓冲区中的数据一直发不出去。

3,发送数据量很大,如下载功能,协议发送数据的速度比不上send函数将数据拷贝到发送缓冲区的速度。

对于1,2情况,我们似乎可以直接关闭套接字,让客户端重新请求。但对于3,却不行。而且实际操作过程中,我们无法区分是1,2,还是3,我们能做的是尽量去保证发送的正确性。当然防止1情况或者2情况中长时间网络不畅,可以设定超时。若socket一直处于不可写状态超过1分钟,那么就关闭套接字。在最后的IOCP模型中就加入了这种超时机制。其他模型若要加入,可参考它来做。

一,基本的阻塞模型

  1. #include <WinSock2.h>
      
  2. #include <Windows.h>
      
  3. #include <stdio.h>
      
  4.   
  5. #pragma comment(lib,"Ws2_32.lib")
      
  6.   
  7. DWORD WINAPI WorkThread(void* param)  
  8. {  
  9.     SOCKET* psClient = (SOCKET*)param;  
  10.     char buf[4096];  
  11.     while(true)  
  12.     {  
  13.         int len = recv(*psClient,buf,4096,0);  
  14.         if(len <= 0)  
  15.         {  
  16.             printf("recv失败!%d\n",WSAGetLastError());  
  17.             Sleep(5000);  
  18.             break;  
  19.         }  
  20.         buf[len] = '\0';  
  21.         printf("收到数据:%s\n",buf);  
  22.     }  
  23.     closesocket(*psClient);  
  24.     delete psClient;  
  25.     return 0;  
  26. }  
  27.   
  28. int main()  
  29. {  
  30.     WSAData wsaData;  
  31.     if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))  
  32.     {  
  33.         printf("WSAStartup失败!\n",WSAGetLastError());  
  34.         Sleep(5000);  
  35.         return 0;  
  36.     }  
  37.     USHORT nPort = 3456;  
  38.     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
  39.     sockaddr_in sin;  
  40.     sin.sin_family = AF_INET;  
  41.     sin.sin_port = htons(nPort);  
  42.     sin.sin_addr.S_un.S_addr = INADDR_ANY;  
  43.   
  44.     if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))  
  45.     {  
  46.         printf("bind失败!%d\n",WSAGetLastError());  
  47.         Sleep(5000);  
  48.         return -1;  
  49.     }  
  50.   
  51.     ::listen(sListen,5);  
  52.   
  53.     while(true)  
  54.     {  
  55.         sockaddr_in addrRemote;  
  56.         int nAddrLen = sizeof(addrRemote);  
  57.         SOCKET *psClient = new SOCKET;  
  58.         *psClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);  
  59.         HANDLE hThread = CreateThread(NULL,0,WorkThread,psClient,0,NULL);  
  60.         CloseHandle(hThread);  
  61.     }  
  62.     closesocket(sListen);  
  63.     WSACleanup();  
  64. }  

二,无任何优化的非阻塞模型

  1. #include <WinSock2.h>
      
  2. #include <Windows.h>
      
  3. #include <stdio.h>
      
  4. #include <vector>
      
  5. using namespace std;  
  6.   
  7. #pragma comment(lib,"Ws2_32.lib")
      
  8.   
  9. CRITICAL_SECTION g_cs;  
  10. HANDLE           g_StartEvent;  
  11. vector<SOCKET> g_vecClients;  
  12. int g_iVecSize = 0;  
  13. DWORD WINAPI WorkThread(void* param)  
  14. {  
  15.     char buf[4096];  
  16.     while(1)  
  17.     {  
  18.         if(g_vecClients.empty())  
  19.         {  
  20.             ResetEvent(g_StartEvent);  
  21.             WaitForSingleObject(g_StartEvent,INFINITE);  
  22.         }  
  23.   
  24.         EnterCriticalSection(&g_cs);  
  25.         for(vector<SOCKET>::iterator it = g_vecClients.begin();it != g_vecClients.end();)  
  26.         {  
  27.             int len = recv(*it,buf,4096,0);  
  28.             if(len == SOCKET_ERROR)  
  29.             {  
  30.                 if(WSAEWOULDBLOCK != WSAGetLastError())  
  31.                 {  
  32.                     printf("recv Error:%d\n",WSAGetLastError());  
  33.                     closesocket(*it);  
  34.                     it = g_vecClients.erase(it);  
  35.                 }  
  36.                 else  
  37.                 {  
  38.                     printf("%d.",*it);  
  39.                     ++it;  
  40.                 }  
  41.             }  
  42.             else  
  43.             {  
  44.                 buf[len] = 0;  
  45.                 printf("收到数据: %s\n",buf);  
  46.                 ++it;  
  47.             }  
  48.         }  
  49.         LeaveCriticalSection(&g_cs);  
  50.         Sleep(100);  
  51.   
  52.     }  
  53.     return 0;  
  54. }  
  55.   
  56. int main()  
  57. {  
  58.     InitializeCriticalSectionAndSpinCount(&g_cs,4000);  
  59.     g_StartEvent = CreateEvent(NULL,FALSE,FALSE,NULL);  
  60.   
  61.     WSAData wsaDate;  
  62.     WSAStartup(MAKEWORD(2,2),&wsaDate);  
  63.     USHORT nport = 3456;  
  64.     u_long ul = 1;  
  65.     SOCKET s = socket(AF_INET,SOCK_STREAM,0);  
  66.     ioctlsocket(s,FIONBIO,&ul);  
  67.     sockaddr_in sin;  
  68.     sin.sin_family = AF_INET;  
  69.     sin.sin_port = htons(nport);  
  70.     sin.sin_addr.S_un.S_addr = ADDR_ANY;  
  71.   
  72.     if(SOCKET_ERROR == ::bind(s,(sockaddr*)&sin,sizeof(sin)))  
  73.     {  
  74.         return -1;  
  75.     }  
  76.   
  77.     ::listen(s,5);  
  78.   
  79.     HANDLE hThread = CreateThread(NULL,0,WorkThread,NULL,0,NULL);  
  80.     CloseHandle(hThread);  
  81.   
  82.     while(true)  
  83.     {  
  84.         sockaddr_in addrRemote;  
  85.         int nAddrLen = sizeof(addrRemote);  
  86.         SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);  
  87.         if(sClient != SOCKET_ERROR)  
  88.         {  
  89.             EnterCriticalSection(&g_cs);  
  90.             g_vecClients.push_back(sClient);  
  91.             LeaveCriticalSection(&g_cs);  
  92.             if(g_vecClients.size() == 1)  
  93.                 SetEvent(g_StartEvent);  
  94.         }  
  95.         else if(WSAEWOULDBLOCK == WSAGetLastError())  
  96.         {  
  97.             printf(".");  
  98.             Sleep(100);  
  99.         }  
  100.         else  
  101.         {  
  102.             printf("accept failed! %d\n",WSAGetLastError());  
  103.         }  
  104.     }  
  105.     closesocket(s);  
  106.     WSACleanup();  
  107.     CloseHandle(g_StartEvent);  
  108.     DeleteCriticalSection(&g_cs);  
  109. }  

三,select模型

  1. #include <WinSock2.h>
      
  2. #include <Windows.h>
      
  3. #include <MSWSock.h>
      
  4. #include <stdio.h>
      
  5. #include <map>
      
  6. using namespace std;  
  7.   
  8.   
  9. #pragma comment(lib,"Ws2_32.lib")
      
  10. #pragma comment(lib,"Mswsock.lib")
      
  11.   
  12.   
  13. struct ThreadObj{  
  14.     OVERLAPPED *pOl;  
  15.     HANDLE s;  
  16. };  
  17.   
  18.   
  19. int g_iIndex = 0;  
  20. map<SOCKET,char*> g_map;  
  21.   
  22.   
  23. int main()  
  24. {  
  25.     WSAData wsaData;  
  26.     if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))  
  27.     {  
  28.         printf("初始化失败!%d\n",WSAGetLastError());  
  29.         Sleep(5000);  
  30.         return -1;  
  31.     }  
  32.     USHORT nport = 3456;  
  33.     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
  34.     u_long ul = 1;  
  35.     ioctlsocket(sListen,FIONBIO,&ul);  
  36.     sockaddr_in sin;  
  37.     sin.sin_family = AF_INET;  
  38.     sin.sin_port = htons(nport);  
  39.     sin.sin_addr.S_un.S_addr = ADDR_ANY;  
  40.   
  41.   
  42.     if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))  
  43.     {  
  44.         printf("bind failed!%d\n",WSAGetLastError());  
  45.         Sleep(5000);  
  46.         return -1;  
  47.     }  
  48.   
  49.   
  50.     listen(sListen,5);  
  51.   
  52.   
  53.     //1)初始化一个套接字集合fdSocket,并将监听套接字放入
      
  54.     fd_set fdSocket;  
  55.     FD_ZERO(&fdSocket);  
  56.     FD_SET(sListen,&fdSocket);  
  57.     TIMEVAL time={1,0};  
  58.     char buf[4096];  
  59.     fd_set fdWrite;  
  60.     FD_ZERO(&fdWrite);  
  61.     while(true)  
  62.     {  
  63.         //2)将fdSocket的一个拷贝fdRead传给select函数
      
  64.         fd_set fdRead = fdSocket;  
  65.         fd_set fdTmp = fdWrite;  
  66.         int nRetAll = 0;  
  67.         if(fdTmp.fd_count > 0)  
  68.             nRetAll = select(0,&fdRead,&fdTmp,NULL,NULL/*&time*/);//若不设置超时则select为阻塞
      
  69.         else  
  70.             nRetAll = select(0,&fdRead,NULL,NULL,NULL/*&time*/);  
  71.         if(nRetAll > 0)  
  72.         {  
  73.             //3)通过将原来的fdSocket和被select处理过的fdRead进行比较,决定由哪些socket有数据可以读取
      
  74.             for(int i=0;i<fdSocket.fd_count;i++)  
  75.             {  
  76.                 if(FD_ISSET(fdSocket.fd_array[i],&fdRead))  
  77.                 {  
  78.                     if(fdSocket.fd_array[i] == sListen)  
  79.                     {  
  80.                         if(fdSocket.fd_count < FD_SETSIZE)  
  81.                         {  
  82.                             sockaddr_in addrRemote;  
  83.                             int nAddrLen = sizeof(addrRemote);  
  84.                             SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);  
  85.                             FD_SET(sClient,&fdSocket);  
  86.                             printf("接收到连接:(%s)\n",inet_ntoa(addrRemote.sin_addr));  
  87.                         }  
  88.                         else  
  89.                         {  
  90.                             printf("连接数量已达上限!\n");  
  91.                             continue;  
  92.                         }  
  93.                     }  
  94.                     else  
  95.                     {  
  96.                         int nRecv = recv(fdSocket.fd_array[i],buf,4096,0);  
  97.                         if(nRecv > 0)  
  98.                         {  
  99.                             buf[nRecv] = 0;  
  100.                               
  101.                             printf("收到数据:%s\n",buf);  
  102.                           
  103.                             int nRet = send(fdSocket.fd_array[i],buf,nRecv,0);  
  104.                             if(nRet <= 0)  
  105.                             {  
  106.                                 SOCKET s = fdSocket.fd_array[i];  
  107.                                 if(GetLastError() == WSAEWOULDBLOCK)  
  108.                                 {                                     
  109.                                     if(g_map.find(s) == g_map.end())  
  110.                                     {  
  111.                                         char* szTmp = new char[nRecv + 1];  
  112.                                         strncpy(szTmp,buf,nRecv);  
  113.                                         szTmp[nRecv] = 0;  
  114.                                         g_map[s] = szTmp;  
  115.                                     }  
  116.                                     else  
  117.                                     {  
  118.                                         char* szOld = g_map[s];  
  119.                                         char* szTmp2 = new char[strlen(szOld) + nRecv + 1];  
  120.                                         strncpy(szTmp2,szOld,strlen(szOld));  
  121.                                         strncpy(szTmp2 + strlen(szOld),buf,nRecv);  
  122.                                         szTmp2[strlen(szOld) + nRecv] = 0;  
  123.                                         delete [] szOld;  
  124.                                         g_map[s] = szTmp2;  
  125.                                     }  
  126.                                     FD_SET(fdSocket.fd_array[i],&fdWrite);  
  127.                                 }  
  128.                                 else  
  129.                                 {  
  130.                                     closesocket(fdSocket.fd_array[i]);  
  131.                                     if(g_map.find(s) != g_map.end())  
  132.                                     {  
  133.                                         if(g_map[s] != NULL)  
  134.                                             delete [] g_map[s];  
  135.                                         g_map.erase(s);  
  136.                                     }  
  137.                                     FD_CLR(fdSocket.fd_array[i],&fdSocket);  
  138.                                 }  
  139.                             }  
  140.                             printf("发送了%d\n",nRet);  
  141.                         }  
  142.                         else  
  143.                         {  
  144.                             printf("1个Client已断开\n");  
  145.                             closesocket(fdSocket.fd_array[i]);  
  146.                             FD_CLR(fdSocket.fd_array[i],&fdSocket);  
  147.                         }  
  148.                     }  
  149.                 }  
  150.                 if(FD_ISSET(fdSocket.fd_array[i],&fdTmp))  
  151.                 {  
  152.                     SOCKET s = fdSocket.fd_array[i];  
  153.                     if(g_map.find(s) != g_map.end())  
  154.                     {  
  155.                         char* szToSend = g_map[s];  
  156.                         int nToSend = strlen(szToSend);  
  157.                         int nRet = send(fdSocket.fd_array[i],szToSend,nToSend,0);  
  158.                         if(nRet <= 0)  
  159.                         {  
  160.                             if(GetLastError() == WSAEWOULDBLOCK)  
  161.                             {                                     
  162.                                 //do nothing
      
  163.                             }  
  164.                             else  
  165.                             {  
  166.                                 closesocket(fdSocket.fd_array[i]);  
  167.                                 if(g_map.find(s) != g_map.end())  
  168.                                 {  
  169.                                     if(g_map[s] != NULL)  
  170.                                         delete [] g_map[s];  
  171.                                     g_map.erase(s);  
  172.                                 }  
  173.                                 FD_CLR(fdSocket.fd_array[i],&fdSocket);  
  174.                             }  
  175.                         }  
  176.                         else if(nRet < nToSend)  
  177.                         {  
  178.                             printf("发送了%d/%d\n",nRet,nToSend);  
  179.                             nToSend -= nRet;  
  180.                             char* szTmp = new char[nToSend + 1];  
  181.                             strncpy(szTmp,szToSend + nRet,nToSend);  
  182.                             szTmp[nToSend] = 0;  
  183.                             delete [] szToSend;  
  184.                             g_map[s] = szTmp;                     
  185.                         }  
  186.                         else  
  187.                         {  
  188.                             if(g_map[s] != NULL)  
  189.                                 delete [] g_map[s];  
  190.                             g_map.erase(s);  
  191.                             FD_CLR(fdSocket.fd_array[i],&fdWrite);  
  192.                         }  
  193.                         printf("============================================发送了%d\n",nRet);  
  194.                     }  
  195.                 }  
  196.             }  
  197.         }  
  198.         else if(nRetAll == 0)  
  199.         {  
  200.             printf("time out!\n");  
  201.         }  
  202.         else  
  203.         {  
  204.             printf("select error!%d\n",WSAGetLastError());  
  205.             Sleep(5000);  
  206.             break;  
  207.         }  
  208.     }  
  209.     closesocket(sListen);  
  210.     WSACleanup();  
  211. }  

四,异步选择模型

注意:收到FD_Write消息有2种情况:1,在socket第一次和窗口句柄绑定后。2,socket从不可写状态变成可写状态。下面的事件选择模型也是同理。

  1. #include <WinSock2.h>
      
  2. #include <Windows.h>
      
  3. #include <stdio.h>
      
  4. #include <map>
      
  5. using namespace std;  
  6.   
  7.   
  8. #pragma comment(lib,"Ws2_32.lib")
      
  9.   
  10.   
  11. #define WM_SOCKET (WM_USER + 100) 
      
  12.   
  13.   
  14. map<SOCKET,char*> g_map;  
  15. LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam)  
  16. {  
  17.     switch(uMsg)  
  18.     {  
  19.     case WM_SOCKET:  
  20.         {  
  21.             SOCKET s = wParam;  
  22.             if(WSAGETSELECTERROR(lParam))  
  23.             {  
  24.                 printf("消息错误!\n");  
  25.                 closesocket(s);  
  26.                 return 0;  
  27.             }  
  28.   
  29.   
  30.             switch(WSAGETSELECTEVENT(lParam))  
  31.             {  
  32.             case FD_ACCEPT:  
  33.                 {  
  34.                     sockaddr_in addrRemote;  
  35.                     int nAddrLen = sizeof(addrRemote);  
  36.                     SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);  
  37.                     WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE);  
  38.                 }break;  
  39.             case FD_WRITE:  
  40.                 {  
  41.                     printf("write====================\n");  
  42.                     if(!g_map.empty())  
  43.                     {  
  44.                         char* buf = g_map[s];  
  45.                         int nLenth = strlen(buf);  
  46.                         while(nLenth > 0)  
  47.                         {  
  48.                             int nRet = send(s,buf,nLenth,0);  
  49.                             if(nRet > 0)  
  50.                             {  
  51.                                 buf += nRet;  
  52.                                 nLenth -= nRet;  
  53.                             }  
  54.                             else if(10035 == GetLastError())  
  55.                             {  
  56.                                 char* newBuf = new char[nLenth + 1];  
  57.                                 strncpy(newBuf,buf,nLenth);  
  58.                                 newBuf[nLenth] = 0;  
  59.                                 delete [] g_map[s];  
  60.                                 g_map[s] = newBuf;  
  61.                                 break;  
  62.                             }  
  63.                             else  
  64.                             {  
  65.                                 delete [] g_map[s];  
  66.                                 g_map.erase(s);  
  67.                                 closesocket(s);  
  68.                             }  
  69.                         }  
  70.                         if(nLenth == 0)  
  71.                         {  
  72.                             g_map.erase(s);  
  73.                         }  
  74.                     }  
  75.                 }break;  
  76.             case FD_READ:  
  77.                 {  
  78.                     char buf[4096];  
  79.                     int nRet = recv(s,buf,4096,0);  
  80.                     if(nRet > 0)  
  81.                     {  
  82.                         buf[nRet] = 0;  
  83.                         //printf("收到数据:%s\n",buf);
      
  84.                         int x = send(s,buf,nRet,0);  
  85.                         printf("已发送字节数:%d , 线程号:%d\n",x,GetCurrentThreadId());  
  86.                         if(x < 0)  
  87.                         {  
  88.                             int iError = GetLastError();  
  89.                             printf("数据:%s ,错误:%d\n",buf,iError);  
  90.                             if(10035 == iError)  
  91.                             {  
  92.                                 if(g_map.end() != g_map.find(s))  
  93.                                 {  
  94.                                     int newLength = strlen(g_map[s]) + strlen(buf);  
  95.                                     char* newBuf = new char[newLength + 1];  
  96.                                     strncpy(newBuf,g_map[s],strlen(g_map[s]));  
  97.                                     strncpy(newBuf+strlen(g_map[s]),buf,strlen(buf));  
  98.                                     newBuf[newLength] = 0;  
  99.                                     delete [] g_map[s];  
  100.                                     g_map[s] = newBuf;  
  101.                                 }  
  102.                                 else  
  103.                                 {  
  104.                                     char* newBuf = new char[strlen(buf) + 1];  
  105.                                     strncpy(newBuf,buf,strlen(buf));  
  106.                                     newBuf[strlen(buf)] = 0;  
  107.                                     g_map[s] = newBuf;  
  108.                                 }  
  109.                             }  
  110.                             else  
  111.                             {  
  112.                                 if(g_map.end() != g_map.find(s))  
  113.                                 {  
  114.                                     delete [] g_map[s];  
  115.                                     g_map.erase(s);  
  116.                                 }  
  117.                                 closesocket(s);  
  118.                             }  
  119.                         }     
  120.                     }  
  121.                     else  
  122.                     {  
  123.                         printf("1个Client已经断开1111!\n");  
  124.                         if(g_map.end() != g_map.find(s))  
  125.                         {  
  126.                             delete [] g_map[s];  
  127.                             g_map.erase(s);  
  128.                         }  
  129.                         closesocket(s);  
  130.                     }  
  131.                 }break;  
  132.             case FD_CLOSE:  
  133.                 {  
  134.                     printf("1个Client已经断开222!\n");  
  135.                     if(g_map.end() != g_map.find(s))  
  136.                     {  
  137.                         delete [] g_map[s];  
  138.                         g_map.erase(s);  
  139.                     }  
  140.                     closesocket(s);  
  141.                 }break;   
  142.             }  
  143.         }break;  
  144.     case WM_DESTROY:  
  145.         {  
  146.             printf("窗口已关闭!\n");  
  147.             PostQuitMessage(0);  
  148.         }  
  149.     }  
  150.     return DefWindowProc(hwnd,uMsg,wParam,lParam);  
  151. }  
  152.   
  153.   
  154. int main()  
  155. {  
  156.     char szClassName[] = "WSAAsyncSelect Test";  
  157.     static WNDCLASSEX wndClass;  
  158.     wndClass.cbSize = sizeof(wndClass);  
  159.     wndClass.style = CS_HREDRAW | CS_VREDRAW;  
  160.     wndClass.lpfnWndProc = WindowProc;  
  161.     wndClass.cbClsExtra = 0;  
  162.     wndClass.cbWndExtra = 0;  
  163.     wndClass.hInstance = GetModuleHandle(0);  
  164.     wndClass.hIcon = LoadIcon(NULL,IDI_APPLICATION);  
  165.     wndClass.hCursor = LoadCursor(NULL,IDC_ARROW);  
  166.     wndClass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH);  
  167.     wndClass.lpszMenuName = NULL;  
  168.     wndClass.lpszClassName = szClassName;  
  169.     wndClass.hIconSm = NULL;  
  170.   
  171.   
  172.     ATOM atom = RegisterClassEx(&wndClass);  
  173.     if(0 == atom)  
  174.     {  
  175.         char error[256];  
  176.         sprintf(error,"RegisterClassEx错误!%d",GetLastError());  
  177.         MessageBox(NULL,error,"error",MB_OK);  
  178.         return -1;  
  179.     }  
  180.     HWND hwnd = CreateWindowEx(0,(char *)atom,"",WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT,  
  181.         CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL);  
  182.     if(hwnd == NULL)  
  183.     {  
  184.         char error[256];  
  185.         sprintf(error,"创建窗口错误!%d",GetLastError());  
  186.         MessageBox(NULL,error,"error",MB_OK);  
  187.         return -1;  
  188.     }  
  189.   
  190.   
  191.     WSAData wsaData;  
  192.     if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))  
  193.     {  
  194.         printf("初始化失败!%d\n",WSAGetLastError());  
  195.         Sleep(5000);  
  196.         return -1;  
  197.     }  
  198.     USHORT nport = 3456;  
  199.     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
  200.     sockaddr_in sin;  
  201.     sin.sin_family = AF_INET;  
  202.     sin.sin_port = htons(nport);  
  203.     sin.sin_addr.S_un.S_addr = ADDR_ANY;  
  204.   
  205.   
  206.     if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))  
  207.     {  
  208.         printf("bind failed!%d\n",WSAGetLastError());  
  209.         Sleep(5000);  
  210.         return -1;  
  211.     }  
  212.   
  213.   
  214.     WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE);  
  215.     listen(sListen,5);  
  216.   
  217.   
  218.     MSG msg;  
  219.     while(GetMessage(&msg,NULL,0,0))  
  220.     {  
  221.         TranslateMessage(&msg);  
  222.         DispatchMessage(&msg);  
  223.     }  
  224.     closesocket(sListen);  
  225.     WSACleanup();  
  226.     return msg.wParam;  
  227. }  

五,事件选择模型

事件选择模型主要难点是对线程池的使用,send操作可以参考异步选择模型。

  1. #include <WinSock2.h>
      
  2. #include <Windows.h>
      
  3. #include <stdio.h>
      
  4. #include <vector>
      
  5. using namespace std;  
  6.   
  7.   
  8. #pragma comment(lib,"Ws2_32.lib")
      
  9.   
  10.   
  11. typedef struct _THREAD_OBJ  
  12. {  
  13.     HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];  
  14.     SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];  
  15.     int nSocksUsed;  
  16.     CRITICAL_SECTION cs;  
  17.     _THREAD_OBJ *pNext;  
  18. }THREAD_OBJ,*PTHREAD_OBJ;  
  19.   
  20.   
  21. PTHREAD_OBJ g_pThreadList = NULL;  
  22. CRITICAL_SECTION g_cs;  
  23. BOOL g_bServerRunning = FALSE;  
  24. HANDLE g_hThreads[1000] = {0};  
  25. int g_nThreadsCount = 0;  
  26.   
  27.   
  28. PTHREAD_OBJ CreateThreadObj()  
  29. {   
  30.     PTHREAD_OBJ pThread = new THREAD_OBJ();  
  31.     if(pThread != NULL)  
  32.     {  
  33.         InitializeCriticalSectionAndSpinCount(&pThread->cs,4000);  
  34.         pThread->events[0] = WSACreateEvent();  
  35.         pThread->nSocksUsed = 1;  
  36.         EnterCriticalSection(&g_cs);  
  37.         pThread->pNext = g_pThreadList;  
  38.         g_pThreadList = pThread;  
  39.         LeaveCriticalSection(&g_cs);  
  40.     }  
  41.     return pThread;  
  42. }  
  43.   
  44.   
  45. void FreeThreadObj(PTHREAD_OBJ pThread)  
  46. {  
  47.     if(pThread == NULL)  
  48.         return;  
  49.     EnterCriticalSection(&g_cs);  
  50.     PTHREAD_OBJ p = g_pThreadList;  
  51.     if(p == pThread)  
  52.     {  
  53.         g_pThreadList = p->pNext;  
  54.     }  
  55.     else  
  56.     {  
  57.         while(p != NULL && p->pNext != pThread)  
  58.         {  
  59.             p = p->pNext;  
  60.         }  
  61.         if(p != NULL)  
  62.         {  
  63.             p->pNext = pThread->pNext;  
  64.         }  
  65.     }  
  66.     LeaveCriticalSection(&g_cs);  
  67.   
  68.   
  69.     DeleteCriticalSection(&pThread->cs);  
  70.     WSACloseEvent(pThread->events[0]);  
  71.     delete pThread;  
  72. }  
  73.   
  74.   
  75. LONG g_nTotalConnections;  
  76. LONG g_nCurrentConnections;  
  77.   
  78.   
  79. BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)  
  80. {  
  81.     if(pThread == NULL || s == INVALID_SOCKET)  
  82.         return FALSE;  
  83.   
  84.   
  85.     BOOL bRet = FALSE;  
  86.     EnterCriticalSection(&pThread->cs);  
  87.     if(pThread->nSocksUsed < WSA_MAXIMUM_WAIT_EVENTS)  
  88.     {  
  89.         pThread->events[pThread->nSocksUsed] = WSACreateEvent();  
  90.         pThread->sockets[pThread->nSocksUsed] = s;  
  91.         WSAEventSelect(s,pThread->events[pThread->nSocksUsed],FD_READ | FD_CLOSE | FD_WRITE);  
  92.         pThread->nSocksUsed++;  
  93.         bRet = TRUE;  
  94.         WSASetEvent(pThread->events[0]);//通知线程,有新的事件加入了,需要重新调用WSAWaitFormultipleEvents
      
  95.     }  
  96.     LeaveCriticalSection(&pThread->cs);  
  97.   
  98.   
  99.     if(bRet)  
  100.     {  
  101.         InterlockedIncrement(&g_nTotalConnections);  
  102.         InterlockedIncrement(&g_nCurrentConnections);  
  103.     }  
  104.     return bRet;  
  105. }  
  106.   
  107.   
  108. void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)  
  109. {  
  110.     if(pThread == NULL || s == INVALID_SOCKET)  
  111.         return;  
  112.     EnterCriticalSection(&pThread->cs);  
  113.     for(int i=1;i<pThread->nSocksUsed;i++)  
  114.     {  
  115.         if(pThread->sockets[i] == s)  
  116.         {  
  117.             WSACloseEvent(pThread->events[i]);  
  118.             closesocket(s);  
  119.             for(int j=i;j<pThread->nSocksUsed - 1;j++)  
  120.             {  
  121.                 pThread->events[j] = pThread->events[j+1];  
  122.                 pThread->sockets[j] = pThread->sockets[j+1];  
  123.             }  
  124.             pThread->nSocksUsed--;  
  125.             break;  
  126.         }  
  127.     }  
  128.     LeaveCriticalSection(&pThread->cs);  
  129.     InterlockedDecrement(&g_nCurrentConnections);  
  130. }  
  131.   
  132.   
  133. BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)  
  134. {  
  135.     WSANETWORKEVENTS event;  
  136.     SOCKET s = pThread->sockets[nIndex];  
  137.     HANDLE sEvent = pThread->events[nIndex];  
  138.     if(0 != WSAEnumNetworkEvents(s,sEvent,&event))  
  139.     {  
  140.         printf("socket error!\n");  
  141.         RemoveSocket(pThread,s);  
  142.         return FALSE;  
  143.     }  
  144.     do   
  145.     {  
  146.         if(event.lNetworkEvents & FD_READ)  
  147.         {  
  148.             if(event.iErrorCode[FD_READ_BIT] == 0)  
  149.             {  
  150.                 char szText[256];  
  151.                 int nRecv = recv(s,szText,strlen(szText),0);  
  152.                 if(nRecv > 0)  
  153.                 {  
  154.                     szText[nRecv] = '\0';  
  155.                     printf("接收到数据:%s\n",szText);          
  156.                 }  
  157.                 else  
  158.                 {  
  159.                     break;  
  160.                 }  
  161.             }  
  162.             else  
  163.                 break;  
  164.         }  
  165.         else if(event.lNetworkEvents & FD_CLOSE)  
  166.         {  
  167.             break;  
  168.         }  
  169.         else if(event.lNetworkEvents & FD_WRITE)  
  170.         {  
  171.             printf("FD_WRITE==========================\n");  
  172.         }  
  173.         return TRUE;  
  174.     } while (FALSE);  
  175.     printf("socket error2!\n");  
  176.     RemoveSocket(pThread,s);  
  177.     return FALSE;  
  178. }  
  179.   
  180.   
  181. DWORD WINAPI ServerThread(LPVOID lpParam)  
  182. {  
  183.     PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;  
  184.   
  185.   
  186.     while(TRUE)  
  187.     {  
  188.         int nIndex = WSAWaitForMultipleEvents(  
  189.             pThread->nSocksUsed,pThread->events,FALSE,WSA_INFINITE,FALSE);  
  190.         nIndex = nIndex - WSA_WAIT_EVENT_0;  
  191.   
  192.   
  193.         if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT)  
  194.         {  
  195.             printf("WSAWaitForMultipleEvents error!\n");  
  196.             continue;  
  197.         }  
  198.         else if(nIndex == 0)  
  199.         {  
  200.             ResetEvent(pThread->events[0]);  
  201.         }  
  202.         else  
  203.         {  
  204.             HandleIo(pThread,nIndex);  
  205.         }  
  206.         if(!g_bServerRunning && pThread->nSocksUsed == 1)  
  207.             break;  
  208.     }  
  209.     FreeThreadObj(pThread);  
  210.     return 0;  
  211. }  
  212.   
  213.   
  214. BOOL AssignToFreeThread(SOCKET s)  
  215. {  
  216.     if(s == INVALID_SOCKET)  
  217.         return FALSE;  
  218.     BOOL bAllSucceed = TRUE;  
  219.     EnterCriticalSection(&g_cs);  
  220.     PTHREAD_OBJ pThread = g_pThreadList;  
  221.     while(pThread != NULL)  
  222.     {  
  223.         if(InsertSocket(pThread,s))  
  224.         {  
  225.             break;  
  226.         }  
  227.         pThread = pThread->pNext;  
  228.     }  
  229.     if(pThread == NULL)  
  230.     {  
  231.         if(g_nThreadsCount < 1000)  
  232.         {  
  233.             pThread = CreateThreadObj();  
  234.             HANDLE hThread = CreateThread(NULL,0,ServerThread,pThread,0,NULL);  
  235.             if(!hThread)  
  236.             {  
  237.                 bAllSucceed = FALSE;  
  238.                 FreeThreadObj(pThread);  
  239.             }  
  240.             else  
  241.             {  
  242.                 g_hThreads[g_nThreadsCount++] = hThread;  
  243.                 InsertSocket(pThread,s);  
  244.             }  
  245.         }  
  246.         else  
  247.             bAllSucceed = FALSE;  
  248.     }  
  249.     LeaveCriticalSection(&g_cs);  
  250.     return bAllSucceed;  
  251. }  
  252.   
  253.   
  254. DWORD WINAPI ControlThread(LPVOID lpParma)  
  255. {  
  256.     HANDLE wsaEvent = (HANDLE)lpParma;  
  257.     char cmd[128];  
  258.     while(scanf("%s",cmd))  
  259.     {  
  260.         if(cmd[0] == 's')  
  261.         {  
  262.             g_bServerRunning = FALSE;  
  263.             EnterCriticalSection(&g_cs);  
  264.             PTHREAD_OBJ pThread = g_pThreadList;  
  265.             while(pThread != NULL)  
  266.             {  
  267.                 EnterCriticalSection(&pThread->cs);  
  268.                 for(int i=0;i<pThread->nSocksUsed;i++)  
  269.                 {  
  270.                     closesocket(pThread->sockets[i]);  
  271.                 }  
  272.                 WSASetEvent(pThread->events[0]);  
  273.                 LeaveCriticalSection(&pThread->cs);  
  274.                 pThread = pThread->pNext;  
  275.             }  
  276.             LeaveCriticalSection(&g_cs);  
  277.             WSASetEvent(wsaEvent);  
  278.             break;  
  279.         }  
  280.     }  
  281.     return 0;  
  282. }  
  283.   
  284.   
  285. int main()  
  286. {  
  287.     WSAData wsaData;  
  288.     if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))  
  289.     {  
  290.         printf("初始化失败!%d\n",WSAGetLastError());  
  291.         Sleep(5000);  
  292.         return -1;  
  293.     }  
  294.     USHORT nport = 3456;  
  295.     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
  296.     sockaddr_in sin;  
  297.     sin.sin_family = AF_INET;  
  298.     sin.sin_port = htons(nport);  
  299.     sin.sin_addr.S_un.S_addr = ADDR_ANY;  
  300.   
  301.   
  302.     if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))  
  303.     {  
  304.         printf("bind failed!%d\n",WSAGetLastError());  
  305.         Sleep(5000);  
  306.         return -1;  
  307.     }  
  308.   
  309.   
  310.     listen(sListen,200);  
  311.   
  312.   
  313.     WSAEVENT wsaEvent = WSACreateEvent();  
  314.     WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE);  
  315.     InitializeCriticalSectionAndSpinCount(&g_cs,4000);  
  316.     g_bServerRunning = TRUE;  
  317.     HANDLE hThread = CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);  
  318.     CloseHandle(hThread);  
  319.     while(TRUE)  
  320.     {  
  321.         int nRet = WaitForSingleObject(wsaEvent,5*1000);  
  322.         if(!g_bServerRunning)  
  323.         {  
  324.             closesocket(sListen);  
  325.             WSACloseEvent(wsaEvent);  
  326.             WSAWaitForMultipleEvents(g_nThreadsCount,g_hThreads,TRUE,INFINITE,FALSE);  
  327.             for(int i=0;i<g_nThreadsCount;i++)  
  328.             {  
  329.                 CloseHandle(g_hThreads[i]);  
  330.             }  
  331.             break;  
  332.         }  
  333.         if(nRet == WAIT_FAILED)  
  334.         {  
  335.             printf("WaitForSingleObject Failed!\n");  
  336.             break;  
  337.         }  
  338.         else if(nRet == WAIT_TIMEOUT)  
  339.         {  
  340.             printf("\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n",  
  341.                 g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);  
  342.             continue;  
  343.         }  
  344.         else  
  345.         {  
  346.             ResetEvent(wsaEvent);  
  347.             while(TRUE)  
  348.             {  
  349.                 sockaddr_in addrRemote;  
  350.                 int nLen = sizeof(addrRemote);  
  351.                 SOCKET sNew = accept(sListen,(sockaddr*)&addrRemote,&nLen);  
  352.                 if(sNew == SOCKET_ERROR)  
  353.                     break;  
  354.                   
  355.                 if(!AssignToFreeThread(sNew))  
  356.                 {  
  357.                     closesocket(sNew);  
  358.                     printf("AssignToFreeThread Failed!\n");  
  359.                 }  
  360.             }  
  361.         }  
  362.     }  
  363.     DeleteCriticalSection(&g_cs);  
  364.     return 0;  
  365. }  

六,重叠I/O模型。

若需要建线程池,可参考事件选择模型。若纠结于send,可参考下面的IOCP。

  1. #include <WinSock2.h>
      
  2. #include <Windows.h>
      
  3. #include <MSWSock.h>
      
  4. #include <stdio.h>
      
  5.   
  6.   
  7. #pragma comment(lib,"Ws2_32.lib")
      
  8.   
  9.   
  10. #define BUFFER_SIZE 4096
      
  11.   
  12.   
  13. typedef struct _SOCKET_OBJ  
  14. {  
  15.     SOCKET s;  
  16.     int nOutstandingOps;  
  17.     LPFN_ACCEPTEX lpfnAcceptEx;  
  18. }SOCKET_OBJ,*PSOCKET_OBJ;  
  19.   
  20.   
  21. PSOCKET_OBJ CreateSocketObj(SOCKET s)  
  22. {  
  23.     PSOCKET_OBJ pSocket = new SOCKET_OBJ();  
  24.     if(pSocket != NULL)  
  25.         pSocket->s = s;  
  26.     return pSocket;  
  27. }  
  28.   
  29.   
  30. void FreeSocketObj(PSOCKET_OBJ pSocket)  
  31. {  
  32.     if(pSocket == NULL)  
  33.         return;  
  34.     if(pSocket->s != INVALID_SOCKET)  
  35.         closesocket(pSocket->s);  
  36.     delete pSocket;  
  37. }  
  38.   
  39.   
  40. typedef struct _BUFFER_OBJ  
  41. {  
  42.     OVERLAPPED ol;  
  43.     char* buff;  
  44.     int nLen;  
  45.     PSOCKET_OBJ pSocket;  
  46.     int nOperation;  
  47. #define OP_ACCEPT 1
      
  48. #define OP_READ 2
      
  49. #define OP_WRITE 3
      
  50.     SOCKET sAccept;  
  51.     _BUFFER_OBJ* pNext;  
  52. }BUFFER_OBJ,*PBUFFER_OBJ;  
  53.   
  54.   
  55.   
  56.   
  57.   
  58.   
  59. HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];  
  60. int g_nBufferCount;  
  61. PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;  
  62. BOOL g_bServerRunning;  
  63. CRITICAL_SECTION g_cs;  
  64.   
  65.   
  66. PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)  
  67. {  
  68.     if(g_nBufferCount > WSA_MAXIMUM_WAIT_EVENTS - 1)  
  69.         return NULL;  
  70.     PBUFFER_OBJ pBuffer = new BUFFER_OBJ();  
  71.     if(pBuffer != NULL)  
  72.     {  
  73.         pBuffer->buff = new char[nLen];  
  74.         pBuffer->nLen = nLen;  
  75.         pBuffer->ol.hEvent = WSACreateEvent();  
  76.         pBuffer->pSocket = pSocket;  
  77.         pBuffer->sAccept = INVALID_SOCKET;  
  78.         pBuffer->pNext = NULL;  
  79.         EnterCriticalSection(&g_cs);  
  80.         if(g_pBufferHeader == NULL)  
  81.         {  
  82.             g_pBufferHeader = g_pBufferTail = pBuffer;  
  83.         }  
  84.         else  
  85.         {  
  86.             g_pBufferTail->pNext = pBuffer;  
  87.             g_pBufferTail = pBuffer;  
  88.         }  
  89.         LeaveCriticalSection(&g_cs);  
  90.         g_events[++g_nBufferCount] = pBuffer->ol.hEvent;  
  91.     }  
  92.     return pBuffer;  
  93. }  
  94.   
  95.   
  96. void FreeBufferObj(PBUFFER_OBJ pBuffer)  
  97. {  
  98.     EnterCriticalSection(&g_cs);  
  99.     PBUFFER_OBJ pTest = g_pBufferHeader;  
  100.     BOOL bFind = FALSE;  
  101.     if(pTest == pBuffer)  
  102.     {  
  103.         if(g_pBufferHeader == g_pBufferTail)  
  104.             g_pBufferHeader = g_pBufferTail = NULL;  
  105.         else  
  106.             g_pBufferHeader = g_pBufferHeader->pNext;  
  107.         bFind = TRUE;  
  108.     }  
  109.     else  
  110.     {  
  111.         while(pTest != NULL && pTest->pNext != pBuffer)  
  112.             pTest = pTest->pNext;  
  113.         if(pTest != NULL)  
  114.         {  
  115.             pTest->pNext = pBuffer->pNext;  
  116.             if(pTest->pNext == NULL)  
  117.                 g_pBufferTail = pTest;  
  118.             bFind = TRUE;  
  119.         }  
  120.     }  
  121.       
  122.     if(bFind)  
  123.     {  
  124.         g_nBufferCount--;  
  125.         WSACloseEvent(pBuffer->ol.hEvent);  
  126.         delete [] pBuffer->buff;  
  127.         delete pBuffer;  
  128.     }  
  129.     LeaveCriticalSection(&g_cs);  
  130. }  
  131.   
  132.   
  133. PBUFFER_OBJ FindBufferObj(HANDLE hEvent)  
  134. {  
  135.     if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)  
  136.         return NULL;  
  137.     EnterCriticalSection(&g_cs);  
  138.     PBUFFER_OBJ pTest = g_pBufferHeader;  
  139.     while(pTest != NULL && pTest->ol.hEvent != hEvent)  
  140.         pTest = pTest->pNext;  
  141.     LeaveCriticalSection(&g_cs);  
  142.     return pTest;  
  143. }  
  144.   
  145.   
  146. void RebuildArray()  
  147. {  
  148.     EnterCriticalSection(&g_cs);  
  149.     PBUFFER_OBJ pBuffer = g_pBufferHeader;  
  150.     int i=1;  
  151.     while(pBuffer != NULL)  
  152.     {  
  153.         g_events[i++] = pBuffer->ol.hEvent;  
  154.         pBuffer = pBuffer->pNext;  
  155.     }  
  156.     LeaveCriticalSection(&g_cs);  
  157. }  
  158.   
  159.   
  160. BOOL PostAccept(PBUFFER_OBJ pBuffer)  
  161. {  
  162.     PSOCKET_OBJ pSocket = pBuffer->pSocket;  
  163.     if(pSocket->lpfnAcceptEx != NULL)  
  164.     {  
  165.         pBuffer->nOperation = OP_ACCEPT;  
  166.         pSocket->nOutstandingOps++;  
  167.   
  168.   
  169.         DWORD dwBytes;  
  170.         pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);  
  171.         BOOL b = pSocket->lpfnAcceptEx(pSocket->s,  
  172.             pBuffer->sAccept,pBuffer->buff,BUFFER_SIZE - ((sizeof(sockaddr_in) + 16)*2),  
  173.             sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16,&dwBytes,&pBuffer->ol);  
  174.         if(!b)  
  175.         {  
  176.             if(WSAGetLastError() != WSA_IO_PENDING)  
  177.                 return FALSE;  
  178.         }  
  179.         return TRUE;  
  180.     }  
  181.     return FALSE;  
  182. }  
  183.   
  184.   
  185. BOOL PostRecv(PBUFFER_OBJ pBuffer)  
  186. {  
  187.     pBuffer->nOperation = OP_READ;  
  188.     pBuffer->pSocket->nOutstandingOps++;  
  189.   
  190.   
  191.     DWORD dwBytes;  
  192.     DWORD dwFlags = 0;  
  193.     WSABUF buf;  
  194.     buf.buf = pBuffer->buff;  
  195.     buf.len = pBuffer->nLen;  
  196.     if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL))  
  197.     {  
  198.         if(WSAGetLastError() != WSA_IO_PENDING)  
  199.             return FALSE;  
  200.     }  
  201.     return TRUE;  
  202. }  
  203.   
  204.   
  205. BOOL PostSend(PBUFFER_OBJ pBuffer)  
  206. {  
  207.     pBuffer->nOperation = OP_WRITE;  
  208.     pBuffer->pSocket->nOutstandingOps++;  
  209.     DWORD dwBytes;  
  210.     DWORD dwFlags = 0;  
  211.     WSABUF buf;  
  212.     buf.buf = pBuffer->buff;  
  213.     buf.len = pBuffer->nLen;  
  214.     if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL))  
  215.     {  
  216.         if(WSAGetLastError() != WSA_IO_PENDING)  
  217.             return FALSE;  
  218.     }  
  219.     return TRUE;  
  220. }  
  221.   
  222.   
  223. BOOL HandleIo(PBUFFER_OBJ pBuffer)  
  224. {  
  225.     if(pBuffer == NULL)  
  226.         return FALSE;  
  227.   
  228.   
  229.     PSOCKET_OBJ pSocket = pBuffer->pSocket;  
  230.     pSocket->nOutstandingOps--;  
  231.   
  232.   
  233.     DWORD dwTrans;  
  234.     DWORD dwFlags;  
  235.     BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);  
  236.     if(!bRet)  
  237.     {  
  238.         if(pSocket->s != INVALID_SOCKET)  
  239.         {  
  240.             closesocket(pSocket->s);  
  241.             pSocket->s = INVALID_SOCKET;  
  242.         }  
  243.         if(pBuffer->nOperation == OP_ACCEPT && pBuffer->sAccept != INVALID_SOCKET)  
  244.         {  
  245.             closesocket(pBuffer->sAccept);  
  246.             pBuffer->sAccept = INVALID_SOCKET;  
  247.         }  
  248.         if(pSocket->nOutstandingOps == 0)  
  249.         {  
  250.             FreeSocketObj(pSocket);  
  251.         }  
  252.         FreeBufferObj(pBuffer);  
  253.         return FALSE;  
  254.     }  
  255.   
  256.   
  257.     switch(pBuffer->nOperation)  
  258.     {  
  259.     case OP_ACCEPT:  
  260.         {  
  261.             if(dwTrans > 0)  
  262.             {  
  263.                 pBuffer->buff[dwTrans] = 0;  
  264.                 printf("Accept收到数据:%s\n",pBuffer->buff);  
  265.   
  266.   
  267.                 PSOCKET_OBJ pClient = CreateSocketObj(pBuffer->sAccept);  
  268.                 PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);  
  269.                 if(pRecv == NULL)  
  270.                 {  
  271.                     printf("Too much connections!\n");  
  272.                     FreeSocketObj(pClient);  
  273.                     return FALSE;  
  274.                 }  
  275.                 RebuildArray();  
  276.                 if(!PostRecv(pRecv))  
  277.                 {  
  278.                     FreeSocketObj(pClient);  
  279.                     FreeBufferObj(pBuffer);  
  280.                     return FALSE;  
  281.                 }  
  282.             }  
  283.             else  
  284.             {  
  285.                 if(pSocket->s != INVALID_SOCKET)  
  286.                 {  
  287.                     closesocket(pSocket->s);  
  288.                     pSocket->s = INVALID_SOCKET;  
  289.                 }  
  290.                 if(pBuffer->sAccept != INVALID_SOCKET)  
  291.                 {  
  292.                     closesocket(pBuffer->sAccept);  
  293.                     pBuffer->sAccept = INVALID_SOCKET;  
  294.                 }  
  295.                 if(pSocket->nOutstandingOps == 0)  
  296.                 {  
  297.                     FreeSocketObj(pSocket);  
  298.                 }  
  299.                 FreeBufferObj(pBuffer);  
  300.             }  
  301. //          PBUFFER_OBJ pSend = CreateBufferObj(pClient,BUFFER_SIZE);
      
  302.             //if(pSend == NULL)
      
  303.             //{
      
  304.             //  printf("Too much connections!\n");
      
  305.             //  FreeSocketObj(pClient);
      
  306.             //  return FALSE;
      
  307.             //}
      
  308.             //RebuildArray();
      
  309.             //pSend->nLen = dwTrans;
      
  310.             //memcpy(pSend->buff,pBuffer->buff,dwTrans);
      
  311.   
  312.   
  313.             //if(!PostSend(pSend))
      
  314.             //{
      
  315.             //  FreeSocketObj(pSocket);
      
  316.             //  FreeBufferObj(pBuffer);
      
  317.             //  return FALSE;
      
  318.             //}
      
  319.   
  320.   
  321.             PostAccept(pBuffer);  
  322.         }break;  
  323.     case OP_READ:  
  324.         {  
  325.             if(dwTrans > 0)  
  326.             {  
  327.                 pBuffer->buff[dwTrans] = 0;  
  328.                 printf("Recv收到数据:%s\n",pBuffer->buff);  
  329.                 PostRecv(pBuffer);  
  330.             }  
  331.             else  
  332.             {  
  333.                 if(pSocket->s != INVALID_SOCKET)  
  334.                 {  
  335.                     closesocket(pSocket->s);  
  336.                     pSocket->s = INVALID_SOCKET;  
  337.                 }  
  338.                 if(pSocket->nOutstandingOps == 0)  
  339.                 {  
  340.                     FreeSocketObj(pSocket);  
  341.                 }  
  342.                 FreeBufferObj(pBuffer);  
  343.             }  
  344.         }break;  
  345.     case OP_WRITE:  
  346.         {  
  347.             if(dwTrans > 0)  
  348.             {  
  349.                 pBuffer->buff[dwTrans] = 0;  
  350.                 printf("发送数据: %s 成功!\n",pBuffer->buff);  
  351.                 FreeBufferObj(pBuffer);  
  352.             }  
  353.             else  
  354.             {  
  355.                 if(pSocket->s != INVALID_SOCKET)  
  356.                 {  
  357.                     closesocket(pSocket->s);  
  358.                     pSocket->s = INVALID_SOCKET;  
  359.                 }  
  360.                 if(pSocket->nOutstandingOps == 0)  
  361.                 {  
  362.                     FreeSocketObj(pSocket);  
  363.                 }  
  364.                 FreeBufferObj(pBuffer);  
  365.             }  
  366.         }break;  
  367.     }  
  368. }  
  369.   
  370.   
  371. DWORD WINAPI ControlThread(LPVOID lpParma)  
  372. {  
  373.     char cmd[128];  
  374.     while(scanf("%s",cmd))  
  375.     {  
  376.         if(cmd[0] == 's')  
  377.         {  
  378.             g_bServerRunning = FALSE;  
  379.             EnterCriticalSection(&g_cs);  
  380.             PBUFFER_OBJ pBuffer = g_pBufferHeader;  
  381.             while(pBuffer != NULL)  
  382.             {  
  383.                 if(pBuffer->pSocket != NULL && pBuffer->pSocket->s != INVALID_SOCKET)  
  384.                     closesocket(pBuffer->pSocket->s);  
  385.                 pBuffer = pBuffer->pNext;  
  386.             }  
  387.             LeaveCriticalSection(&g_cs);  
  388.             break;  
  389.         }  
  390.     }  
  391.     return 0;  
  392. }  
  393.   
  394.   
  395. int main()  
  396. {  
  397.     InitializeCriticalSectionAndSpinCount(&g_cs,4000);  
  398.     WSAData wsaData;  
  399.     if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))  
  400.     {  
  401.         printf("初始化失败!%d\n",WSAGetLastError());  
  402.         Sleep(5000);  
  403.         return -1;  
  404.     }  
  405.     USHORT nport = 3456;  
  406.     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
  407.     sockaddr_in sin;  
  408.     sin.sin_family = AF_INET;  
  409.     sin.sin_port = htons(nport);  
  410.     sin.sin_addr.S_un.S_addr = ADDR_ANY;  
  411.   
  412.   
  413.     if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))  
  414.     {  
  415.         printf("bind failed!%d\n",WSAGetLastError());  
  416.         Sleep(5000);  
  417.         return -1;  
  418.     }  
  419.   
  420.   
  421.     listen(sListen,200);  
  422.   
  423.   
  424.   
  425.   
  426.     g_bServerRunning = TRUE;  
  427.     PSOCKET_OBJ pListen = CreateSocketObj(sListen);  
  428.     GUID GuidAcceptEx = WSAID_ACCEPTEX;  
  429.     DWORD dwBytes;  
  430.     WSAIoctl(pListen->s,  
  431.         SIO_GET_EXTENSION_FUNCTION_POINTER,  
  432.         &GuidAcceptEx,  
  433.         sizeof(GuidAcceptEx),  
  434.         &pListen->lpfnAcceptEx,  
  435.         sizeof(pListen->lpfnAcceptEx),  
  436.         &dwBytes,  
  437.         NULL,  
  438.         NULL);  
  439.     g_events[0] = WSACreateEvent();  
  440.   
  441.   
  442.     for(int i=0;i<5;++i)  
  443.     {  
  444.         PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));  
  445.     }  
  446.       
  447.     HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);  
  448.     while(TRUE)  
  449.     {  
  450.         int nIndex = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);  
  451.         if(nIndex == WSA_WAIT_FAILED)  
  452.         {  
  453.             printf("WSAWaitForMultipleEvents Failed!\n");  
  454.             break;  
  455.         }  
  456.         nIndex = nIndex - WSA_WAIT_EVENT_0;  
  457.         for(int i=nIndex;i<= g_nBufferCount;i++)  
  458.         {  
  459.             int nRet = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);  
  460.             if(nRet == WSA_WAIT_TIMEOUT)  
  461.                 continue;  
  462.   
  463.   
  464.             if(i == 0)  
  465.             {             
  466.                 RebuildArray();  
  467.                 continue;  
  468.             }  
  469.   
  470.   
  471.             PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);  
  472.             if(pBuffer != NULL)  
  473.             {  
  474.                 if(!HandleIo(pBuffer))  
  475.                     RebuildArray();  
  476.             }  
  477.         }  
  478.         if(!g_bServerRunning && g_nBufferCount == 0)  
  479.             break;  
  480.     }  
  481.     WSACloseEvent(g_events[0]);  
  482.     WaitForSingleObject(hThread,INFINITE);  
  483.     CloseHandle(hThread);  
  484.     closesocket(sListen);  
  485.     WSACleanup();  
  486.     DeleteCriticalSection(&g_cs);  
  487.     return 0;  
  488. }  

七,IOCP。

大框架为书中例子,对强化了发送操作,部分异常处理,且加入了连接超时处理。

注意:当一个投递完成,且对应socket上已经没有未决的投递,必须要再投递一个请求或者关闭连接,否则socket对应的数据结构无法被释放,对应socket连接断开时也无法被

检测到。所以如果业务逻辑结束,要关闭连接。或者你需要等客户端来断开连接,那么你可以在业务逻辑结束后,再投递一个接收请求(客户端断开时,接收请求返回且接收的字节数为0,则此类中的异常处理逻辑便会将资源清理掉)。

头文件

  1. ////////////////////////////////////////
      
  2. // IOCP.h文件
      
  3.   
  4.   
  5. #ifndef __IOCP_H__
      
  6. #define __IOCP_H__
      
  7.   
  8.   
  9. #include <winsock2.h>
      
  10. #include <windows.h>
      
  11. #include <Mswsock.h>
      
  12.   
  13.   
  14. #define BUFFER_SIZE 1024*4      // I/O请求的缓冲区大小
      
  15. #define MAX_THREAD  1           // I/O服务线程的数量
      
  16.   
  17.   
  18.   
  19.   
  20. // 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
      
  21. struct CIOCPBuffer  
  22. {  
  23.     CIOCPBuffer()  
  24.     {  
  25.         memset(&ol,0,sizeof(WSAOVERLAPPED));  
  26.         sClient = INVALID_SOCKET;  
  27.         memset(buff,0,BUFFER_SIZE);  
  28.         nLen = 0;  
  29.         nSequenceNumber = 0;  
  30.         bIsReleased = FALSE;  
  31.         nOperation = 0;  
  32.         pNext = NULL;  
  33.     }  
  34.     WSAOVERLAPPED ol;  
  35.   
  36.   
  37.     SOCKET sClient;         // AcceptEx接收的客户方套节字
      
  38.   
  39.   
  40.     char buff[BUFFER_SIZE];             // I/O操作使用的缓冲区
      
  41.     int nLen;               // buff缓冲区(使用的)大小
      
  42.   
  43.   
  44.     ULONG nSequenceNumber;  // 此I/O的序列号
      
  45.     BOOL  bIsReleased;  
  46.   
  47.   
  48.     int nOperation;         // 操作类型
      
  49. #define OP_ACCEPT   1
      
  50. #define OP_WRITE    2
      
  51. #define OP_READ     3
      
  52.   
  53.   
  54.     CIOCPBuffer *pNext;  
  55. };  
  56. struct CIOCPNextToSend;  
  57. struct CIOCPTimerData;  
  58. // 这是per-Handle数据。它包含了一个套节字的信息
      
  59. struct CIOCPContext  
  60. {  
  61.     CIOCPContext()  
  62.     {  
  63.         s = INVALID_SOCKET;  
  64.         memset(&addrLocal,0,sizeof(SOCKADDR_IN));  
  65.         memset(&addrRemote,0,sizeof(SOCKADDR_IN));  
  66.         bClosing = FALSE;  
  67.         nOutstandingRecv = 0;  
  68.         nOutstandingSend = 0;  
  69.         nReadSequence = 0;  
  70.         nCurrentReadSequence = 0;  
  71.         nCurrentStep = 0;  
  72.         pOutOfOrderReads = NULL;  
  73.         pNextToSend = NULL;  
  74.         bIsReleased = FALSE;  
  75.         pNext = NULL;  
  76.         pPreData = NULL;  
  77.         strcpy(szClientName,"");  
  78.         hTimer = NULL;  
  79.         hCompletion = NULL;  
  80.     }  
  81.     CIOCPBuffer m_pBuffer;  
  82.     SOCKET s;                       // 套节字句柄
      
  83.   
  84.   
  85.     SOCKADDR_IN addrLocal;          // 连接的本地地址
      
  86.     SOCKADDR_IN addrRemote;         // 连接的远程地址
      
  87.   
  88.   
  89.     BOOL bClosing;                  // 套节字是否关闭
      
  90.   
  91.   
  92.     int nOutstandingRecv;           // 此套节字上抛出的重叠操作的数量
      
  93.     int nOutstandingSend;             
  94.   
  95.   
  96.   
  97.   
  98.     ULONG nReadSequence;            // 安排给接收的下一个序列号
      
  99.     ULONG nCurrentReadSequence;     // 当前要读的序列号
      
  100.   
  101.     CIOCPBuffer *pOutOfOrderReads;  // 记录没有按顺序完成的读I/O
      
  102.     CIOCPNextToSend *pNextToSend;       //xss,按顺序发送的下一个要发送的。
      
  103.   
  104.   
  105.     LPVOID pPreData; //xss,用于2个过程之间的数据交流。
      
  106.     ULONG  nCurrentStep;//xss,用于记录当前处于的过程步骤数。
      
  107.     BOOL   bIsReleased;  
  108.   
  109.   
  110.     CRITICAL_SECTION Lock;          // 保护这个结构
      
  111.   
  112.   
  113.     CIOCPContext *pNext;  
  114.   
  115.   
  116.     char szClientName[256];//xss
      
  117.     HANDLE hTimer;//xss
      
  118.     HANDLE hCompletion;//xss
      
  119.   
  120.   
  121. };  
  122.   
  123.   
  124. struct CIOCPNextToSend//xss
      
  125. {  
  126.     CIOCPBuffer * pBuffer;  
  127.     CIOCPNextToSend * pNext;  
  128. };  
  129.   
  130.   
  131. struct CIOCPTimerData  
  132. {  
  133.     CIOCPContext* pContext;  
  134.     HANDLE hCompletion;  
  135. };  
  136.   
  137.   
  138. class CIOCPServer   // 处理线程
      
  139. {  
  140. public:  
  141.     CIOCPServer();  
  142.     ~CIOCPServer();  
  143.   
  144.   
  145.     // 开始服务
      
  146.     BOOL Start(int nPort = 3456, int nMaxConnections = 2000,   
  147.             int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);  
  148.     // 停止服务
      
  149.     void Shutdown();  
  150.   
  151.   
  152.     // 关闭一个连接和关闭所有连接
      
  153.     void CloseAConnection(CIOCPContext *pContext);  
  154.     void CloseAllConnections();   
  155.   
  156.   
  157.     // 取得当前的连接数量
      
  158.     ULONG GetCurrentConnection() { return m_nCurrentConnection; }  
  159.   
  160.   
  161.     // 向指定客户发送文本
      
  162.     BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);   
  163.   
  164.   
  165. protected:  
  166.   
  167.   
  168.     // 申请和释放缓冲区对象
      
  169.     CIOCPBuffer *AllocateBuffer(int nLen);  
  170.     void ReleaseBuffer(CIOCPBuffer *pBuffer);  
  171.   
  172.   
  173.     // 申请和释放套节字上下文
      
  174.     CIOCPContext *AllocateContext(SOCKET s);  
  175.     void ReleaseContext(CIOCPContext *pContext);  
  176.   
  177.   
  178.     // 释放空闲缓冲区对象列表和空闲上下文对象列表
      
  179.     void FreeBuffers();  
  180.     void FreeContexts();  
  181.   
  182.   
  183.     // 向连接列表中添加一个连接
      
  184.     BOOL AddAConnection(CIOCPContext *pContext);  
  185.   
  186.   
  187.     // 插入和移除未决的接受请求
      
  188.     BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);  
  189.     BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);  
  190.   
  191.   
  192.     //xss,把要发送的数据加入队列,按顺序发送
      
  193.     BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  194.     //xss,发送下一个需要发送的
      
  195.     BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  196.     // 取得下一个要读取的
      
  197.     CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  198.   
  199.   
  200.   
  201.   
  202.     void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
      
  203.     // 投递接受I/O、发送I/O、接收I/O
      
  204.     BOOL PostAccept(CIOCPBuffer *pBuffer);  
  205.     BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  206.     BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  207.     BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  208.   
  209.   
  210.     void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);  
  211.   
  212.   
  213.   
  214.   
  215.         // 事件通知函数
      
  216.     // 建立了一个新的连接
      
  217.     virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  218.     // 一个连接关闭
      
  219.     virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  220.     // 在一个连接上发生了错误
      
  221.     virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);  
  222.     // 一个连接上的读操作完成
      
  223.     virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  224.     // 一个连接上的写操作完成
      
  225.     virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);  
  226.   
  227.   
  228. protected:  
  229.   
  230.   
  231.     // 记录空闲结构信息
      
  232.     CIOCPBuffer *m_pFreeBufferList;  
  233.     CIOCPContext *m_pFreeContextList;  
  234.     int m_nFreeBufferCount;  
  235.     int m_nFreeContextCount;      
  236.     CRITICAL_SECTION m_FreeBufferListLock;  
  237.     CRITICAL_SECTION m_FreeContextListLock;  
  238.   
  239.   
  240.     CRITICAL_SECTION m_HeapLock;  
  241.     CRITICAL_SECTION m_RepostLock;  
  242.   
  243.   
  244.     // 记录抛出的Accept请求
      
  245.     CIOCPBuffer *m_pPendingAccepts;   // 抛出请求列表。
      
  246.     long m_nPendingAcceptCount;  
  247.     CRITICAL_SECTION m_PendingAcceptsLock;  
  248.   
  249.   
  250.     // 记录连接列表
      
  251.     CIOCPContext *m_pConnectionList;  
  252.     int m_nCurrentConnection;  
  253.     CRITICAL_SECTION m_ConnectionListLock;  
  254.   
  255.   
  256.     // 用于投递Accept请求
      
  257.     HANDLE m_hAcceptEvent;  
  258.     HANDLE m_hRepostEvent;  
  259.     LONG m_nRepostCount;  
  260.   
  261.   
  262.     int m_nPort;                // 服务器监听的端口
      
  263.   
  264.   
  265.     int m_nInitialAccepts;  
  266.     int m_nInitialReads;  
  267.     int m_nMaxAccepts;  
  268.     int m_nMaxSends;  
  269.     int m_nMaxFreeBuffers;  
  270.     int m_nMaxFreeContexts;  
  271.     int m_nMaxConnections;  
  272.   
  273.   
  274.     HANDLE m_hListenThread;         // 监听线程
      
  275.     HANDLE m_hCompletion;           // 完成端口句柄
      
  276.     SOCKET m_sListen;               // 监听套节字句柄
      
  277.     LPFN_ACCEPTEX m_lpfnAcceptEx;   // AcceptEx函数地址
      
  278.     LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址
      
  279.   
  280.   
  281.     BOOL m_bShutDown;       // 用于通知监听线程退出
      
  282.     BOOL m_bServerStarted;  // 记录服务是否启动
      
  283.   
  284.   
  285.     HANDLE m_hTimerQueue;//xss 
      
  286.   
  287.   
  288. private:    // 线程函数
      
  289.     static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);  
  290.     static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);  
  291. };  
  292.   
  293.   
  294.   
  295.   
  296. #endif // __IOCP_H__  

cpp文件

  1. //////////////////////////////////////////////////
      
  2. // IOCP.cpp文件
      
  3. #define _WIN32_WINNT 0x0500 //xss
      
  4.   
  5.   
  6. #include "iocp.h"
      
  7. #pragma comment(lib, "WS2_32.lib")
      
  8.   
  9.   
  10. #include <stdio.h>
      
  11. #include "httpFun.h"
      
  12.   
  13.   
  14. static int iBufferCount = 0;  
  15. static int iContextCount = 0;  
  16. CIOCPServer::CIOCPServer()  
  17. {  
  18.     // 列表
      
  19.     m_pFreeBufferList = NULL;  
  20.     m_pFreeContextList = NULL;    
  21.     m_pPendingAccepts = NULL;  
  22.     m_pConnectionList = NULL;  
  23.   
  24.   
  25.     m_nFreeBufferCount = 0;  
  26.     m_nFreeContextCount = 0;  
  27.     m_nPendingAcceptCount = 0;  
  28.     m_nCurrentConnection = 0;  
  29.   
  30.   
  31.     ::InitializeCriticalSection(&m_FreeBufferListLock);  
  32.     ::InitializeCriticalSection(&m_FreeContextListLock);  
  33.     ::InitializeCriticalSection(&m_PendingAcceptsLock);  
  34.     ::InitializeCriticalSection(&m_ConnectionListLock);  
  35.   
  36.   
  37.     ::InitializeCriticalSection(&m_HeapLock);  
  38.     ::InitializeCriticalSection(&m_RepostLock);  
  39.   
  40.   
  41.     // Accept请求
      
  42.     m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);  
  43.     m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);  
  44.     m_nRepostCount = 0;  
  45.   
  46.   
  47.     m_nPort = 8888;  
  48.   
  49.   
  50.     m_nInitialAccepts = 10;  
  51.     m_nInitialReads = 4;  
  52.     m_nMaxAccepts = 100;  
  53.     m_nMaxSends = 20;  
  54.     m_nMaxFreeBuffers = 200;  
  55.     m_nMaxFreeContexts = 100;  
  56.     m_nMaxConnections = 2000;  
  57.   
  58.   
  59.     m_hListenThread = NULL;  
  60.     m_hCompletion = NULL;  
  61.     m_sListen = INVALID_SOCKET;  
  62.     m_lpfnAcceptEx = NULL;  
  63.     m_lpfnGetAcceptExSockaddrs = NULL;  
  64.   
  65.   
  66.     m_bShutDown = FALSE;  
  67.     m_bServerStarted = FALSE;  
  68.   
  69.   
  70.     m_hTimerQueue = ::CreateTimerQueue();  
  71.   
  72.   
  73.     // 初始化WS2_32.dll
      
  74.     WSADATA wsaData;  
  75.     WORD sockVersion = MAKEWORD(2, 2);  
  76.     ::WSAStartup(sockVersion, &wsaData);  
  77. }  
  78.   
  79.   
  80. CIOCPServer::~CIOCPServer()  
  81. {  
  82.     Shutdown();  
  83.   
  84.   
  85.     if(m_sListen != INVALID_SOCKET)  
  86.         ::closesocket(m_sListen);  
  87.     if(m_hListenThread != NULL)  
  88.         ::CloseHandle(m_hListenThread);  
  89.   
  90.   
  91.     ::CloseHandle(m_hRepostEvent);  
  92.     ::CloseHandle(m_hAcceptEvent);  
  93.   
  94.   
  95.     ::DeleteCriticalSection(&m_FreeBufferListLock);  
  96.     ::DeleteCriticalSection(&m_FreeContextListLock);  
  97.     ::DeleteCriticalSection(&m_PendingAcceptsLock);  
  98.     ::DeleteCriticalSection(&m_ConnectionListLock);  
  99.   
  100.   
  101.     ::DeleteCriticalSection(&m_HeapLock);  
  102.     ::DeleteCriticalSection(&m_RepostLock);  
  103.   
  104.   
  105.     ::DeleteTimerQueue(m_hTimerQueue);//xss
      
  106.     ::WSACleanup();   
  107. }  
  108.   
  109.   
  110. ///////////////////////////////////////
      
  111. static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired)  
  112. {  
  113.     CIOCPContext* pContext = (CIOCPContext*)lpParam;  
  114.     if(pContext != NULL && pContext->bClosing == FALSE)  
  115.     {  
  116.         EnterCriticalSection(&pContext->Lock);  
  117.         if(pContext->hCompletion != NULL)  
  118.         {  
  119.             PostQueuedCompletionStatus(pContext->hCompletion,-2,(ULONG_PTR)pContext,NULL);  
  120.         }  
  121.         LeaveCriticalSection(&pContext->Lock);  
  122.     }  
  123. }  
  124.   
  125.   
  126.   
  127.   
  128.   
  129.   
  130. ///////////////////////////////////
      
  131. // 自定义帮助函数
      
  132.   
  133.   
  134. CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)  
  135. {  
  136.     CIOCPBuffer *pBuffer = NULL;  
  137.     if(nLen > BUFFER_SIZE)  
  138.         return NULL;  
  139.   
  140.   
  141.     // 为缓冲区对象申请内存
      
  142.     ::EnterCriticalSection(&m_FreeBufferListLock);  
  143.     if(m_pFreeBufferList == NULL)  // 内存池为空,申请新的内存
      
  144.     {  
  145. //      pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(), 
      
  146. //                      HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);
      
  147.         pBuffer = new CIOCPBuffer();  
  148.     }  
  149.     else    // 从内存池中取一块来使用
      
  150.     {  
  151.         pBuffer = m_pFreeBufferList;  
  152.         m_pFreeBufferList = m_pFreeBufferList->pNext;      
  153.         pBuffer->pNext = NULL;  
  154.         m_nFreeBufferCount --;  
  155.     }  
  156.     ::LeaveCriticalSection(&m_FreeBufferListLock);  
  157.   
  158.   
  159.     EnterCriticalSection(&m_HeapLock);  
  160.     iBufferCount++;  
  161.     LeaveCriticalSection(&m_HeapLock);  
  162.   
  163.   
  164.     // 初始化新的缓冲区对象
      
  165.     if(pBuffer != NULL)  
  166.     {  
  167.         //pBuffer->buff = (char*)(pBuffer + sizeof(CIOCPBuffer)/*1*/);//xss,个人以为应该+sizeof(CIOCPBuffer);
      
  168.         pBuffer->nLen = nLen;  
  169.         pBuffer->bIsReleased = FALSE;  
  170.     }  
  171.     return pBuffer;  
  172. }  
  173.   
  174.   
  175. void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)  
  176. {  
  177.     if(pBuffer == NULL || pBuffer->bIsReleased)  
  178.         return;  
  179.   
  180.   
  181.     ::EnterCriticalSection(&m_FreeBufferListLock);  
  182.   
  183.   
  184.     if(m_nFreeBufferCount <= m_nMaxFreeBuffers)  // 将要释放的内存添加到空闲列表中
      
  185.     {  
  186.         memset(pBuffer, 0, sizeof(CIOCPBuffer) /*+ BUFFER_SIZE*/);  
  187.         pBuffer->pNext = m_pFreeBufferList;  
  188.         m_pFreeBufferList = pBuffer;  
  189.   
  190.   
  191.         m_nFreeBufferCount ++ ;  
  192.   
  193.   
  194.         pBuffer->bIsReleased = TRUE;  
  195.     }  
  196.     else            // 已经达到最大值,真正的释放内存
      
  197.     {  
  198.         //::HeapFree(::GetProcessHeap(), 0, pBuffer);
      
  199.         delete pBuffer;  
  200.     }  
  201.   
  202.   
  203.     ::LeaveCriticalSection(&m_FreeBufferListLock);  
  204.   
  205.   
  206.     EnterCriticalSection(&m_HeapLock);  
  207.     iBufferCount--;  
  208.     LeaveCriticalSection(&m_HeapLock);  
  209. }  
  210.   
  211.   
  212.   
  213.   
  214. CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)  
  215. {  
  216.     CIOCPContext *pContext;  
  217.   
  218.   
  219.     // 申请一个CIOCPContext对象
      
  220.     ::EnterCriticalSection(&m_FreeContextListLock);  
  221.   
  222.   
  223.     if(m_pFreeContextList == NULL)  
  224.     {  
  225.         //pContext = (CIOCPContext *)::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext));
      
  226.         pContext = new CIOCPContext();  
  227.   
  228.   
  229.         ::InitializeCriticalSection(&pContext->Lock);  
  230.     }  
  231.     else      
  232.     {  
  233.         // 在空闲列表中申请
      
  234.         pContext = m_pFreeContextList;  
  235.         m_pFreeContextList = m_pFreeContextList->pNext;  
  236.         pContext->pNext = NULL;  
  237.         m_nFreeBufferCount --;  
  238.     }  
  239.     ::LeaveCriticalSection(&m_FreeContextListLock);  
  240.   
  241.   
  242.     EnterCriticalSection(&m_HeapLock);  
  243.     iContextCount++;  
  244.     LeaveCriticalSection(&m_HeapLock);  
  245.   
  246.   
  247.     // 初始化对象成员
      
  248.     if(pContext != NULL)  
  249.     {  
  250.         pContext->s = s;  
  251.         pContext->bIsReleased = FALSE;  
  252.     }  
  253.     return pContext;  
  254. }  
  255.   
  256.   
  257. void CIOCPServer::ReleaseContext(CIOCPContext *pContext)  
  258. {  
  259.     if(pContext == NULL || pContext->bIsReleased)  
  260.         return;  
  261.   
  262.   
  263.     printf("\n%s释放了Context\n\n",pContext->szClientName);  
  264.     if(pContext->s != INVALID_SOCKET)  
  265.         ::closesocket(pContext->s);  
  266.   
  267.   
  268.     // 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区
      
  269.     CIOCPBuffer *pNext;  
  270.     while(pContext->pOutOfOrderReads != NULL)  
  271.     {  
  272.         pNext = pContext->pOutOfOrderReads->pNext;  
  273.         ReleaseBuffer(pContext->pOutOfOrderReads);  
  274.         pContext->pOutOfOrderReads = pNext;  
  275.     }  
  276.   
  277.   
  278.     //xss,再释放(如果有的话)此套接字上未完成的写I/O缓冲区
      
  279.     CIOCPNextToSend* pSend = NULL;  
  280.     while(pContext->pNextToSend != NULL)  
  281.     {  
  282.         pSend = pContext->pNextToSend->pNext;  
  283.         if(pContext->pNextToSend->pBuffer != NULL && pContext->pNextToSend->pBuffer->bIsReleased == FALSE)  
  284.         {  
  285.             ReleaseBuffer(pContext->pNextToSend->pBuffer);  
  286.         }  
  287.         delete pContext->pNextToSend;  
  288.         pContext->pNextToSend = pSend;  
  289.     }  
  290.   
  291.   
  292.     if(pContext->hTimer != NULL)  
  293.     {  
  294.         DeleteTimerQueueTimer(m_hTimerQueue,pContext->hTimer,NULL);  
  295.         pContext->hTimer = NULL;   
  296.     }  
  297.   
  298.   
  299.     ::EnterCriticalSection(&m_FreeContextListLock);  
  300.       
  301.     if(m_nFreeContextCount <= m_nMaxFreeContexts) // 添加到空闲列表
      
  302.     {  
  303.         // 先将关键代码段变量保存到一个临时变量中
      
  304.         CRITICAL_SECTION cstmp = pContext->Lock;  
  305.         // 将要释放的上下文对象初始化为0
      
  306.         memset(pContext, 0, sizeof(CIOCPContext));  
  307.   
  308.   
  309.         // 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头
      
  310.         pContext->Lock = cstmp;  
  311.         pContext->pNext = m_pFreeContextList;  
  312.         m_pFreeContextList = pContext;  
  313.           
  314.         // 更新计数
      
  315.         m_nFreeContextCount ++;  
  316.   
  317.   
  318.         pContext->bIsReleased = TRUE;  
  319.     }  
  320.     else  
  321.     {  
  322.         ::DeleteCriticalSection(&pContext->Lock);  
  323.         //::HeapFree(::GetProcessHeap(), 0, pContext);
      
  324.         delete pContext;  
  325.     }  
  326.     ::LeaveCriticalSection(&m_FreeContextListLock);  
  327.   
  328.   
  329.       
  330.     EnterCriticalSection(&m_HeapLock);  
  331.     iContextCount--;  
  332.     LeaveCriticalSection(&m_HeapLock);    
  333. }  
  334.   
  335.   
  336. void CIOCPServer::FreeBuffers()  
  337. {  
  338.     // 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
      
  339.     ::EnterCriticalSection(&m_FreeBufferListLock);  
  340.   
  341.   
  342.     CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;  
  343.     CIOCPBuffer *pNextBuffer;  
  344.     while(pFreeBuffer != NULL)  
  345.     {  
  346.         pNextBuffer = pFreeBuffer->pNext;  
  347.   
  348.   
  349.         delete pFreeBuffer;  
  350. //      if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))
      
  351. //      {
      
  352. // #ifdef _DEBUG
      
  353. //          ::OutputDebugString("  FreeBuffers释放内存出错!");
      
  354. // #endif // _DEBUG
      
  355. //          break;
      
  356. //      }
      
  357.         pFreeBuffer = pNextBuffer;  
  358.     }  
  359.     m_pFreeBufferList = NULL;  
  360.     m_nFreeBufferCount = 0;  
  361.   
  362.   
  363.     ::LeaveCriticalSection(&m_FreeBufferListLock);  
  364. }  
  365.   
  366.   
  367. void CIOCPServer::FreeContexts()  
  368. {  
  369.     // 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
      
  370.     ::EnterCriticalSection(&m_FreeContextListLock);  
  371.       
  372.     CIOCPContext *pFreeContext = m_pFreeContextList;  
  373.     CIOCPContext *pNextContext;  
  374.     while(pFreeContext != NULL)  
  375.     {  
  376.         pNextContext = pFreeContext->pNext;  
  377.           
  378.         ::DeleteCriticalSection(&pFreeContext->Lock);  
  379.         delete pFreeContext;  
  380. //      if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
      
  381. //      {
      
  382. // #ifdef _DEBUG
      
  383. //          ::OutputDebugString("  FreeBuffers释放内存出错!");
      
  384. // #endif // _DEBUG
      
  385. //          break;
      
  386. //      }
      
  387.         pFreeContext = pNextContext;  
  388.     }  
  389.     m_pFreeContextList = NULL;  
  390.     m_nFreeContextCount = 0;  
  391.   
  392.   
  393.     ::LeaveCriticalSection(&m_FreeContextListLock);  
  394. }  
  395.   
  396.   
  397.   
  398.   
  399. BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)  
  400. {  
  401.     // 向客户连接列表添加一个CIOCPContext对象
      
  402.   
  403.   
  404.     ::EnterCriticalSection(&m_ConnectionListLock);  
  405.     if(m_nCurrentConnection <= m_nMaxConnections)  
  406.     {  
  407.         // 添加到表头
      
  408.         pContext->pNext = m_pConnectionList;  
  409.         m_pConnectionList = pContext;  
  410.         // 更新计数
      
  411.         m_nCurrentConnection ++;  
  412.   
  413.   
  414.         ::LeaveCriticalSection(&m_ConnectionListLock);  
  415.         return TRUE;  
  416.     }  
  417.     ::LeaveCriticalSection(&m_ConnectionListLock);  
  418.   
  419.   
  420.     return FALSE;  
  421. }  
  422.   
  423.   
  424. void CIOCPServer::CloseAConnection(CIOCPContext *pContext)  
  425. {  
  426.     if(pContext == NULL || pContext->bClosing == TRUE)  
  427.         return;  
  428.   
  429.   
  430.     // 首先从列表中移除要关闭的连接
      
  431.     ::EnterCriticalSection(&m_ConnectionListLock);  
  432.   
  433.   
  434.     CIOCPContext* pTest = m_pConnectionList;  
  435.     if(pTest == pContext)  
  436.     {  
  437.         m_pConnectionList =  pContext->pNext;  
  438.         m_nCurrentConnection --;  
  439.     }  
  440.     else  
  441.     {  
  442.         while(pTest != NULL && pTest->pNext !=  pContext)  
  443.             pTest = pTest->pNext;  
  444.         if(pTest != NULL)  
  445.         {  
  446.             pTest->pNext =  pContext->pNext;  
  447.             m_nCurrentConnection --;  
  448.         }  
  449.     }  
  450.       
  451.     ::LeaveCriticalSection(&m_ConnectionListLock);  
  452.   
  453.   
  454.     // 然后关闭客户套节字
      
  455.     ::EnterCriticalSection(&pContext->Lock);  
  456.   
  457.   
  458.     if(pContext->s != INVALID_SOCKET)  
  459.     {  
  460.         ::closesocket(pContext->s);    
  461.         pContext->s = INVALID_SOCKET;  
  462.     }  
  463.     pContext->bClosing = TRUE;  
  464.   
  465.   
  466.     ::LeaveCriticalSection(&pContext->Lock);  
  467. }  
  468.   
  469.   
  470. void CIOCPServer::CloseAllConnections()  
  471. {  
  472.     // 遍历整个连接列表,关闭所有的客户套节字
      
  473.   
  474.   
  475.     ::EnterCriticalSection(&m_ConnectionListLock);  
  476.   
  477.   
  478.     CIOCPContext *pContext = m_pConnectionList;  
  479.     while(pContext != NULL)  
  480.     {     
  481.         ::EnterCriticalSection(&pContext->Lock);  
  482.   
  483.   
  484.         if(pContext->s != INVALID_SOCKET)  
  485.         {  
  486.             ::closesocket(pContext->s);  
  487.             pContext->s = INVALID_SOCKET;  
  488.         }  
  489.   
  490.   
  491.         pContext->bClosing = TRUE;  
  492.   
  493.   
  494.         ::LeaveCriticalSection(&pContext->Lock);   
  495.           
  496.         pContext = pContext->pNext;  
  497.     }  
  498.   
  499.   
  500.     m_pConnectionList = NULL;  
  501.     m_nCurrentConnection = 0;  
  502.   
  503.   
  504.     ::LeaveCriticalSection(&m_ConnectionListLock);  
  505. }  
  506.   
  507.   
  508.   
  509.   
  510. BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)  
  511. {  
  512.     // 将一个I/O缓冲区对象插入到m_pPendingAccepts表中
      
  513.   
  514.   
  515.     ::EnterCriticalSection(&m_PendingAcceptsLock);  
  516.   
  517.   
  518.     if(m_pPendingAccepts == NULL)  
  519.         m_pPendingAccepts = pBuffer;  
  520.     else  
  521.     {  
  522.         pBuffer->pNext = m_pPendingAccepts;  
  523.         m_pPendingAccepts = pBuffer;  
  524.     }  
  525.     m_nPendingAcceptCount ++;  
  526.   
  527.   
  528.     ::LeaveCriticalSection(&m_PendingAcceptsLock);  
  529.   
  530.   
  531.     return TRUE;  
  532. }  
  533.   
  534.   
  535. BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)  
  536. {  
  537.     BOOL bResult = FALSE;  
  538.   
  539.   
  540.     // 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象
      
  541.     ::EnterCriticalSection(&m_PendingAcceptsLock);  
  542.   
  543.   
  544.     CIOCPBuffer *pTest = m_pPendingAccepts;  
  545.     if(pTest == pBuffer)    // 如果是表头元素
      
  546.     {  
  547.         m_pPendingAccepts = pBuffer->pNext;  
  548.         bResult = TRUE;  
  549.     }  
  550.     else                    // 不是表头元素的话,就要遍历这个表来查找了
      
  551.     {  
  552.         while(pTest != NULL && pTest->pNext != pBuffer)  
  553.             pTest = pTest->pNext;  
  554.         if(pTest != NULL)  
  555.         {  
  556.             pTest->pNext = pBuffer->pNext;  
  557.              bResult = TRUE;  
  558.         }  
  559.     }  
  560.     // 更新计数
      
  561.     if(bResult)  
  562.         m_nPendingAcceptCount --;  
  563.   
  564.   
  565.     ::LeaveCriticalSection(&m_PendingAcceptsLock);  
  566.   
  567.   
  568.     return  bResult;  
  569. }  
  570.   
  571.   
  572. void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  573. {  
  574.     CloseAConnection(pContext);  
  575. }  
  576.   
  577.   
  578. BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
      
  579. {  
  580.     ::EnterCriticalSection(&pContext->Lock);  
  581.     CIOCPNextToSend *ptr = pContext->pNextToSend;  
  582.   
  583.   
  584.     CIOCPNextToSend * pSend = new CIOCPNextToSend();  
  585.     pSend->pBuffer = pBuffer;  
  586.     pSend->pNext = NULL;  
  587.     if(ptr == NULL)  
  588.     {  
  589.         printf("数据:%10.10s ...,被直接发送。\n",pBuffer->buff);  
  590.         //::EnterCriticalSection(&pContext->Lock);
      
  591.         pContext->pNextToSend = pSend;  
  592.         //::LeaveCriticalSection(&pContext->Lock);
      
  593.         if(!PostSend(pContext,pBuffer))//如果没有需要等待的send就直接发送
      
  594.         {  
  595.             ::LeaveCriticalSection(&pContext->Lock);  
  596.             return FALSE;  
  597.         }  
  598.     }  
  599.     else  
  600.     {  
  601.         printf("数据:%10.10s ...,被放入链表结尾。\n",pBuffer->buff);  
  602.         while(ptr->pNext != NULL)  
  603.         {  
  604.             ptr = ptr->pNext;  
  605.         }  
  606.         ptr->pNext = pSend;//新的发送请求放在链表结尾
      
  607.     }  
  608.     ::LeaveCriticalSection(&pContext->Lock);  
  609.     return TRUE;  
  610. }  
  611.   
  612.   
  613. BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
      
  614. {  
  615.     ::EnterCriticalSection(&pContext->Lock);  
  616.     CIOCPNextToSend* pSend = pContext->pNextToSend;  
  617.     CIOCPNextToSend* pNextSend = NULL;  
  618.     if(pSend != NULL && pSend->pNext != NULL)//发送成功的pBuffer是队列的第一个,发送下一个,pNextToSend指向下一个,pBuffer由外面释放。
      
  619.     {  
  620.         pNextSend = pSend->pNext;  
  621.         if(pNextSend->pBuffer != NULL)  
  622.         {  
  623.             printf("数据:%10.10s ...从链表中弹出被发送。\n",pNextSend->pBuffer->buff);  
  624.             if(!PostSend(pContext,pNextSend->pBuffer))  
  625.             {  
  626.                 delete pSend;  
  627.                 pContext->pNextToSend = pNextSend;  
  628.                 ::LeaveCriticalSection(&pContext->Lock);  
  629.                 return FALSE;  
  630.             }  
  631.         }  
  632.     }  
  633.     if(pSend != NULL)  
  634.     {  
  635.         pNextSend = pSend->pNext;  
  636.         delete pSend;  
  637.         pContext->pNextToSend = pNextSend;  
  638.     }  
  639.     ::LeaveCriticalSection(&pContext->Lock);  
  640.     return TRUE;  
  641. }  
  642.   
  643.   
  644. CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  645. {  
  646.     if(pBuffer != NULL)  
  647.     {  
  648.         // 如果与要读的下一个序列号相等,则读这块缓冲区
      
  649.         if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)  
  650.         {  
  651.             return pBuffer;  
  652.         }  
  653.           
  654.         // 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中
      
  655.   
  656.   
  657.         // 列表中的缓冲区是按照其序列号从小到大的顺序排列的
      
  658.   
  659.   
  660.         pBuffer->pNext = NULL;  
  661.           
  662.         CIOCPBuffer *ptr = pContext->pOutOfOrderReads;  
  663.         CIOCPBuffer *pPre = NULL;  
  664.         while(ptr != NULL)  
  665.         {  
  666.             if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)  
  667.                 break;  
  668.               
  669.             pPre = ptr;  
  670.             ptr = ptr->pNext;  
  671.         }  
  672.           
  673.         if(pPre == NULL) // 应该插入到表头
      
  674.         {  
  675.             pBuffer->pNext = pContext->pOutOfOrderReads;  
  676.             pContext->pOutOfOrderReads = pBuffer;  
  677.         }  
  678.         else            // 应该插入到表的中间
      
  679.         {  
  680.             pBuffer->pNext = pPre->pNext;  
  681.             pPre->pNext = pBuffer/*->pNext*/;//xss,个人觉得应该是pPre->pNext = pBuffer;
      
  682.         }  
  683.     }  
  684.   
  685.   
  686.     // 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户
      
  687.     CIOCPBuffer *ptr = pContext->pOutOfOrderReads;  
  688.     if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))  
  689.     {  
  690.         pContext->pOutOfOrderReads = ptr->pNext;  
  691.         return ptr;  
  692.     }  
  693.     return NULL;  
  694. }  
  695.   
  696.   
  697.   
  698.   
  699. BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)  // 在监听套节字上投递Accept请求
      
  700. {  
  701.         // 设置I/O类型
      
  702.         pBuffer->nOperation = OP_ACCEPT;  
  703.   
  704.   
  705.         // 投递此重叠I/O  
      
  706.         DWORD dwBytes;  
  707.         pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);  
  708.         BOOL b = m_lpfnAcceptEx(m_sListen,   
  709.             pBuffer->sClient,  
  710.             pBuffer->buff,   
  711.             pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),//xss,第一次都是收一个cmd_header
      
  712.             sizeof(sockaddr_in) + 16,   
  713.             sizeof(sockaddr_in) + 16,   
  714.             &dwBytes,   
  715.             &pBuffer->ol);  
  716.         if(!b && ::WSAGetLastError() != WSA_IO_PENDING)  
  717.         {  
  718.             return FALSE;  
  719.         }  
  720.         if(pBuffer->nOperation == 0)  
  721.         {  
  722.             int x = 0;  
  723.         }  
  724.         return TRUE;  
  725. };  
  726.   
  727.   
  728. BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  729. {  
  730.     // 设置I/O类型
      
  731.     pBuffer->nOperation = OP_READ;     
  732.       
  733.     ::EnterCriticalSection(&pContext->Lock);  
  734.   
  735.   
  736.     // 设置序列号
      
  737.     pBuffer->nSequenceNumber = pContext->nReadSequence;  
  738.   
  739.   
  740.     // 投递此重叠I/O
      
  741.     DWORD dwBytes;  
  742.     DWORD dwFlags = 0;  
  743.     WSABUF buf;  
  744.     buf.buf = pBuffer->buff;  
  745.     buf.len = pBuffer->nLen;  
  746.     if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)  
  747.     {  
  748.         if(::WSAGetLastError() != WSA_IO_PENDING)  
  749.         {  
  750.             printf("WSARecv出错:%d\n",WSAGetLastError());  
  751.             ::LeaveCriticalSection(&pContext->Lock);  
  752.             return FALSE;  
  753.         }  
  754.     }  
  755.   
  756.   
  757.     // 增加套节字上的重叠I/O计数和读序列号计数
      
  758.   
  759.   
  760.     pContext->nOutstandingRecv ++;  
  761.     pContext->nReadSequence ++;  
  762.   
  763.   
  764.     ::LeaveCriticalSection(&pContext->Lock);  
  765.       
  766.     return TRUE;  
  767. }  
  768.   
  769.   
  770. BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  771. {     
  772.     // 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作
      
  773.     if(pContext->nOutstandingSend > m_nMaxSends)  
  774.         return FALSE;  
  775.   
  776.   
  777.     // 设置I/O类型,增加套节字上的重叠I/O计数
      
  778.     pBuffer->nOperation = OP_WRITE;  
  779.   
  780.   
  781.     // 投递此重叠I/O
      
  782.     DWORD dwBytes;  
  783.     DWORD dwFlags = 0;  
  784.     WSABUF buf;  
  785.     buf.buf = pBuffer->buff;  
  786.     buf.len = pBuffer->nLen;  
  787.     if(::WSASend(pContext->s,   
  788.             &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)  
  789.     {  
  790.         int x;  
  791.         if((x=::WSAGetLastError()) != WSA_IO_PENDING)  
  792.         {  
  793.             printf("发送失败!错误码:%d",x);  
  794.             return FALSE;  
  795.         }  
  796.     }     
  797.     // 增加套节字上的重叠I/O计数
      
  798.   
  799.   
  800.     ::EnterCriticalSection(&pContext->Lock);  
  801.     pContext->nOutstandingSend ++;  
  802.     ::LeaveCriticalSection(&pContext->Lock);  
  803.   
  804.   
  805.     if(pBuffer->nOperation == 0)  
  806.     {  
  807.         int x = 0;  
  808.     }  
  809.     return TRUE;  
  810. }  
  811.   
  812.   
  813.   
  814.   
  815. BOOL CIOCPServer::Start(int nPort, int nMaxConnections,   
  816.             int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)  
  817. {  
  818.     // 检查服务是否已经启动
      
  819.     if(m_bServerStarted)  
  820.         return FALSE;  
  821.   
  822.   
  823.     // 保存用户参数
      
  824.     m_nPort = nPort;  
  825.     m_nMaxConnections = nMaxConnections;  
  826.     m_nMaxFreeBuffers = nMaxFreeBuffers;  
  827.     m_nMaxFreeContexts = nMaxFreeContexts;  
  828.     m_nInitialReads = nInitialReads;  
  829.   
  830.   
  831.     // 初始化状态变量
      
  832.     m_bShutDown = FALSE;  
  833.     m_bServerStarted = TRUE;  
  834.   
  835.   
  836.   
  837.   
  838.     // 创建监听套节字,绑定到本地端口,进入监听模式
      
  839.     m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);  
  840.     SOCKADDR_IN si;  
  841.     si.sin_family = AF_INET;  
  842.     si.sin_port = ::ntohs(m_nPort);  
  843.     si.sin_addr.S_un.S_addr = INADDR_ANY;  
  844.     if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)  
  845.     {  
  846.         m_bServerStarted = FALSE;  
  847.         return FALSE;  
  848.     }  
  849.     ::listen(m_sListen, 200);  
  850.   
  851.   
  852.     // 创建完成端口对象
      
  853.     m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);  
  854.   
  855.   
  856.     // 加载扩展函数AcceptEx
      
  857.     GUID GuidAcceptEx = WSAID_ACCEPTEX;  
  858.     DWORD dwBytes;  
  859.     ::WSAIoctl(m_sListen,   
  860.         SIO_GET_EXTENSION_FUNCTION_POINTER,   
  861.         &GuidAcceptEx,   
  862.         sizeof(GuidAcceptEx),  
  863.         &m_lpfnAcceptEx,   
  864.         sizeof(m_lpfnAcceptEx),   
  865.         &dwBytes,   
  866.         NULL,   
  867.         NULL);  
  868.       
  869.     // 加载扩展函数GetAcceptExSockaddrs
      
  870.     GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;  
  871.     ::WSAIoctl(m_sListen,  
  872.         SIO_GET_EXTENSION_FUNCTION_POINTER,  
  873.         &GuidGetAcceptExSockaddrs,  
  874.         sizeof(GuidGetAcceptExSockaddrs),  
  875.         &m_lpfnGetAcceptExSockaddrs,  
  876.         sizeof(m_lpfnGetAcceptExSockaddrs),  
  877.         &dwBytes,  
  878.         NULL,  
  879.         NULL  
  880.         );  
  881.       
  882.       
  883.     // 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
      
  884.     ::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);  
  885.   
  886.   
  887.     // 注册FD_ACCEPT事件。
      
  888.     // 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
      
  889.     WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);  
  890.   
  891.   
  892.     // 创建监听线程
      
  893.     m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);  
  894.       
  895.     return TRUE;  
  896. }  
  897.   
  898.   
  899. void CIOCPServer::Shutdown()  
  900. {  
  901.     if(!m_bServerStarted)  
  902.         return;  
  903.   
  904.   
  905.     // 通知监听线程,马上停止服务
      
  906.     m_bShutDown = TRUE;  
  907.     ::SetEvent(m_hAcceptEvent);  
  908.     // 等待监听线程退出
      
  909.     ::WaitForSingleObject(m_hListenThread, INFINITE);  
  910.     ::CloseHandle(m_hListenThread);  
  911.     m_hListenThread = NULL;  
  912.   
  913.   
  914.     m_bServerStarted = FALSE;  
  915. }  
  916.   
  917.   
  918. DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)  
  919. {  
  920.     CIOCPServer *pThis = (CIOCPServer*)lpParam;  
  921.   
  922.   
  923.     // 先在监听套节字上投递几个Accept I/O
      
  924.     CIOCPBuffer *pBuffer;  
  925.     for(int i=0; i<pThis->m_nInitialAccepts; i++)  
  926.     {  
  927.         pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE
      
  928.         if(pBuffer == NULL)  
  929.             return -1;  
  930.         pThis->InsertPendingAccept(pBuffer);  
  931.         pThis->PostAccept(pBuffer);  
  932.     }  
  933.   
  934.   
  935.     // 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
      
  936.     HANDLE hWaitEvents[2 + MAX_THREAD];  
  937.     int nEventCount = 0;  
  938.     hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;  
  939.     hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;  
  940.   
  941.   
  942.     // 创建指定数量的工作线程在完成端口上处理I/O
      
  943.     for(int i=0; i<MAX_THREAD; i++)  
  944.     {  
  945.         hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);  
  946.     }  
  947.   
  948.   
  949.     // 下面进入无限循环,处理事件对象数组中的事件
      
  950.     while(TRUE)  
  951.     {  
  952.         int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);  
  953.       
  954.         // 首先检查是否要停止服务
      
  955.         if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)  
  956.         {  
  957.             // 关闭所有连接
      
  958.             pThis->CloseAllConnections();  
  959.             ::Sleep(0);     // 给I/O工作线程一个执行的机会
      
  960.             // 关闭监听套节字
      
  961.             ::closesocket(pThis->m_sListen);  
  962.             pThis->m_sListen = INVALID_SOCKET;  
  963.             ::Sleep(0);     // 给I/O工作线程一个执行的机会
      
  964.   
  965.   
  966.             // 通知所有I/O处理线程退出
      
  967.             for(int i=2; i<MAX_THREAD + 2; i++)  
  968.             {     
  969.                 ::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);  
  970.             }  
  971.   
  972.   
  973.             // 等待I/O处理线程退出
      
  974.             ::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);  
  975.   
  976.   
  977.             for(int i=2; i<MAX_THREAD + 2; i++)  
  978.             {     
  979.                 ::CloseHandle(hWaitEvents[i]);  
  980.             }  
  981.           
  982.             ::CloseHandle(pThis->m_hCompletion);  
  983.   
  984.   
  985.             pThis->FreeBuffers();  
  986.             pThis->FreeContexts();  
  987.             ::ExitThread(0);  
  988.         }     
  989.   
  990.   
  991.         // 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间
      
  992.         if(nIndex == WSA_WAIT_TIMEOUT)  
  993.         {  
  994.             pBuffer = pThis->m_pPendingAccepts;  
  995.             while(pBuffer != NULL)  
  996.             {  
  997.                 int nSeconds;  
  998.                 int nLen = sizeof(nSeconds);  
  999.                 // 取得连接建立的时间
      
  1000.                 ::getsockopt(pBuffer->sClient,   
  1001.                     SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);   
  1002.                 // 如果超过2分钟客户还不发送初始数据,就让这个客户go away
      
  1003.                 if(nSeconds != -1 && nSeconds > /*2*60*/50)  
  1004.                 {     
  1005.                     closesocket(pBuffer->sClient);  
  1006.                     pBuffer->sClient = INVALID_SOCKET;  
  1007.                 }  
  1008.   
  1009.   
  1010.                 pBuffer = pBuffer->pNext;  
  1011.             }  
  1012.         }  
  1013.         else  
  1014.         {  
  1015.             nIndex = nIndex - WAIT_OBJECT_0;  
  1016.             WSANETWORKEVENTS ne;  
  1017.             int nLimit=0;  
  1018.             if(nIndex == 0)         // 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
      
  1019.             {  
  1020.                 ::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);  
  1021.                 if(ne.lNetworkEvents & FD_ACCEPT)  
  1022.                 {  
  1023.                     nLimit = 50;  // 增加的个数,这里设为50个
      
  1024.                 }  
  1025.             }  
  1026.             else if(nIndex == 1)    // 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
      
  1027.             {  
  1028.                 nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);  
  1029.             }  
  1030.             else if(nIndex > 1)      // I/O服务线程退出,说明有错误发生,关闭服务器
      
  1031.             {  
  1032.                 pThis->m_bShutDown = TRUE;  
  1033.                 continue;  
  1034.             }  
  1035.   
  1036.   
  1037.             // 投递nLimit个AcceptEx I/O请求
      
  1038.             int i = 0;  
  1039.             while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)  
  1040.             {  
  1041.                 pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);  
  1042.                 if(pBuffer != NULL)  
  1043.                 {  
  1044.                     pThis->InsertPendingAccept(pBuffer);  
  1045.                     pThis->PostAccept(pBuffer);  
  1046.                 }  
  1047.             }  
  1048.         }  
  1049.     }  
  1050.     return 0;  
  1051. }  
  1052.   
  1053.   
  1054. DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)  
  1055. {  
  1056. #ifdef _DEBUG
      
  1057.             ::OutputDebugString("   WorkerThread 启动... \n");  
  1058. #endif // _DEBUG
      
  1059.   
  1060.   
  1061.     CIOCPServer *pThis = (CIOCPServer*)lpParam;  
  1062.   
  1063.   
  1064.     CIOCPBuffer *pBuffer = NULL;  
  1065.     DWORD dwKey;  
  1066.     DWORD dwTrans;  
  1067.     LPOVERLAPPED lpol;  
  1068.   
  1069.   
  1070.     while(TRUE)  
  1071.     {  
  1072.         // 在关联到此完成端口的所有套节字上等待I/O完成
      
  1073.         BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,   
  1074.                     &dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);  
  1075.           
  1076.         if(dwTrans == -1) // 用户通知退出
      
  1077.         {  
  1078. #ifdef _DEBUG
      
  1079.             ::OutputDebugString("   WorkerThread 退出 \n");  
  1080. #endif // _DEBUG
      
  1081.             ::ExitThread(0);  
  1082.         }  
  1083.         if(dwTrans != -2)  
  1084.             pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);  
  1085.         int nError = NO_ERROR;  
  1086.         if(!bOK)                        // 在此套节字上有错误发生
      
  1087.         {  
  1088.             printf("完成端口套接字上有错误:%d\n",GetLastError());  
  1089.             SOCKET s;  
  1090.             if(pBuffer->nOperation == OP_ACCEPT)  
  1091.             {  
  1092.                 s = pThis->m_sListen;  
  1093.             }  
  1094.             else  
  1095.             {  
  1096.                 if(dwKey == 0)  
  1097.                     break;  
  1098.                 s = ((CIOCPContext*)dwKey)->s;  
  1099.             }  
  1100.             DWORD dwFlags = 0;  
  1101.             if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))  
  1102.             {  
  1103.                 nError = ::WSAGetLastError();  
  1104.             }  
  1105.         }  
  1106.         pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);  
  1107.         printf("Buffer:%d     Context:%d\n",iBufferCount,iContextCount);  
  1108.     }  
  1109.   
  1110.   
  1111. #ifdef _DEBUG
      
  1112.             ::OutputDebugString("   WorkerThread 退出 \n");  
  1113. #endif // _DEBUG
      
  1114.     return 0;  
  1115. }  
  1116.   
  1117.   
  1118. int g_x = 0;  
  1119. void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)  
  1120. {  
  1121.     CIOCPContext *pContext = (CIOCPContext *)dwKey;  
  1122.   
  1123.   
  1124. #ifdef _DEBUG
      
  1125.             ::OutputDebugString("   HandleIO... \n");  
  1126. #endif // _DEBUG
      
  1127.       
  1128.     // 1)首先减少套节字上的未决I/O计数
      
  1129.     if(dwTrans == -2)  
  1130.     {  
  1131.         CloseAConnection(pContext);  
  1132.         return;  
  1133.     }  
  1134.     if(pContext != NULL)  
  1135.     {  
  1136.         ::EnterCriticalSection(&pContext->Lock);  
  1137.           
  1138.         if(pBuffer->nOperation == OP_READ)  
  1139.             pContext->nOutstandingRecv --;  
  1140.         else if(pBuffer->nOperation == OP_WRITE)  
  1141.             pContext->nOutstandingSend --;  
  1142.           
  1143.         ::LeaveCriticalSection(&pContext->Lock);  
  1144.           
  1145.         // 2)检查套节字是否已经被我们关闭
      
  1146.         if(pContext->bClosing)   
  1147.         {  
  1148. #ifdef _DEBUG
      
  1149.             ::OutputDebugString("   检查到套节字已经被我们关闭 \n");  
  1150. #endif // _DEBUG
      
  1151.             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1152.             {         
  1153.                 ReleaseContext(pContext);  
  1154.                 pContext = NULL;  
  1155.             }  
  1156.             // 释放已关闭套节字的未决I/O
      
  1157.             ReleaseBuffer(pBuffer);  
  1158.             pBuffer = NULL;  
  1159.             return;  
  1160.         }  
  1161.     }  
  1162.     else  
  1163.     {  
  1164.         RemovePendingAccept(pBuffer);  
  1165.     }  
  1166.   
  1167.   
  1168.     // 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
      
  1169.     if(nError != NO_ERROR)  
  1170.     {  
  1171.         if(pBuffer->nOperation != OP_ACCEPT)  
  1172.         {  
  1173.             OnConnectionError(pContext, pBuffer, nError);  
  1174.             CloseAConnection(pContext);  
  1175.             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1176.             {         
  1177.                 ReleaseContext(pContext);  
  1178.                 pContext = NULL;  
  1179.             }  
  1180. #ifdef _DEBUG
      
  1181.             ::OutputDebugString("   检查到客户套节字上发生错误 \n");  
  1182. #endif // _DEBUG
      
  1183.         }  
  1184.         else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
      
  1185.         {  
  1186.             // 客户端出错,释放I/O缓冲区
      
  1187.             if(pBuffer->sClient != INVALID_SOCKET)  
  1188.             {  
  1189.                 ::closesocket(pBuffer->sClient);  
  1190.                 pBuffer->sClient = INVALID_SOCKET;  
  1191.             }  
  1192. #ifdef _DEBUG
      
  1193.             ::OutputDebugString("   检查到监听套节字上发生错误 \n");  
  1194. #endif // _DEBUG
      
  1195.         }  
  1196.   
  1197.   
  1198.         ReleaseBuffer(pBuffer);  
  1199.         pBuffer = NULL;  
  1200.         return;  
  1201.     }  
  1202.   
  1203.   
  1204.   
  1205.   
  1206.     // 开始处理
      
  1207.     if(pBuffer->nOperation == OP_ACCEPT)  
  1208.     {  
  1209.         if(dwTrans == 0)  
  1210.         {  
  1211. #ifdef _DEBUG
      
  1212.             ::OutputDebugString("   监听套节字上客户端关闭 \n");  
  1213. #endif // _DEBUG
      
  1214.               
  1215.             if(pBuffer->sClient != INVALID_SOCKET)  
  1216.             {  
  1217.                 ::closesocket(pBuffer->sClient);  
  1218.                 pBuffer->sClient = INVALID_SOCKET;  
  1219.             }  
  1220.         }  
  1221.         else  
  1222.         {  
  1223.             // 为新接受的连接申请客户上下文对象
      
  1224.             CIOCPContext *pClient = AllocateContext(pBuffer->sClient);  
  1225.             if(pClient != NULL)  
  1226.             {  
  1227.                 if(AddAConnection(pClient))  
  1228.                 {     
  1229.                     // 取得客户地址
      
  1230.                     int nLocalLen, nRmoteLen;  
  1231.                     LPSOCKADDR pLocalAddr, pRemoteAddr;  
  1232.                     m_lpfnGetAcceptExSockaddrs(  
  1233.                         pBuffer->buff,  
  1234.                         pBuffer->nLen - (sizeof(sockaddr_in) + 16) * 2/*sizeof(cmd_header)*/,  
  1235.                         sizeof(sockaddr_in) + 16,  
  1236.                         sizeof(sockaddr_in) + 16,  
  1237.                         (SOCKADDR **)&pLocalAddr,  
  1238.                         &nLocalLen,  
  1239.                         (SOCKADDR **)&pRemoteAddr,  
  1240.                         &nRmoteLen);  
  1241.                     memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);  
  1242.                     memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);  
  1243.                       
  1244.                     // 关联新连接到完成端口对象
      
  1245.                     ::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);  
  1246.                       
  1247.                     // 通知用户
      
  1248.                     pBuffer->nLen = dwTrans;  
  1249.                     OnConnectionEstablished(pClient, pBuffer);  
  1250.   
  1251.   
  1252.                     if(pClient->bClosing && pClient->nOutstandingRecv == 0 && pClient->nOutstandingSend == 0)  
  1253.                     {  
  1254.                         ReleaseContext(pClient);  
  1255.                         pContext = NULL;  
  1256.                     }  
  1257.                     else if(pClient->hTimer == NULL)//接收一个客户端的同时创建一个检测I/O超时的Timer
      
  1258.                     {  
  1259.                         pClient->hCompletion = m_hCompletion;  
  1260.                         CreateTimerQueueTimer(&pClient->hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0);  
  1261.                     }  
  1262.                       
  1263.                     // 向新连接投递Read请求或者Write请求,直接关闭这些空间在套节字关闭或出错时释放
      
  1264. //                      CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
      
  1265. //                      if(p != NULL)
      
  1266. //                      {
      
  1267. //                          if(!PostRecv(pClient, p))
      
  1268. //                          {
      
  1269. //                              CloseAConnection(pClient);
      
  1270. //                          }
      
  1271. //                      }
      
  1272.   
  1273.   
  1274.                 }  
  1275.                 else    // 连接数量已满,关闭连接
      
  1276.                 {  
  1277.                     CloseAConnection(pClient);  
  1278.                     ReleaseContext(pClient);  
  1279.                     pContext = NULL;  
  1280.                 }  
  1281.             }  
  1282.             else  
  1283.             {  
  1284.                 // 资源不足,关闭与客户的连接即可
      
  1285.                 ::closesocket(pBuffer->sClient);  
  1286.                 pBuffer->sClient = INVALID_SOCKET;  
  1287.             }  
  1288.         }  
  1289.           
  1290.         // Accept请求完成,释放I/O缓冲区
      
  1291.         ReleaseBuffer(pBuffer);  
  1292.         pBuffer = NULL;  
  1293.   
  1294.   
  1295.         // 通知监听线程继续再投递一个Accept请求
      
  1296.         ::InterlockedIncrement(&m_nRepostCount);  
  1297.   
  1298.   
  1299.         ::SetEvent(m_hRepostEvent);  
  1300.     }  
  1301.     else if(pBuffer->nOperation == OP_READ)  
  1302.     {  
  1303.         if(dwTrans == 0)    // 对方关闭套节字
      
  1304.         {  
  1305.             // 先通知用户
      
  1306.             pBuffer->nLen = 0;  
  1307.             OnConnectionClosing(pContext, pBuffer);   
  1308.             // 再关闭连接
      
  1309.             CloseAConnection(pContext);  
  1310.             // 释放客户上下文和缓冲区对象
      
  1311.             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1312.             {         
  1313.                 ReleaseContext(pContext);  
  1314.                 pContext = NULL;  
  1315.             }  
  1316.             ReleaseBuffer(pBuffer);  
  1317.             pBuffer = NULL;  
  1318.         }  
  1319.         else  
  1320.         {  
  1321.             pBuffer->nLen = dwTrans;  
  1322.             // 按照I/O投递的顺序读取接收到的数据
      
  1323.             CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);  
  1324.             while(p != NULL)  
  1325.             {  
  1326.                 // 通知用户
      
  1327.                 OnReadCompleted(pContext, p);  
  1328.                 // 增加要读的序列号的值
      
  1329.                 ::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);  
  1330.                 // 释放这个已完成的I/O
      
  1331.                 ReleaseBuffer(p);  
  1332.                 p = GetNextReadBuffer(pContext, NULL);  
  1333.             }  
  1334.   
  1335.   
  1336.             if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1337.             {  
  1338.                 ReleaseContext(pContext);  
  1339.                 pContext = NULL;  
  1340.             }  
  1341.             else if(pContext->hTimer != NULL)  
  1342.             {             
  1343.                 ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);//重置监视时间,当一个投递完成后,60s内无任何交互则断开。
      
  1344.             }  
  1345.   
  1346.   
  1347.             // 继续投递一个新的接收请求
      
  1348.          //   pBuffer = AllocateBuffer(BUFFER_SIZE);
      
  1349.             //if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
      
  1350.             //{
      
  1351.             //  CloseAConnection(pContext);
      
  1352.             //}
      
  1353.         }  
  1354.     }  
  1355.     else if(pBuffer->nOperation == OP_WRITE)  
  1356.     {  
  1357.   
  1358.   
  1359.         if(dwTrans == 0)    // 对方关闭套节字
      
  1360.         {  
  1361.             // 先通知用户
      
  1362.             pBuffer->nLen = 0;  
  1363.             OnConnectionClosing(pContext, pBuffer);   
  1364.   
  1365.   
  1366.             // 再关闭连接
      
  1367.             CloseAConnection(pContext);  
  1368.   
  1369.   
  1370.             // 释放客户上下文和缓冲区对象
      
  1371.             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1372.             {         
  1373.                 ReleaseContext(pContext);  
  1374.                 pContext = NULL;  
  1375.             }  
  1376.             ReleaseBuffer(pBuffer);  
  1377.             pBuffer = NULL;  
  1378.         }  
  1379.         else  
  1380.         {  
  1381.             if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1382.             {  
  1383.                 ReleaseContext(pContext);  
  1384.                 pContext = NULL;  
  1385.                 ReleaseBuffer(pBuffer);  
  1386.                 pBuffer = NULL;  
  1387.                 return;  
  1388.             }  
  1389.             else if(pContext->hTimer != NULL)  
  1390.             {             
  1391.                 ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);  
  1392.             }  
  1393.   
  1394.   
  1395.             // 写操作完成,通知用户
      
  1396.             if(dwTrans < pBuffer->nLen)//如果此send没有发送完全,则发送剩下的部分(此部分如果还是没发完全,这里同样进行)
      
  1397.             {  
  1398.                 printf("send未发送完全,发送:%d,总长度:%d\n",dwTrans,pBuffer->nLen);  
  1399.                 CIOCPBuffer* p = AllocateBuffer(pBuffer->nLen - dwTrans);  
  1400.                 if(p != NULL)  
  1401.                     memcpy(p->buff,pBuffer->buff + dwTrans,pBuffer->nLen - dwTrans);  
  1402.                 if(p == NULL || !PostSend(pContext,p))  
  1403.                 {  
  1404.                     CloseAConnection(pContext);  
  1405.                     return;  
  1406.                 }  
  1407.             }  
  1408.             else  
  1409.             {  
  1410.                 if(!PostNextWriteBuffer(pContext,pBuffer))  
  1411.                 {  
  1412.                     CloseAConnection(pContext);  
  1413.                     return;  
  1414.                 }  
  1415.             }  
  1416.             pBuffer->nLen = dwTrans;  
  1417.             OnWriteCompleted(pContext, pBuffer);  
  1418.             if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)  
  1419.             {  
  1420.                 ReleaseContext(pContext);  
  1421.                 pContext = NULL;  
  1422.             }  
  1423.             // 释放SendText函数申请的缓冲区
      
  1424.             ReleaseBuffer(pBuffer);  
  1425.             pBuffer = NULL;  
  1426.         }  
  1427.     }  
  1428. }  
  1429.   
  1430.   
  1431.   
  1432.   
  1433. BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)  
  1434. {  
  1435.     CIOCPBuffer *pBuffer = AllocateBuffer(nLen);  
  1436.     if(pBuffer != NULL)  
  1437.     {  
  1438.         memcpy(pBuffer->buff, pszText, nLen);  
  1439.         return PostSend(pContext, pBuffer);  
  1440.     }  
  1441.     return FALSE;  
  1442. }  
  1443.   
  1444.   
  1445. //投递接收请求示例
      
  1446. //CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
      
  1447. //if(p != NULL)
      
  1448. //{   
  1449. //  if(!PostRecv(pContext, p))
      
  1450. //  {   
  1451. //      CloseAConnection(pContext);
      
  1452. //  }   
  1453. //}
      
  1454. //投递发送请求示例
      
  1455. //CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
      
  1456. //if(p != NULL)
      
  1457. //{
      
  1458. //  if(!PostSendToList(pContext, p))
      
  1459. //  {
      
  1460. //      CloseAConnection(pContext);
      
  1461. //  }
      
  1462. //}   
  1463. void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  1464. {  
  1465.     //连接建立,且第一次数据接收完成。
      
  1466.     //接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
      
  1467. }  
  1468.   
  1469.   
  1470. void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  1471. {  
  1472. }  
  1473.   
  1474.   
  1475. void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  1476. {  
  1477.     //一次数据接收完成。
      
  1478.     //接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
      
  1479. }  
  1480.   
  1481.   
  1482. void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)  
  1483. {  
  1484.     //一次数据发送完成。
      
  1485.     //接下来可以根据业务逻辑,PostRecv收或者PostSendToList发或者CloseAConnection(pContext)关闭连接
      
  1486. }  
  1487.   
  1488.   
  1489. void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)  
  1490. {  
  1491. }  

抱歉!评论已关闭.