http://blog.csdn.net/woshinia/article/details/8585930
部分代码参考《[WINDOWS网络与通信程序设计].王艳平》,网络中一些I/O模型的代码都没有对socket是否可写做过深入研究,我这边会提供一些解决方法。
阻塞模式下,send会发生阻塞(非阻塞模式下send返回WSAEWOULDBLOCK错误,重叠I/O下表现为投递的发送请求一直无法完成)的情况一般可以分为3种 :
1,服务器虽然发送了大量数据,但客户端并未调用recv函数去接收。
2,网络状况不佳,发送缓冲区中的数据一直发不出去。
3,发送数据量很大,如下载功能,协议发送数据的速度比不上send函数将数据拷贝到发送缓冲区的速度。
对于情况1和2,我们似乎可以直接关闭套接字,让客户端重新请求;但对于情况3却不行。而且在实际运行中,我们无法区分当前属于情况1、2还是3,能做的只是尽量保证发送的正确性。当然,为了防止情况1或情况2中长时间网络不畅,可以设定超时:若socket处于不可写状态超过1分钟,就关闭套接字。在最后的IOCP模型中就加入了这种超时机制,其他模型若要加入,可参考它来做。
一,基本的阻塞模型
- #include <WinSock2.h>
- #include <Windows.h>
- #include <stdio.h>
- #pragma comment(lib,"Ws2_32.lib")
- // Per-connection worker: prints everything received on one client socket.
- // param is a heap-allocated SOCKET* handed over by main(); this thread closes
- // the socket and frees the pointer before returning. Always returns 0.
- DWORD WINAPI WorkThread(void* param)
- {
-     SOCKET* psClient = (SOCKET*)param;
-     if(psClient == NULL)
-         return 0;
-     char buf[4096];
-     while(true)
-     {
-         // Request one byte less than the buffer holds so the terminator written
-         // below always fits; a full 4096-byte read used to write buf[4096].
-         int len = recv(*psClient,buf,(int)sizeof(buf) - 1,0);
-         if(len <= 0)
-         {
-             // len == 0 means the peer closed the connection; in that case the
-             // error code printed here is whatever was set previously.
-             printf("recv失败!%d\n",WSAGetLastError());
-             Sleep(5000);
-             break;
-         }
-         buf[len] = '\0';
-         printf("收到数据:%s\n",buf);
-     }
-     closesocket(*psClient);
-     delete psClient;
-     return 0;
- }
- // Blocking-model server: listens on TCP port 3456 and starts one WorkThread per
- // accepted connection. Returns -1 when start-up fails.
- int main()
- {
-     WSAData wsaData;
-     // WSAStartup reports failure through its return value, so print that value
-     // (the old call passed an argument without a format specifier).
-     int nStartup = WSAStartup(MAKEWORD(2,2),&wsaData);
-     if(0 != nStartup)
-     {
-         printf("WSAStartup失败!%d\n",nStartup);
-         Sleep(5000);
-         return -1;
-     }
-     USHORT nPort = 3456;
-     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
-     if(sListen == INVALID_SOCKET)
-     {
-         printf("socket失败!%d\n",WSAGetLastError());
-         Sleep(5000);
-         WSACleanup();
-         return -1;
-     }
-     sockaddr_in sin = {0};
-     sin.sin_family = AF_INET;
-     sin.sin_port = htons(nPort);
-     sin.sin_addr.S_un.S_addr = INADDR_ANY;
-     if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))
-     {
-         printf("bind失败!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         return -1;
-     }
-     if(SOCKET_ERROR == ::listen(sListen,5))
-     {
-         printf("listen失败!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         return -1;
-     }
-     while(true)
-     {
-         sockaddr_in addrRemote;
-         int nAddrLen = sizeof(addrRemote);
-         SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
-         if(sClient == INVALID_SOCKET)
-         {
-             // Do not start a worker for a failed accept; back off briefly so a
-             // persistent error does not turn into a hot loop.
-             printf("accept失败!%d\n",WSAGetLastError());
-             Sleep(100);
-             continue;
-         }
-         // Ownership of psClient passes to the worker thread once it is created.
-         SOCKET* psClient = new SOCKET(sClient);
-         HANDLE hThread = CreateThread(NULL,0,WorkThread,psClient,0,NULL);
-         if(hThread == NULL)
-         {
-             printf("CreateThread失败!%lu\n",GetLastError());
-             closesocket(sClient);
-             delete psClient;
-             continue;
-         }
-         CloseHandle(hThread);
-     }
-     // Not reached while the accept loop above runs forever.
-     closesocket(sListen);
-     WSACleanup();
-     return 0;
- }
#include <WinSock2.h> #include <Windows.h> #include <stdio.h> #pragma comment(lib,"Ws2_32.lib") DWORD WINAPI WorkThread(void* param) { SOCKET* psClient = (SOCKET*)param; char buf[4096]; while(true) { int len = recv(*psClient,buf,4096,0); if(len <= 0) { printf("recv失败!%d\n",WSAGetLastError()); Sleep(5000); break; } buf[len] = '\0'; printf("收到数据:%s\n",buf); } closesocket(*psClient); delete psClient; return 0; } int main() { WSAData wsaData; if(0 != WSAStartup(MAKEWORD(2,2),&wsaData)) { printf("WSAStartup失败!\n",WSAGetLastError()); Sleep(5000); return 0; } USHORT nPort = 3456; SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nPort); sin.sin_addr.S_un.S_addr = INADDR_ANY; if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin))) { printf("bind失败!%d\n",WSAGetLastError()); Sleep(5000); return -1; } ::listen(sListen,5); while(true) { sockaddr_in addrRemote; int nAddrLen = sizeof(addrRemote); SOCKET *psClient = new SOCKET; *psClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen); HANDLE hThread = CreateThread(NULL,0,WorkThread,psClient,0,NULL); CloseHandle(hThread); } closesocket(sListen); WSACleanup(); }
二,无任何优化的非阻塞模型
- #include <WinSock2.h>
- #include <Windows.h>
- #include <stdio.h>
- #include <vector>
- using namespace std;
- #pragma comment(lib,"Ws2_32.lib")
- // Entered by main() around push_back and by WorkThread() around its scan of g_vecClients.
- CRITICAL_SECTION g_cs;
- // Auto-reset event WorkThread() waits on while there are no clients; set by main().
- HANDLE g_StartEvent;
- // Sockets of the currently connected clients.
- vector<SOCKET> g_vecClients;
- // NOTE(review): not referenced anywhere else in this listing.
- int g_iVecSize = 0;
- // Polling worker: every 100 ms tries a non-blocking recv on each client in
- // g_vecClients, prints what arrived and drops clients that closed or failed.
- // param is unused. Never returns under normal operation.
- DWORD WINAPI WorkThread(void* param)
- {
-     char buf[4096];
-     while(1)
-     {
-         // Check for clients under the lock. The event is auto-reset, so it is
-         // not reset by hand here: doing so could discard a signal raised
-         // between the emptiness check and the wait and stall this thread.
-         EnterCriticalSection(&g_cs);
-         bool bNoClients = g_vecClients.empty();
-         LeaveCriticalSection(&g_cs);
-         if(bNoClients)
-         {
-             WaitForSingleObject(g_StartEvent,INFINITE);
-             continue; // re-check: the signal may be a stale one
-         }
-         EnterCriticalSection(&g_cs);
-         for(vector<SOCKET>::iterator it = g_vecClients.begin();it != g_vecClients.end();)
-         {
-             // Leave room for the terminator written after a successful read.
-             int len = recv(*it,buf,(int)sizeof(buf) - 1,0);
-             if(len > 0)
-             {
-                 buf[len] = 0;
-                 printf("收到数据: %s\n",buf);
-                 ++it;
-             }
-             else if(len == 0)
-             {
-                 // Peer closed the connection: remove it instead of polling it forever.
-                 printf("client closed:%d\n",(int)*it);
-                 closesocket(*it);
-                 it = g_vecClients.erase(it);
-             }
-             else
-             {
-                 int nErr = WSAGetLastError();
-                 if(WSAEWOULDBLOCK != nErr)
-                 {
-                     printf("recv Error:%d\n",nErr);
-                     closesocket(*it);
-                     it = g_vecClients.erase(it);
-                 }
-                 else
-                 {
-                     // Nothing to read yet; SOCKET is wider than int on 64-bit builds.
-                     printf("%d.",(int)*it);
-                     ++it;
-                 }
-             }
-         }
-         LeaveCriticalSection(&g_cs);
-         Sleep(100);
-     }
-     return 0;
- }
- // Non-blocking polling server: accepts connections on TCP port 3456 with a
- // non-blocking listening socket and hands them to WorkThread through
- // g_vecClients. Returns -1 when start-up fails.
- int main()
- {
-     InitializeCriticalSectionAndSpinCount(&g_cs,4000);
-     g_StartEvent = CreateEvent(NULL,FALSE,FALSE,NULL);
-     WSAData wsaDate;
-     if(g_StartEvent == NULL || 0 != WSAStartup(MAKEWORD(2,2),&wsaDate))
-     {
-         printf("startup failed!\n");
-         if(g_StartEvent != NULL)
-             CloseHandle(g_StartEvent);
-         DeleteCriticalSection(&g_cs);
-         return -1;
-     }
-     USHORT nport = 3456;
-     u_long ul = 1;
-     SOCKET s = socket(AF_INET,SOCK_STREAM,0);
-     bool bReady = (s != INVALID_SOCKET);
-     if(bReady)
-     {
-         sockaddr_in sin = {0};
-         sin.sin_family = AF_INET;
-         sin.sin_port = htons(nport);
-         sin.sin_addr.S_un.S_addr = ADDR_ANY;
-         bReady = SOCKET_ERROR != ioctlsocket(s,FIONBIO,&ul)
-             && SOCKET_ERROR != ::bind(s,(sockaddr*)&sin,sizeof(sin))
-             && SOCKET_ERROR != ::listen(s,5);
-     }
-     HANDLE hThread = NULL;
-     if(bReady)
-     {
-         hThread = CreateThread(NULL,0,WorkThread,NULL,0,NULL);
-         bReady = (hThread != NULL);
-     }
-     if(!bReady)
-     {
-         // Release everything acquired so far instead of leaking it.
-         printf("server setup failed! %d\n",WSAGetLastError());
-         if(s != INVALID_SOCKET)
-             closesocket(s);
-         WSACleanup();
-         CloseHandle(g_StartEvent);
-         DeleteCriticalSection(&g_cs);
-         return -1;
-     }
-     CloseHandle(hThread);
-     while(true)
-     {
-         sockaddr_in addrRemote;
-         int nAddrLen = sizeof(addrRemote);
-         SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
-         if(sClient != INVALID_SOCKET)
-         {
-             EnterCriticalSection(&g_cs);
-             g_vecClients.push_back(sClient);
-             LeaveCriticalSection(&g_cs);
-             // Wake the worker unconditionally: reading size() outside the lock
-             // raced with the worker, and an extra signal is harmless.
-             SetEvent(g_StartEvent);
-             continue;
-         }
-         int nErr = WSAGetLastError();
-         if(WSAEWOULDBLOCK == nErr)
-         {
-             printf(".");
-         }
-         else
-         {
-             printf("accept failed! %d\n",nErr);
-         }
-         // Back off in both cases so a persistent error does not spin the CPU.
-         Sleep(100);
-     }
-     // Not reached while the accept loop above runs forever.
-     closesocket(s);
-     WSACleanup();
-     CloseHandle(g_StartEvent);
-     DeleteCriticalSection(&g_cs);
-     return 0;
- }
#include <WinSock2.h> #include <Windows.h> #include <stdio.h> #include <vector> using namespace std; #pragma comment(lib,"Ws2_32.lib") CRITICAL_SECTION g_cs; HANDLE g_StartEvent; vector<SOCKET> g_vecClients; int g_iVecSize = 0; DWORD WINAPI WorkThread(void* param) { char buf[4096]; while(1) { if(g_vecClients.empty()) { ResetEvent(g_StartEvent); WaitForSingleObject(g_StartEvent,INFINITE); } EnterCriticalSection(&g_cs); for(vector<SOCKET>::iterator it = g_vecClients.begin();it != g_vecClients.end();) { int len = recv(*it,buf,4096,0); if(len == SOCKET_ERROR) { if(WSAEWOULDBLOCK != WSAGetLastError()) { printf("recv Error:%d\n",WSAGetLastError()); closesocket(*it); it = g_vecClients.erase(it); } else { printf("%d.",*it); ++it; } } else { buf[len] = 0; printf("收到数据: %s\n",buf); ++it; } } LeaveCriticalSection(&g_cs); Sleep(100); } return 0; } int main() { InitializeCriticalSectionAndSpinCount(&g_cs,4000); g_StartEvent = CreateEvent(NULL,FALSE,FALSE,NULL); WSAData wsaDate; WSAStartup(MAKEWORD(2,2),&wsaDate); USHORT nport = 3456; u_long ul = 1; SOCKET s = socket(AF_INET,SOCK_STREAM,0); ioctlsocket(s,FIONBIO,&ul); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nport); sin.sin_addr.S_un.S_addr = ADDR_ANY; if(SOCKET_ERROR == ::bind(s,(sockaddr*)&sin,sizeof(sin))) { return -1; } ::listen(s,5); HANDLE hThread = CreateThread(NULL,0,WorkThread,NULL,0,NULL); CloseHandle(hThread); while(true) { sockaddr_in addrRemote; int nAddrLen = sizeof(addrRemote); SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen); if(sClient != SOCKET_ERROR) { EnterCriticalSection(&g_cs); g_vecClients.push_back(sClient); LeaveCriticalSection(&g_cs); if(g_vecClients.size() == 1) SetEvent(g_StartEvent); } else if(WSAEWOULDBLOCK == WSAGetLastError()) { printf("."); Sleep(100); } else { printf("accept failed! %d\n",WSAGetLastError()); } } closesocket(s); WSACleanup(); CloseHandle(g_StartEvent); DeleteCriticalSection(&g_cs); }
三,select模型
- #include <WinSock2.h>
- #include <Windows.h>
- #include <MSWSock.h>
- #include <stdio.h>
- #include <map>
- using namespace std;
- #pragma comment(lib,"Ws2_32.lib")
- #pragma comment(lib,"Mswsock.lib")
- // NOTE(review): ThreadObj is not referenced by any code in this listing.
- struct ThreadObj{
- OVERLAPPED *pOl;
- HANDLE s;
- };
- // NOTE(review): g_iIndex is not referenced by any code in this listing.
- int g_iIndex = 0;
- // Per-socket NUL-terminated text that could not be sent yet; the buffers are
- // allocated with new[] and released with delete[] by main().
- map<SOCKET,char*> g_map;
- // Appends len bytes of data to the text still waiting to be sent to s (stored in
- // g_map), creating the entry when there is none. Queued data is kept as a
- // NUL-terminated string, so bytes following an embedded '\0' are not preserved.
- static void QueuePendingText(SOCKET s,const char* data,int len)
- {
-     map<SOCKET,char*>::iterator it = g_map.find(s);
-     size_t nOld = 0;
-     if(it != g_map.end() && it->second != NULL)
-         nOld = strlen(it->second);
-     char* szNew = new char[nOld + len + 1];
-     if(nOld > 0)
-         memcpy(szNew,it->second,nOld);
-     memcpy(szNew + nOld,data,len);
-     szNew[nOld + len] = 0;
-     if(it != g_map.end())
-     {
-         delete [] it->second;
-         it->second = szNew;
-     }
-     else
-     {
-         g_map[s] = szNew;
-     }
- }
- // Closes s, frees its queued text and removes it from both descriptor sets so
- // that the next select() call is not handed a closed socket.
- static void DropClient(SOCKET s,fd_set* pfdAll,fd_set* pfdWrite)
- {
-     closesocket(s);
-     map<SOCKET,char*>::iterator it = g_map.find(s);
-     if(it != g_map.end())
-     {
-         delete [] it->second;
-         g_map.erase(it);
-     }
-     FD_CLR(s,pfdAll);
-     FD_CLR(s,pfdWrite);
- }
- // select()-based echo server on TCP port 3456. Data that cannot be sent
- // immediately is queued per socket in g_map and flushed when select() reports
- // the socket writable. Returns -1 when start-up fails, 0 after select() fails.
- int main()
- {
-     WSAData wsaData;
-     int nStartup = WSAStartup(MAKEWORD(2,2),&wsaData);
-     if(0 != nStartup)
-     {
-         printf("初始化失败!%d\n",nStartup);
-         Sleep(5000);
-         return -1;
-     }
-     USHORT nport = 3456;
-     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
-     if(sListen == INVALID_SOCKET)
-     {
-         printf("socket failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         WSACleanup();
-         return -1;
-     }
-     u_long ul = 1;
-     ioctlsocket(sListen,FIONBIO,&ul);
-     sockaddr_in sin = {0};
-     sin.sin_family = AF_INET;
-     sin.sin_port = htons(nport);
-     sin.sin_addr.S_un.S_addr = ADDR_ANY;
-     if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))
-     {
-         printf("bind failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         return -1;
-     }
-     if(SOCKET_ERROR == ::listen(sListen,5))
-     {
-         printf("listen failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         return -1;
-     }
-     // 1) fdSocket holds every open socket (listener included); fdWrite holds the
-     //    sockets that still have queued data in g_map.
-     fd_set fdSocket;
-     FD_ZERO(&fdSocket);
-     FD_SET(sListen,&fdSocket);
-     fd_set fdWrite;
-     FD_ZERO(&fdWrite);
-     char buf[4096];
-     while(true)
-     {
-         // 2) select() modifies the sets it is given, so pass copies.
-         fd_set fdRead = fdSocket;
-         fd_set fdTmp = fdWrite;
-         // No timeout is passed, so select() blocks until a socket is ready.
-         int nRetAll = select(0,&fdRead,(fdTmp.fd_count > 0) ? &fdTmp : NULL,NULL,NULL);
-         if(nRetAll == SOCKET_ERROR)
-         {
-             printf("select error!%d\n",WSAGetLastError());
-             Sleep(5000);
-             break;
-         }
-         if(nRetAll == 0)
-         {
-             // Only reachable if a timeout is ever passed to select().
-             printf("time out!\n");
-             continue;
-         }
-         // 3) Walk the sets select() returned. Iterating these copies (rather than
-         //    fdSocket) keeps the loop index valid when a client is dropped.
-         for(u_int i=0;i<fdRead.fd_count;i++)
-         {
-             SOCKET s = fdRead.fd_array[i];
-             if(s == sListen)
-             {
-                 sockaddr_in addrRemote;
-                 int nAddrLen = sizeof(addrRemote);
-                 SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen);
-                 if(sClient == INVALID_SOCKET)
-                     continue;
-                 if(fdSocket.fd_count >= FD_SETSIZE)
-                 {
-                     // Accept and close: leaving the connection pending would make
-                     // select() report the listener readable again immediately.
-                     printf("连接数量已达上限!\n");
-                     closesocket(sClient);
-                     continue;
-                 }
-                 FD_SET(sClient,&fdSocket);
-                 printf("接收到连接:(%s)\n",inet_ntoa(addrRemote.sin_addr));
-                 continue;
-             }
-             // Leave room for the terminator written after a successful read.
-             int nRecv = recv(s,buf,(int)sizeof(buf) - 1,0);
-             if(nRecv == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
-                 continue;
-             if(nRecv <= 0)
-             {
-                 printf("1个Client已断开\n");
-                 DropClient(s,&fdSocket,&fdWrite);
-                 continue;
-             }
-             buf[nRecv] = 0;
-             printf("收到数据:%s\n",buf);
-             int nSent = 0;
-             // Send directly only when nothing is queued; otherwise the new data
-             // would overtake the older queued data.
-             if(g_map.find(s) == g_map.end())
-             {
-                 nSent = send(s,buf,nRecv,0);
-                 if(nSent == SOCKET_ERROR)
-                 {
-                     if(WSAGetLastError() != WSAEWOULDBLOCK)
-                     {
-                         DropClient(s,&fdSocket,&fdWrite);
-                         continue;
-                     }
-                     nSent = 0;
-                 }
-             }
-             if(nSent < nRecv)
-             {
-                 // Keep whatever was not accepted (all of it, or the tail of a
-                 // partial send) and ask select() to report writability.
-                 QueuePendingText(s,buf + nSent,nRecv - nSent);
-                 FD_SET(s,&fdWrite);
-             }
-             printf("发送了%d\n",nSent);
-         }
-         for(u_int i=0;i<fdTmp.fd_count;i++)
-         {
-             SOCKET s = fdTmp.fd_array[i];
-             if(!FD_ISSET(s,&fdSocket))
-                 continue; // dropped while handling reads above
-             map<SOCKET,char*>::iterator it = g_map.find(s);
-             if(it == g_map.end())
-             {
-                 FD_CLR(s,&fdWrite);
-                 continue;
-             }
-             char* szToSend = it->second;
-             int nToSend = (int)strlen(szToSend);
-             int nRet = send(s,szToSend,nToSend,0);
-             if(nRet == SOCKET_ERROR)
-             {
-                 if(WSAGetLastError() != WSAEWOULDBLOCK)
-                     DropClient(s,&fdSocket,&fdWrite);
-                 continue;
-             }
-             if(nRet < nToSend)
-             {
-                 printf("发送了%d/%d\n",nRet,nToSend);
-                 // Shift the unsent tail (terminator included) to the front.
-                 memmove(szToSend,szToSend + nRet,nToSend - nRet + 1);
-             }
-             else
-             {
-                 delete [] szToSend;
-                 g_map.erase(it);
-                 FD_CLR(s,&fdWrite);
-             }
-             printf("============================================发送了%d\n",nRet);
-         }
-     }
-     // Release the remaining clients and their queued text before exiting.
-     for(u_int i=0;i<fdSocket.fd_count;i++)
-     {
-         if(fdSocket.fd_array[i] != sListen)
-             closesocket(fdSocket.fd_array[i]);
-     }
-     for(map<SOCKET,char*>::iterator it = g_map.begin();it != g_map.end();++it)
-         delete [] it->second;
-     g_map.clear();
-     closesocket(sListen);
-     WSACleanup();
-     return 0;
- }
#include <WinSock2.h> #include <Windows.h> #include <MSWSock.h> #include <stdio.h> #include <map> using namespace std; #pragma comment(lib,"Ws2_32.lib") #pragma comment(lib,"Mswsock.lib") struct ThreadObj{ OVERLAPPED *pOl; HANDLE s; }; int g_iIndex = 0; map<SOCKET,char*> g_map; int main() { WSAData wsaData; if(0 != WSAStartup(MAKEWORD(2,2),&wsaData)) { printf("初始化失败!%d\n",WSAGetLastError()); Sleep(5000); return -1; } USHORT nport = 3456; SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); u_long ul = 1; ioctlsocket(sListen,FIONBIO,&ul); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nport); sin.sin_addr.S_un.S_addr = ADDR_ANY; if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin))) { printf("bind failed!%d\n",WSAGetLastError()); Sleep(5000); return -1; } listen(sListen,5); //1)初始化一个套接字集合fdSocket,并将监听套接字放入 fd_set fdSocket; FD_ZERO(&fdSocket); FD_SET(sListen,&fdSocket); TIMEVAL time={1,0}; char buf[4096]; fd_set fdWrite; FD_ZERO(&fdWrite); while(true) { //2)将fdSocket的一个拷贝fdRead传给select函数 fd_set fdRead = fdSocket; fd_set fdTmp = fdWrite; int nRetAll = 0; if(fdTmp.fd_count > 0) nRetAll = select(0,&fdRead,&fdTmp,NULL,NULL/*&time*/);//若不设置超时则select为阻塞 else nRetAll = select(0,&fdRead,NULL,NULL,NULL/*&time*/); if(nRetAll > 0) { //3)通过将原来的fdSocket和被select处理过的fdRead进行比较,决定由哪些socket有数据可以读取 for(int i=0;i<fdSocket.fd_count;i++) { if(FD_ISSET(fdSocket.fd_array[i],&fdRead)) { if(fdSocket.fd_array[i] == sListen) { if(fdSocket.fd_count < FD_SETSIZE) { sockaddr_in addrRemote; int nAddrLen = sizeof(addrRemote); SOCKET sClient = accept(sListen,(sockaddr*)&addrRemote,&nAddrLen); FD_SET(sClient,&fdSocket); printf("接收到连接:(%s)\n",inet_ntoa(addrRemote.sin_addr)); } else { printf("连接数量已达上限!\n"); continue; } } else { int nRecv = recv(fdSocket.fd_array[i],buf,4096,0); if(nRecv > 0) { buf[nRecv] = 0; printf("收到数据:%s\n",buf); int nRet = send(fdSocket.fd_array[i],buf,nRecv,0); if(nRet <= 0) { SOCKET s = fdSocket.fd_array[i]; if(GetLastError() == WSAEWOULDBLOCK) { 
if(g_map.find(s) == g_map.end()) { char* szTmp = new char[nRecv + 1]; strncpy(szTmp,buf,nRecv); szTmp[nRecv] = 0; g_map[s] = szTmp; } else { char* szOld = g_map[s]; char* szTmp2 = new char[strlen(szOld) + nRecv + 1]; strncpy(szTmp2,szOld,strlen(szOld)); strncpy(szTmp2 + strlen(szOld),buf,nRecv); szTmp2[strlen(szOld) + nRecv] = 0; delete [] szOld; g_map[s] = szTmp2; } FD_SET(fdSocket.fd_array[i],&fdWrite); } else { closesocket(fdSocket.fd_array[i]); if(g_map.find(s) != g_map.end()) { if(g_map[s] != NULL) delete [] g_map[s]; g_map.erase(s); } FD_CLR(fdSocket.fd_array[i],&fdSocket); } } printf("发送了%d\n",nRet); } else { printf("1个Client已断开\n"); closesocket(fdSocket.fd_array[i]); FD_CLR(fdSocket.fd_array[i],&fdSocket); } } } if(FD_ISSET(fdSocket.fd_array[i],&fdTmp)) { SOCKET s = fdSocket.fd_array[i]; if(g_map.find(s) != g_map.end()) { char* szToSend = g_map[s]; int nToSend = strlen(szToSend); int nRet = send(fdSocket.fd_array[i],szToSend,nToSend,0); if(nRet <= 0) { if(GetLastError() == WSAEWOULDBLOCK) { //do nothing } else { closesocket(fdSocket.fd_array[i]); if(g_map.find(s) != g_map.end()) { if(g_map[s] != NULL) delete [] g_map[s]; g_map.erase(s); } FD_CLR(fdSocket.fd_array[i],&fdSocket); } } else if(nRet < nToSend) { printf("发送了%d/%d\n",nRet,nToSend); nToSend -= nRet; char* szTmp = new char[nToSend + 1]; strncpy(szTmp,szToSend + nRet,nToSend); szTmp[nToSend] = 0; delete [] szToSend; g_map[s] = szTmp; } else { if(g_map[s] != NULL) delete [] g_map[s]; g_map.erase(s); FD_CLR(fdSocket.fd_array[i],&fdWrite); } printf("============================================发送了%d\n",nRet); } } } } else if(nRetAll == 0) { printf("time out!\n"); } else { printf("select error!%d\n",WSAGetLastError()); Sleep(5000); break; } } closesocket(sListen); WSACleanup(); }
四,异步选择模型
注意:收到FD_WRITE消息有2种情况:1,socket第一次和窗口句柄绑定之后;2,socket从不可写状态变成可写状态。下面的事件选择模型也是同理。
- #include <WinSock2.h>
- #include <Windows.h>
- #include <stdio.h>
- #include <map>
- using namespace std;
- #pragma comment(lib,"Ws2_32.lib")
- // Window message passed to WSAAsyncSelect() for socket notifications.
- #define WM_SOCKET (WM_USER + 100)
- // Per-socket NUL-terminated text that could not be sent yet; the buffers are
- // allocated with new[] and released with delete[] by WindowProc().
- map<SOCKET,char*> g_map;
- // Frees the text queued for s in g_map, if any.
- static void FreePending(SOCKET s)
- {
-     map<SOCKET,char*>::iterator it = g_map.find(s);
-     if(it != g_map.end())
-     {
-         delete [] it->second;
-         g_map.erase(it);
-     }
- }
- // Appends len bytes of data to the text queued for s in g_map. Queued data is
- // kept as a NUL-terminated string, so bytes after an embedded '\0' are lost.
- static void AppendPending(SOCKET s,const char* data,int len)
- {
-     map<SOCKET,char*>::iterator it = g_map.find(s);
-     size_t nOld = 0;
-     if(it != g_map.end() && it->second != NULL)
-         nOld = strlen(it->second);
-     char* newBuf = new char[nOld + len + 1];
-     if(nOld > 0)
-         memcpy(newBuf,it->second,nOld);
-     memcpy(newBuf + nOld,data,len);
-     newBuf[nOld + len] = 0;
-     if(it != g_map.end())
-     {
-         delete [] it->second;
-         it->second = newBuf;
-     }
-     else
-     {
-         g_map[s] = newBuf;
-     }
- }
- // Releases the queued text of s and closes the socket.
- static void CloseClient(SOCKET s)
- {
-     FreePending(s);
-     closesocket(s);
- }
- // Sends data[0..len) until everything is sent or send() reports WSAEWOULDBLOCK.
- // Returns the number of bytes sent, or -1 on any other failure. Stopping only on
- // WSAEWOULDBLOCK matters: that failure is what makes a later FD_WRITE arrive.
- static int SendAvailable(SOCKET s,const char* data,int len)
- {
-     int nSent = 0;
-     while(nSent < len)
-     {
-         int nRet = send(s,data + nSent,len - nSent,0);
-         if(nRet > 0)
-             nSent += nRet;
-         else if(nRet == SOCKET_ERROR && WSAEWOULDBLOCK == WSAGetLastError())
-             break;
-         else
-             return -1;
-     }
-     return nSent;
- }
- // Window procedure of the message-only window. WM_SOCKET carries the socket in
- // wParam and the event/error in lParam: FD_ACCEPT registers the new client,
- // FD_READ echoes received data (queuing what cannot be sent), FD_WRITE flushes
- // the queue, FD_CLOSE releases the client.
- LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam)
- {
-     switch(uMsg)
-     {
-     case WM_SOCKET:
-         {
-             SOCKET s = wParam;
-             if(WSAGETSELECTERROR(lParam))
-             {
-                 printf("消息错误!\n");
-                 CloseClient(s); // also frees any queued text
-                 return 0;
-             }
-             switch(WSAGETSELECTEVENT(lParam))
-             {
-             case FD_ACCEPT:
-                 {
-                     sockaddr_in addrRemote;
-                     int nAddrLen = sizeof(addrRemote);
-                     SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen);
-                     if(sClient == INVALID_SOCKET)
-                         break;
-                     if(SOCKET_ERROR == WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE))
-                         closesocket(sClient); // would never be serviced otherwise
-                 }break;
-             case FD_WRITE:
-                 {
-                     printf("write====================\n");
-                     // Look the socket up instead of using g_map[s]: operator[]
-                     // would insert a NULL entry for sockets with nothing queued.
-                     map<SOCKET,char*>::iterator it = g_map.find(s);
-                     if(it == g_map.end())
-                         break;
-                     char* pending = it->second;
-                     int nLenth = (int)strlen(pending);
-                     int nSent = SendAvailable(s,pending,nLenth);
-                     if(nSent < 0)
-                     {
-                         CloseClient(s);
-                     }
-                     else if(nSent < nLenth)
-                     {
-                         // Keep the unsent tail (terminator included) in place.
-                         memmove(pending,pending + nSent,nLenth - nSent + 1);
-                     }
-                     else
-                     {
-                         FreePending(s); // everything sent: free the buffer too
-                     }
-                 }break;
-             case FD_READ:
-                 {
-                     char buf[4096];
-                     // Leave room for the terminator written below.
-                     int nRet = recv(s,buf,(int)sizeof(buf) - 1,0);
-                     if(nRet > 0)
-                     {
-                         buf[nRet] = 0;
-                         //printf("收到数据:%s\n",buf);
-                         int x = 0;
-                         int iError = 0;
-                         // With data already queued, queue behind it so the echo
-                         // keeps its order; the pending FD_WRITE will flush it.
-                         if(g_map.end() == g_map.find(s))
-                         {
-                             x = SendAvailable(s,buf,nRet);
-                             if(x < 0)
-                                 iError = WSAGetLastError();
-                         }
-                         printf("已发送字节数:%d , 线程号:%d\n",x,(int)GetCurrentThreadId());
-                         if(x < 0)
-                         {
-                             printf("数据:%s ,错误:%d\n",buf,iError);
-                             CloseClient(s);
-                         }
-                         else if(x < nRet)
-                         {
-                             AppendPending(s,buf + x,nRet - x);
-                         }
-                     }
-                     else if(nRet == SOCKET_ERROR && WSAEWOULDBLOCK == WSAGetLastError())
-                     {
-                         // Nothing to read after all; wait for the next FD_READ.
-                     }
-                     else
-                     {
-                         printf("1个Client已经断开1111!\n");
-                         CloseClient(s);
-                     }
-                 }break;
-             case FD_CLOSE:
-                 {
-                     printf("1个Client已经断开222!\n");
-                     CloseClient(s);
-                 }break;
-             }
-         }break;
-     case WM_DESTROY:
-         {
-             printf("窗口已关闭!\n");
-             PostQuitMessage(0);
-         }break;
-     }
-     return DefWindowProc(hwnd,uMsg,wParam,lParam);
- }
- // WSAAsyncSelect echo server: creates a message-only window, binds the listening
- // socket (TCP port 3456) to it and pumps messages until WM_QUIT.
- // Returns -1 on start-up or message-loop failure, otherwise the WM_QUIT code.
- // NOTE(review): the char-based window strings assume a non-UNICODE build.
- int main()
- {
-     char szClassName[] = "WSAAsyncSelect Test";
-     static WNDCLASSEX wndClass;
-     wndClass.cbSize = sizeof(wndClass);
-     wndClass.style = CS_HREDRAW | CS_VREDRAW;
-     wndClass.lpfnWndProc = WindowProc;
-     wndClass.cbClsExtra = 0;
-     wndClass.cbWndExtra = 0;
-     wndClass.hInstance = GetModuleHandle(0);
-     wndClass.hIcon = LoadIcon(NULL,IDI_APPLICATION);
-     wndClass.hCursor = LoadCursor(NULL,IDC_ARROW);
-     wndClass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH);
-     wndClass.lpszMenuName = NULL;
-     wndClass.lpszClassName = szClassName;
-     wndClass.hIconSm = NULL;
-     ATOM atom = RegisterClassEx(&wndClass);
-     if(0 == atom)
-     {
-         char error[256];
-         sprintf(error,"RegisterClassEx错误!%d",(int)GetLastError());
-         MessageBox(NULL,error,"error",MB_OK);
-         return -1;
-     }
-     // Pass the class name rather than casting the atom to a pointer.
-     HWND hwnd = CreateWindowEx(0,szClassName,"",WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT,
-         CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL);
-     if(hwnd == NULL)
-     {
-         char error[256];
-         sprintf(error,"创建窗口错误!%d",(int)GetLastError());
-         MessageBox(NULL,error,"error",MB_OK);
-         return -1;
-     }
-     WSAData wsaData;
-     int nStartup = WSAStartup(MAKEWORD(2,2),&wsaData);
-     if(0 != nStartup)
-     {
-         printf("初始化失败!%d\n",nStartup);
-         Sleep(5000);
-         DestroyWindow(hwnd);
-         return -1;
-     }
-     USHORT nport = 3456;
-     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
-     if(sListen == INVALID_SOCKET)
-     {
-         printf("socket failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         WSACleanup();
-         DestroyWindow(hwnd);
-         return -1;
-     }
-     sockaddr_in sin = {0};
-     sin.sin_family = AF_INET;
-     sin.sin_port = htons(nport);
-     sin.sin_addr.S_un.S_addr = ADDR_ANY;
-     if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))
-     {
-         printf("bind failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         DestroyWindow(hwnd);
-         return -1;
-     }
-     // Without a successful registration and listen no WM_SOCKET would ever arrive.
-     if(SOCKET_ERROR == WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE)
-         || SOCKET_ERROR == ::listen(sListen,5))
-     {
-         printf("listen failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         DestroyWindow(hwnd);
-         return -1;
-     }
-     MSG msg;
-     BOOL bGot;
-     // GetMessage returns -1 on error; treating that as "message available"
-     // would dispatch an undefined MSG forever.
-     while((bGot = GetMessage(&msg,NULL,0,0)) > 0)
-     {
-         TranslateMessage(&msg);
-         DispatchMessage(&msg);
-     }
-     closesocket(sListen);
-     WSACleanup();
-     if(bGot < 0)
-         return -1;
-     return (int)msg.wParam;
- }
#include <WinSock2.h> #include <Windows.h> #include <stdio.h> #include <map> using namespace std; #pragma comment(lib,"Ws2_32.lib") #define WM_SOCKET (WM_USER + 100) map<SOCKET,char*> g_map; LRESULT WINAPI WindowProc(HWND hwnd,UINT uMsg,WPARAM wParam,LPARAM lParam) { switch(uMsg) { case WM_SOCKET: { SOCKET s = wParam; if(WSAGETSELECTERROR(lParam)) { printf("消息错误!\n"); closesocket(s); return 0; } switch(WSAGETSELECTEVENT(lParam)) { case FD_ACCEPT: { sockaddr_in addrRemote; int nAddrLen = sizeof(addrRemote); SOCKET sClient = accept(s,(sockaddr*)&addrRemote,&nAddrLen); WSAAsyncSelect(sClient,hwnd,WM_SOCKET,FD_READ | FD_WRITE | FD_CLOSE); }break; case FD_WRITE: { printf("write====================\n"); if(!g_map.empty()) { char* buf = g_map[s]; int nLenth = strlen(buf); while(nLenth > 0) { int nRet = send(s,buf,nLenth,0); if(nRet > 0) { buf += nRet; nLenth -= nRet; } else if(10035 == GetLastError()) { char* newBuf = new char[nLenth + 1]; strncpy(newBuf,buf,nLenth); newBuf[nLenth] = 0; delete [] g_map[s]; g_map[s] = newBuf; break; } else { delete [] g_map[s]; g_map.erase(s); closesocket(s); } } if(nLenth == 0) { g_map.erase(s); } } }break; case FD_READ: { char buf[4096]; int nRet = recv(s,buf,4096,0); if(nRet > 0) { buf[nRet] = 0; //printf("收到数据:%s\n",buf); int x = send(s,buf,nRet,0); printf("已发送字节数:%d , 线程号:%d\n",x,GetCurrentThreadId()); if(x < 0) { int iError = GetLastError(); printf("数据:%s ,错误:%d\n",buf,iError); if(10035 == iError) { if(g_map.end() != g_map.find(s)) { int newLength = strlen(g_map[s]) + strlen(buf); char* newBuf = new char[newLength + 1]; strncpy(newBuf,g_map[s],strlen(g_map[s])); strncpy(newBuf+strlen(g_map[s]),buf,strlen(buf)); newBuf[newLength] = 0; delete [] g_map[s]; g_map[s] = newBuf; } else { char* newBuf = new char[strlen(buf) + 1]; strncpy(newBuf,buf,strlen(buf)); newBuf[strlen(buf)] = 0; g_map[s] = newBuf; } } else { if(g_map.end() != g_map.find(s)) { delete [] g_map[s]; g_map.erase(s); } closesocket(s); } } } else { 
printf("1个Client已经断开1111!\n"); if(g_map.end() != g_map.find(s)) { delete [] g_map[s]; g_map.erase(s); } closesocket(s); } }break; case FD_CLOSE: { printf("1个Client已经断开222!\n"); if(g_map.end() != g_map.find(s)) { delete [] g_map[s]; g_map.erase(s); } closesocket(s); }break; } }break; case WM_DESTROY: { printf("窗口已关闭!\n"); PostQuitMessage(0); } } return DefWindowProc(hwnd,uMsg,wParam,lParam); } int main() { char szClassName[] = "WSAAsyncSelect Test"; static WNDCLASSEX wndClass; wndClass.cbSize = sizeof(wndClass); wndClass.style = CS_HREDRAW | CS_VREDRAW; wndClass.lpfnWndProc = WindowProc; wndClass.cbClsExtra = 0; wndClass.cbWndExtra = 0; wndClass.hInstance = GetModuleHandle(0); wndClass.hIcon = LoadIcon(NULL,IDI_APPLICATION); wndClass.hCursor = LoadCursor(NULL,IDC_ARROW); wndClass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH); wndClass.lpszMenuName = NULL; wndClass.lpszClassName = szClassName; wndClass.hIconSm = NULL; ATOM atom = RegisterClassEx(&wndClass); if(0 == atom) { char error[256]; sprintf(error,"RegisterClassEx错误!%d",GetLastError()); MessageBox(NULL,error,"error",MB_OK); return -1; } HWND hwnd = CreateWindowEx(0,(char *)atom,"",WS_OVERLAPPEDWINDOW,CW_USEDEFAULT,CW_USEDEFAULT, CW_USEDEFAULT,CW_USEDEFAULT,HWND_MESSAGE,NULL,NULL,NULL); if(hwnd == NULL) { char error[256]; sprintf(error,"创建窗口错误!%d",GetLastError()); MessageBox(NULL,error,"error",MB_OK); return -1; } WSAData wsaData; if(0 != WSAStartup(MAKEWORD(2,2),&wsaData)) { printf("初始化失败!%d\n",WSAGetLastError()); Sleep(5000); return -1; } USHORT nport = 3456; SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nport); sin.sin_addr.S_un.S_addr = ADDR_ANY; if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin))) { printf("bind failed!%d\n",WSAGetLastError()); Sleep(5000); return -1; } WSAAsyncSelect(sListen,hwnd,WM_SOCKET,FD_ACCEPT | FD_CLOSE); listen(sListen,5); MSG msg; while(GetMessage(&msg,NULL,0,0)) { TranslateMessage(&msg); 
DispatchMessage(&msg); } closesocket(sListen); WSACleanup(); return msg.wParam; }
五,事件选择模型
事件选择模型主要难点是对线程池的使用,send操作可以参考异步选择模型。
- #include <WinSock2.h>
- #include <Windows.h>
- #include <stdio.h>
- #include <vector>
- using namespace std;
- #pragma comment(lib,"Ws2_32.lib")
- // State of one ServerThread: parallel arrays of event handles and sockets.
- // Slot 0 holds only a wake-up event (created in CreateThreadObj, signalled by
- // InsertSocket and ControlThread); client sockets occupy slots 1..nSocksUsed-1.
- typedef struct _THREAD_OBJ
- {
- HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
- SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];
- // Number of used slots, counting slot 0.
- int nSocksUsed;
- // Entered by InsertSocket, RemoveSocket and ControlThread around array access.
- CRITICAL_SECTION cs;
- // Next node in the g_pThreadList singly linked list.
- _THREAD_OBJ *pNext;
- }THREAD_OBJ,*PTHREAD_OBJ;
- // Head of the linked list of thread objects; new objects are pushed at the front.
- PTHREAD_OBJ g_pThreadList = NULL;
- // Guards g_pThreadList (and is held by AssignToFreeThread while it updates g_hThreads).
- CRITICAL_SECTION g_cs;
- // Set to TRUE by main() and to FALSE by ControlThread() to request shutdown.
- BOOL g_bServerRunning = FALSE;
- // Handles of the ServerThread threads created so far, and how many there are.
- HANDLE g_hThreads[1000] = {0};
- int g_nThreadsCount = 0;
- // Allocates a zero-initialised THREAD_OBJ, sets up its critical section and the
- // slot-0 wake-up event, and links it at the head of g_pThreadList.
- // Returns the new object.
- PTHREAD_OBJ CreateThreadObj()
- {
-     PTHREAD_OBJ pObj = new THREAD_OBJ();
-     if(pObj == NULL)
-         return pObj;
-     InitializeCriticalSectionAndSpinCount(&pObj->cs,4000);
-     // Slot 0 is taken by the wake-up event, so the object starts with one used slot.
-     pObj->nSocksUsed = 1;
-     pObj->events[0] = WSACreateEvent();
-     EnterCriticalSection(&g_cs);
-     pObj->pNext = g_pThreadList;
-     g_pThreadList = pObj;
-     LeaveCriticalSection(&g_cs);
-     return pObj;
- }
- // Unlinks pThread from g_pThreadList (if it is linked), then destroys its
- // critical section, closes its slot-0 event and frees the object.
- // A NULL pThread is ignored.
- void FreeThreadObj(PTHREAD_OBJ pThread)
- {
-     if(pThread == NULL)
-         return;
-     EnterCriticalSection(&g_cs);
-     // Walk the links themselves so head and interior nodes are handled alike.
-     for(PTHREAD_OBJ* ppLink = &g_pThreadList;*ppLink != NULL;ppLink = &(*ppLink)->pNext)
-     {
-         if(*ppLink == pThread)
-         {
-             *ppLink = pThread->pNext;
-             break;
-         }
-     }
-     LeaveCriticalSection(&g_cs);
-     WSACloseEvent(pThread->events[0]);
-     DeleteCriticalSection(&pThread->cs);
-     delete pThread;
- }
- // Connections ever inserted; incremented by InsertSocket.
- LONG g_nTotalConnections;
- // Connections currently tracked; incremented by InsertSocket, decremented by RemoveSocket.
- LONG g_nCurrentConnections;
- // Adds socket s to pThread: creates an event, associates it with the socket for
- // FD_READ | FD_CLOSE | FD_WRITE and stores both in the next free slot.
- // Returns TRUE on success; FALSE when pThread is NULL, s is invalid, the thread
- // object is full, or the event could not be created or associated.
- BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)
- {
-     if(pThread == NULL || s == INVALID_SOCKET)
-         return FALSE;
-     BOOL bRet = FALSE;
-     EnterCriticalSection(&pThread->cs);
-     if(pThread->nSocksUsed < WSA_MAXIMUM_WAIT_EVENTS)
-     {
-         // Publish the slot only after both calls succeeded: an invalid handle in
-         // the array would make the thread's wait on all its events fail.
-         WSAEVENT hEvent = WSACreateEvent();
-         if(hEvent != WSA_INVALID_EVENT)
-         {
-             if(0 == WSAEventSelect(s,hEvent,FD_READ | FD_CLOSE | FD_WRITE))
-             {
-                 int nSlot = pThread->nSocksUsed;
-                 pThread->events[nSlot] = hEvent;
-                 pThread->sockets[nSlot] = s;
-                 pThread->nSocksUsed = nSlot + 1;
-                 bRet = TRUE;
-                 // Wake the thread so it waits again with the new event included.
-                 WSASetEvent(pThread->events[0]);
-             }
-             else
-             {
-                 WSACloseEvent(hEvent);
-             }
-         }
-     }
-     LeaveCriticalSection(&pThread->cs);
-     if(bRet)
-     {
-         InterlockedIncrement(&g_nTotalConnections);
-         InterlockedIncrement(&g_nCurrentConnections);
-     }
-     return bRet;
- }
- // Removes socket s from pThread: closes its event and the socket, and compacts
- // the arrays. Does nothing when pThread is NULL, s is invalid or s is not found.
- void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)
- {
-     if(pThread == NULL || s == INVALID_SOCKET)
-         return;
-     BOOL bRemoved = FALSE;
-     EnterCriticalSection(&pThread->cs);
-     // Slot 0 is the wake-up event, so client sockets start at index 1.
-     for(int i=1;i<pThread->nSocksUsed;i++)
-     {
-         if(pThread->sockets[i] == s)
-         {
-             WSACloseEvent(pThread->events[i]);
-             closesocket(s);
-             for(int j=i;j<pThread->nSocksUsed - 1;j++)
-             {
-                 pThread->events[j] = pThread->events[j+1];
-                 pThread->sockets[j] = pThread->sockets[j+1];
-             }
-             pThread->nSocksUsed--;
-             bRemoved = TRUE;
-             break;
-         }
-     }
-     LeaveCriticalSection(&pThread->cs);
-     // Count only sockets that were really removed, so a repeated or mistaken
-     // call cannot drive the counter below the true number of connections.
-     if(bRemoved)
-         InterlockedDecrement(&g_nCurrentConnections);
- }
- // Handles the network events recorded for the socket in slot nIndex of pThread.
- // Prints received data on FD_READ and logs FD_WRITE. Removes the socket on
- // FD_CLOSE or any error. Returns TRUE when the socket stays registered,
- // FALSE when it was removed.
- BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)
- {
-     WSANETWORKEVENTS event;
-     SOCKET s = pThread->sockets[nIndex];
-     HANDLE sEvent = pThread->events[nIndex];
-     if(0 != WSAEnumNetworkEvents(s,sEvent,&event))
-     {
-         printf("socket error!\n");
-         RemoveSocket(pThread,s);
-         return FALSE;
-     }
-     // Several events can be reported by one call, so test each flag on its own
-     // rather than with an else-if chain that would ignore all but the first.
-     BOOL bKeep = TRUE;
-     if(event.lNetworkEvents & FD_READ)
-     {
-         if(event.iErrorCode[FD_READ_BIT] != 0)
-         {
-             bKeep = FALSE;
-         }
-         else
-         {
-             char szText[256];
-             // Use the buffer size (minus the terminator), not strlen() of the
-             // still uninitialised buffer.
-             int nRecv = recv(s,szText,(int)sizeof(szText) - 1,0);
-             if(nRecv > 0)
-             {
-                 szText[nRecv] = '\0';
-                 printf("接收到数据:%s\n",szText);
-             }
-             else if(!(nRecv == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK))
-             {
-                 bKeep = FALSE; // closed by peer or failed
-             }
-         }
-     }
-     if(bKeep && (event.lNetworkEvents & FD_WRITE))
-     {
-         if(event.iErrorCode[FD_WRITE_BIT] != 0)
-             bKeep = FALSE;
-         else
-             printf("FD_WRITE==========================\n");
-     }
-     if(event.lNetworkEvents & FD_CLOSE)
-     {
-         bKeep = FALSE;
-     }
-     if(bKeep)
-         return TRUE;
-     printf("socket error2!\n");
-     RemoveSocket(pThread,s);
-     return FALSE;
- }
- // Worker thread for one THREAD_OBJ (passed in lpParam): waits on the object's
- // events, dispatches socket events to HandleIo and exits once g_bServerRunning
- // is FALSE, releasing the sockets it still holds and the object itself.
- DWORD WINAPI ServerThread(LPVOID lpParam)
- {
-     PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
-     if(pThread == NULL)
-         return 0;
-     while(TRUE)
-     {
-         // Wait on a snapshot taken under the lock; InsertSocket may be appending
-         // to the arrays from another thread. Indices stay valid because only
-         // this thread removes entries and inserts are appended at the end.
-         HANDLE hWait[WSA_MAXIMUM_WAIT_EVENTS];
-         EnterCriticalSection(&pThread->cs);
-         int nCount = pThread->nSocksUsed;
-         for(int i=0;i<nCount;i++)
-             hWait[i] = pThread->events[i];
-         LeaveCriticalSection(&pThread->cs);
-         DWORD dwRet = WSAWaitForMultipleEvents(nCount,hWait,FALSE,WSA_INFINITE,FALSE);
-         if(dwRet == WSA_WAIT_FAILED || dwRet == WSA_WAIT_TIMEOUT)
-         {
-             printf("WSAWaitForMultipleEvents error!\n");
-             Sleep(10); // avoid a hot loop; still fall through to the shutdown check
-         }
-         else
-         {
-             int nIndex = (int)(dwRet - WSA_WAIT_EVENT_0);
-             if(nIndex == 0)
-                 WSAResetEvent(pThread->events[0]);
-             else
-                 HandleIo(pThread,nIndex);
-         }
-         if(!g_bServerRunning)
-         {
-             // Release whatever is still registered. Waiting for the slots to
-             // empty by themselves could block shutdown forever, since nothing
-             // else removes them.
-             EnterCriticalSection(&pThread->cs);
-             for(int i=1;i<pThread->nSocksUsed;i++)
-             {
-                 WSACloseEvent(pThread->events[i]);
-                 // May already have been closed by the shutdown request; the
-                 // second close then just fails.
-                 closesocket(pThread->sockets[i]);
-                 InterlockedDecrement(&g_nCurrentConnections);
-             }
-             pThread->nSocksUsed = 1;
-             LeaveCriticalSection(&pThread->cs);
-             break;
-         }
-     }
-     FreeThreadObj(pThread);
-     return 0;
- }
- // Registers socket s with the first thread object that has a free slot,
- // creating a new thread object and ServerThread when none has room.
- // Returns TRUE only when the socket was actually registered; on FALSE the
- // caller still owns s.
- BOOL AssignToFreeThread(SOCKET s)
- {
-     if(s == INVALID_SOCKET)
-         return FALSE;
-     BOOL bAllSucceed = TRUE;
-     EnterCriticalSection(&g_cs);
-     PTHREAD_OBJ pThread = g_pThreadList;
-     while(pThread != NULL && !InsertSocket(pThread,s))
-     {
-         pThread = pThread->pNext;
-     }
-     if(pThread == NULL)
-     {
-         bAllSucceed = FALSE;
-         const int nMaxThreads = (int)(sizeof(g_hThreads) / sizeof(g_hThreads[0]));
-         if(g_nThreadsCount < nMaxThreads)
-         {
-             pThread = CreateThreadObj();
-             if(pThread != NULL)
-             {
-                 HANDLE hThread = CreateThread(NULL,0,ServerThread,pThread,0,NULL);
-                 if(!hThread)
-                 {
-                     FreeThreadObj(pThread);
-                 }
-                 else
-                 {
-                     g_hThreads[g_nThreadsCount++] = hThread;
-                     // Report the real outcome instead of assuming success.
-                     bAllSucceed = InsertSocket(pThread,s);
-                 }
-             }
-         }
-     }
-     LeaveCriticalSection(&g_cs);
-     return bAllSucceed;
- }
- // Console control thread: reads words from stdin and, on the first word that
- // starts with 's', requests shutdown. It clears g_bServerRunning, closes every
- // client socket, wakes all server threads and signals the event passed in
- // lpParma. Also ends when stdin reaches end-of-file. Always returns 0.
- DWORD WINAPI ControlThread(LPVOID lpParma)
- {
-     HANDLE wsaEvent = (HANDLE)lpParma;
-     char cmd[128];
-     // Bound the read to the buffer size and stop on EOF: scanf returns EOF
-     // (non-zero) there, which the old loop condition treated as success.
-     while(scanf("%127s",cmd) == 1)
-     {
-         if(cmd[0] == 's')
-         {
-             g_bServerRunning = FALSE;
-             EnterCriticalSection(&g_cs);
-             PTHREAD_OBJ pThread = g_pThreadList;
-             while(pThread != NULL)
-             {
-                 EnterCriticalSection(&pThread->cs);
-                 // Start at 1: slot 0 holds only the wake-up event, not a socket.
-                 for(int i=1;i<pThread->nSocksUsed;i++)
-                 {
-                     closesocket(pThread->sockets[i]);
-                 }
-                 WSASetEvent(pThread->events[0]);
-                 LeaveCriticalSection(&pThread->cs);
-                 pThread = pThread->pNext;
-             }
-             LeaveCriticalSection(&g_cs);
-             WSASetEvent(wsaEvent);
-             break;
-         }
-     }
-     return 0;
- }
- // WSAEventSelect server: accepts connections on TCP port 3456 and distributes
- // them over ServerThread workers. Prints statistics every 5 seconds while idle.
- // Shuts down when ControlThread clears g_bServerRunning.
- // Returns 0 after an orderly shutdown, -1 on start-up or wait failure.
- int main()
- {
-     WSAData wsaData;
-     int nStartup = WSAStartup(MAKEWORD(2,2),&wsaData);
-     if(0 != nStartup)
-     {
-         printf("初始化失败!%d\n",nStartup);
-         Sleep(5000);
-         return -1;
-     }
-     USHORT nport = 3456;
-     SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
-     if(sListen == INVALID_SOCKET)
-     {
-         printf("socket failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         WSACleanup();
-         return -1;
-     }
-     sockaddr_in sin = {0};
-     sin.sin_family = AF_INET;
-     sin.sin_port = htons(nport);
-     sin.sin_addr.S_un.S_addr = ADDR_ANY;
-     if(SOCKET_ERROR == ::bind(sListen,(sockaddr*)&sin,sizeof(sin)))
-     {
-         printf("bind failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         closesocket(sListen);
-         WSACleanup();
-         return -1;
-     }
-     WSAEVENT wsaEvent = WSACreateEvent();
-     if(SOCKET_ERROR == ::listen(sListen,200)
-         || wsaEvent == WSA_INVALID_EVENT
-         || SOCKET_ERROR == WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE))
-     {
-         printf("listen failed!%d\n",WSAGetLastError());
-         Sleep(5000);
-         if(wsaEvent != WSA_INVALID_EVENT)
-             WSACloseEvent(wsaEvent);
-         closesocket(sListen);
-         WSACleanup();
-         return -1;
-     }
-     InitializeCriticalSectionAndSpinCount(&g_cs,4000);
-     g_bServerRunning = TRUE;
-     // Keep the handle: wsaEvent must stay open until ControlThread has finished
-     // with it.
-     HANDLE hControl = CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);
-     while(TRUE)
-     {
-         DWORD nRet = WaitForSingleObject(wsaEvent,5*1000);
-         if(!g_bServerRunning)
-             break;
-         if(nRet == WAIT_FAILED)
-         {
-             printf("WaitForSingleObject Failed!\n");
-             break;
-         }
-         else if(nRet == WAIT_TIMEOUT)
-         {
-             printf("\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n",
-                 g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);
-             continue;
-         }
-         else
-         {
-             ResetEvent(wsaEvent);
-             // Drain the backlog; accept fails once no connection is pending.
-             while(TRUE)
-             {
-                 sockaddr_in addrRemote;
-                 int nLen = sizeof(addrRemote);
-                 SOCKET sNew = accept(sListen,(sockaddr*)&addrRemote,&nLen);
-                 if(sNew == INVALID_SOCKET)
-                     break;
-                 if(!AssignToFreeThread(sNew))
-                 {
-                     closesocket(sNew);
-                     printf("AssignToFreeThread Failed!\n");
-                 }
-             }
-         }
-     }
-     closesocket(sListen);
-     if(g_bServerRunning)
-     {
-         // The wait itself failed: worker threads are still running and using
-         // g_cs and wsaEvent, so leave those alone and let process exit end them.
-         return -1;
-     }
-     if(hControl != NULL)
-     {
-         WaitForSingleObject(hControl,INFINITE);
-         CloseHandle(hControl);
-     }
-     WSACloseEvent(wsaEvent);
-     // A single wait call accepts at most MAXIMUM_WAIT_OBJECTS handles, so wait
-     // for the worker threads in batches.
-     for(int nFirst=0;nFirst<g_nThreadsCount;nFirst+=MAXIMUM_WAIT_OBJECTS)
-     {
-         int nBatch = g_nThreadsCount - nFirst;
-         if(nBatch > MAXIMUM_WAIT_OBJECTS)
-             nBatch = MAXIMUM_WAIT_OBJECTS;
-         WaitForMultipleObjects(nBatch,g_hThreads + nFirst,TRUE,INFINITE);
-     }
-     for(int i=0;i<g_nThreadsCount;i++)
-     {
-         CloseHandle(g_hThreads[i]);
-     }
-     DeleteCriticalSection(&g_cs);
-     WSACleanup();
-     return 0;
- }
#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
#include <vector>
using namespace std;
#pragma comment(lib,"Ws2_32.lib")

// One worker thread together with the sockets it services.
// events[0] is a wake-up event owned by the thread itself; slots 1..nSocksUsed-1
// pair events[i] with sockets[i].
typedef struct _THREAD_OBJ
{
    HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
    SOCKET sockets[WSA_MAXIMUM_WAIT_EVENTS];
    int nSocksUsed;          // number of used slots, including slot 0
    CRITICAL_SECTION cs;     // guards events/sockets/nSocksUsed
    _THREAD_OBJ *pNext;      // next entry in g_pThreadList
}THREAD_OBJ,*PTHREAD_OBJ;

PTHREAD_OBJ g_pThreadList = NULL;   // list of live worker objects, guarded by g_cs
CRITICAL_SECTION g_cs;
BOOL g_bServerRunning = FALSE;
HANDLE g_hThreads[1000] = {0};      // handles of every worker thread ever created
int g_nThreadsCount = 0;
LONG g_nTotalConnections;
LONG g_nCurrentConnections;

// Allocates a worker object, creates its wake-up event and links it at the head
// of g_pThreadList.
PTHREAD_OBJ CreateThreadObj()
{
    PTHREAD_OBJ pThread = new THREAD_OBJ();
    if(pThread != NULL)
    {
        InitializeCriticalSectionAndSpinCount(&pThread->cs,4000);
        pThread->events[0] = WSACreateEvent();
        pThread->nSocksUsed = 1;
        EnterCriticalSection(&g_cs);
        pThread->pNext = g_pThreadList;
        g_pThreadList = pThread;
        LeaveCriticalSection(&g_cs);
    }
    return pThread;
}

// Unlinks a worker object from g_pThreadList and releases its resources.
void FreeThreadObj(PTHREAD_OBJ pThread)
{
    if(pThread == NULL)
        return;
    EnterCriticalSection(&g_cs);
    PTHREAD_OBJ p = g_pThreadList;
    if(p == pThread)
    {
        g_pThreadList = p->pNext;
    }
    else
    {
        while(p != NULL && p->pNext != pThread)
        {
            p = p->pNext;
        }
        if(p != NULL)
        {
            p->pNext = pThread->pNext;
        }
    }
    LeaveCriticalSection(&g_cs);
    DeleteCriticalSection(&pThread->cs);
    WSACloseEvent(pThread->events[0]);
    delete pThread;
}

// Adds socket s to pThread if it has a free slot. Returns TRUE on success.
// The slot is only committed once the event exists and is associated with s.
BOOL InsertSocket(PTHREAD_OBJ pThread,SOCKET s)
{
    if(pThread == NULL || s == INVALID_SOCKET)
        return FALSE;
    BOOL bRet = FALSE;
    EnterCriticalSection(&pThread->cs);
    if(pThread->nSocksUsed < WSA_MAXIMUM_WAIT_EVENTS)
    {
        WSAEVENT hEvent = WSACreateEvent();
        if(hEvent != WSA_INVALID_EVENT)
        {
            if(0 == WSAEventSelect(s,hEvent,FD_READ | FD_CLOSE | FD_WRITE))
            {
                pThread->events[pThread->nSocksUsed] = hEvent;
                pThread->sockets[pThread->nSocksUsed] = s;
                pThread->nSocksUsed++;
                bRet = TRUE;
                // Wake the worker so it re-enters WSAWaitForMultipleEvents with the new slot.
                WSASetEvent(pThread->events[0]);
            }
            else
            {
                WSACloseEvent(hEvent);
            }
        }
    }
    LeaveCriticalSection(&pThread->cs);
    if(bRet)
    {
        InterlockedIncrement(&g_nTotalConnections);
        InterlockedIncrement(&g_nCurrentConnections);
    }
    return bRet;
}

// Closes socket s and removes its slot from pThread. The connection counter is
// only decremented when the socket was actually found.
void RemoveSocket(PTHREAD_OBJ pThread,SOCKET s)
{
    if(pThread == NULL || s == INVALID_SOCKET)
        return;
    BOOL bRemoved = FALSE;
    EnterCriticalSection(&pThread->cs);
    for(int i=1;i<pThread->nSocksUsed;i++)
    {
        if(pThread->sockets[i] == s)
        {
            WSACloseEvent(pThread->events[i]);
            closesocket(s);
            for(int j=i;j<pThread->nSocksUsed - 1;j++)
            {
                pThread->events[j] = pThread->events[j+1];
                pThread->sockets[j] = pThread->sockets[j+1];
            }
            pThread->nSocksUsed--;
            bRemoved = TRUE;
            break;
        }
    }
    LeaveCriticalSection(&pThread->cs);
    if(bRemoved)
        InterlockedDecrement(&g_nCurrentConnections);
}

// Handles the network events recorded for slot nIndex of pThread.
// Returns TRUE while the connection stays open, FALSE once it was removed.
BOOL HandleIo(PTHREAD_OBJ pThread,int nIndex)
{
    WSANETWORKEVENTS event;
    SOCKET s = pThread->sockets[nIndex];
    HANDLE sEvent = pThread->events[nIndex];
    if(0 != WSAEnumNetworkEvents(s,sEvent,&event))
    {
        printf("socket error!\n");
        RemoveSocket(pThread,s);
        return FALSE;
    }

    // Several events may be recorded at once, so every flag is tested on its
    // own; an else-if chain would lose FD_CLOSE whenever FD_READ is also set.
    char szText[256];
    BOOL bAlive = TRUE;
    if(event.lNetworkEvents & FD_READ)
    {
        bAlive = FALSE;
        if(event.iErrorCode[FD_READ_BIT] == 0)
        {
            // Leave one byte for the terminator; the buffer is uninitialised,
            // so its capacity must come from sizeof, not strlen.
            int nRecv = recv(s,szText,sizeof(szText) - 1,0);
            if(nRecv > 0)
            {
                szText[nRecv] = '\0';
                printf("接收到数据:%s\n",szText);
                bAlive = TRUE;
            }
            else if(nRecv == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
            {
                bAlive = TRUE;   // nothing to read right now; not an error
            }
        }
    }
    if(bAlive && (event.lNetworkEvents & FD_WRITE))
    {
        printf("FD_WRITE==========================\n");
    }
    if(event.lNetworkEvents & FD_CLOSE)
    {
        // Print whatever is still buffered before dropping the connection.
        int nRecv;
        while(bAlive && (nRecv = recv(s,szText,sizeof(szText) - 1,0)) > 0)
        {
            szText[nRecv] = '\0';
            printf("接收到数据:%s\n",szText);
        }
        bAlive = FALSE;
    }
    if(bAlive)
        return TRUE;

    printf("socket error2!\n");
    RemoveSocket(pThread,s);
    return FALSE;
}

// Worker thread: waits on the events of its THREAD_OBJ and dispatches to HandleIo.
// On shutdown it releases its own sockets and events; closing them from another
// thread would not signal this wait and would leave the thread blocked.
DWORD WINAPI ServerThread(LPVOID lpParam)
{
    PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
    while(TRUE)
    {
        DWORD dwWait = WSAWaitForMultipleEvents(
            pThread->nSocksUsed,pThread->events,FALSE,WSA_INFINITE,FALSE);
        if(!g_bServerRunning)
        {
            EnterCriticalSection(&pThread->cs);
            for(int i=1;i<pThread->nSocksUsed;i++)
            {
                WSACloseEvent(pThread->events[i]);
                closesocket(pThread->sockets[i]);
                InterlockedDecrement(&g_nCurrentConnections);
            }
            pThread->nSocksUsed = 1;
            LeaveCriticalSection(&pThread->cs);
            break;
        }
        // Test the raw return value before turning it into an index.
        if(dwWait == WSA_WAIT_FAILED || dwWait == WSA_WAIT_TIMEOUT)
        {
            printf("WSAWaitForMultipleEvents error!\n");
            continue;
        }
        int nIndex = (int)(dwWait - WSA_WAIT_EVENT_0);
        if(nIndex == 0)
        {
            ResetEvent(pThread->events[0]);   // slot list changed; simply wait again
        }
        else
        {
            HandleIo(pThread,nIndex);
        }
    }
    FreeThreadObj(pThread);
    return 0;
}

// Gives socket s to the first worker with a free slot, creating a new worker
// when all are full. Returns FALSE when the socket could not be placed; the
// caller then still owns (and must close) s.
BOOL AssignToFreeThread(SOCKET s)
{
    if(s == INVALID_SOCKET)
        return FALSE;
    BOOL bAllSucceed = TRUE;
    EnterCriticalSection(&g_cs);
    PTHREAD_OBJ pThread = g_pThreadList;
    while(pThread != NULL)
    {
        if(InsertSocket(pThread,s))
        {
            break;
        }
        pThread = pThread->pNext;
    }
    if(pThread == NULL)
    {
        bAllSucceed = FALSE;
        if(g_nThreadsCount < 1000)
        {
            pThread = CreateThreadObj();
            HANDLE hThread = CreateThread(NULL,0,ServerThread,pThread,0,NULL);
            if(!hThread)
            {
                FreeThreadObj(pThread);
            }
            else
            {
                g_hThreads[g_nThreadsCount++] = hThread;
                bAllSucceed = InsertSocket(pThread,s);
            }
        }
    }
    LeaveCriticalSection(&g_cs);
    return bAllSucceed;
}

// Console thread: a command starting with 's' stops the server.
// lpParma is the accept event of main(), signalled to wake the main loop.
DWORD WINAPI ControlThread(LPVOID lpParma)
{
    HANDLE wsaEvent = (HANDLE)lpParma;
    char cmd[128];
    // Bounded read; stop on EOF or error instead of spinning on it.
    while(scanf("%127s",cmd) == 1)
    {
        if(cmd[0] == 's')
        {
            g_bServerRunning = FALSE;
            // Only wake the workers; each one releases its own sockets.
            EnterCriticalSection(&g_cs);
            for(PTHREAD_OBJ pThread = g_pThreadList;pThread != NULL;pThread = pThread->pNext)
            {
                WSASetEvent(pThread->events[0]);
            }
            LeaveCriticalSection(&g_cs);
            WSASetEvent(wsaEvent);
            break;
        }
    }
    return 0;
}

// Entry point: listens on TCP port 3456, distributes accepted sockets to worker
// threads, prints statistics every 5 seconds and shuts down when ControlThread
// clears g_bServerRunning. Returns 0 after the service loop, -1 on start-up failure.
int main()
{
    WSAData wsaData;
    // WSAStartup reports its error through the return value.
    int nStartup = WSAStartup(MAKEWORD(2,2),&wsaData);
    if(0 != nStartup)
    {
        printf("初始化失败!%d\n",nStartup);
        Sleep(5000);
        return -1;
    }

    USHORT nport = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(sListen == INVALID_SOCKET)
    {
        printf("socket failed!%d\n",WSAGetLastError());
        WSACleanup();
        return -1;
    }
    sockaddr_in sin = {0};
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;
    if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
    {
        printf("bind failed!%d\n",WSAGetLastError());
        Sleep(5000);
        closesocket(sListen);
        WSACleanup();
        return -1;
    }
    if(SOCKET_ERROR == listen(sListen,200))
    {
        printf("listen failed!%d\n",WSAGetLastError());
        closesocket(sListen);
        WSACleanup();
        return -1;
    }

    WSAEVENT wsaEvent = WSACreateEvent();
    if(wsaEvent == WSA_INVALID_EVENT ||
        SOCKET_ERROR == WSAEventSelect(sListen,wsaEvent,FD_ACCEPT | FD_CLOSE))
    {
        printf("WSAEventSelect failed!%d\n",WSAGetLastError());
        if(wsaEvent != WSA_INVALID_EVENT)
            WSACloseEvent(wsaEvent);
        closesocket(sListen);
        WSACleanup();
        return -1;
    }

    InitializeCriticalSectionAndSpinCount(&g_cs,4000);
    g_bServerRunning = TRUE;
    // Keep the handle: the thread must have finished before wsaEvent is closed.
    HANDLE hControl = CreateThread(NULL,0,ControlThread,wsaEvent,0,NULL);

    BOOL bOrderlyStop = FALSE;
    while(TRUE)
    {
        DWORD dwRet = WaitForSingleObject(wsaEvent,5*1000);
        if(!g_bServerRunning)
        {
            bOrderlyStop = TRUE;
            break;
        }
        if(dwRet == WAIT_FAILED)
        {
            printf("WaitForSingleObject Failed!\n");
            break;
        }
        if(dwRet == WAIT_TIMEOUT)
        {
            printf("\nTotalConnections: %d\nCurrentConnections: %d\nThreads:%d\n",
                g_nTotalConnections,g_nCurrentConnections,g_nThreadsCount);
            continue;
        }
        // FD_ACCEPT: drain the backlog; accept() fails once it is empty.
        ResetEvent(wsaEvent);
        while(TRUE)
        {
            sockaddr_in addrRemote;
            int nLen = sizeof(addrRemote);
            SOCKET sNew = accept(sListen,(sockaddr*)&addrRemote,&nLen);
            if(sNew == INVALID_SOCKET)
                break;
            if(!AssignToFreeThread(sNew))
            {
                closesocket(sNew);
                printf("AssignToFreeThread Failed!\n");
            }
        }
    }

    if(bOrderlyStop)
    {
        if(hControl != NULL)
            WaitForSingleObject(hControl,INFINITE);
        // One wait accepts at most WSA_MAXIMUM_WAIT_EVENTS handles; wait in batches.
        for(int nFirst=0;nFirst<g_nThreadsCount;nFirst+=WSA_MAXIMUM_WAIT_EVENTS)
        {
            int nBatch = g_nThreadsCount - nFirst;
            if(nBatch > WSA_MAXIMUM_WAIT_EVENTS)
                nBatch = WSA_MAXIMUM_WAIT_EVENTS;
            WSAWaitForMultipleEvents(nBatch,&g_hThreads[nFirst],TRUE,INFINITE,FALSE);
        }
        for(int i=0;i<g_nThreadsCount;i++)
        {
            CloseHandle(g_hThreads[i]);
        }
    }

    if(hControl != NULL)
        CloseHandle(hControl);
    closesocket(sListen);
    WSACloseEvent(wsaEvent);
    DeleteCriticalSection(&g_cs);
    WSACleanup();
    return 0;
}
六,重叠I/O模型。
若需要建线程池,可参考事件选择模型。若纠结于send,可参考下面的IOCP。
- #include <WinSock2.h>
- #include <Windows.h>
- #include <MSWSock.h>
- #include <stdio.h>
- #pragma comment(lib,"Ws2_32.lib")
- #define BUFFER_SIZE 4096
- typedef struct _SOCKET_OBJ
- {
- SOCKET s;
- int nOutstandingOps;
- LPFN_ACCEPTEX lpfnAcceptEx;
- }SOCKET_OBJ,*PSOCKET_OBJ;
- PSOCKET_OBJ CreateSocketObj(SOCKET s)
- {
- PSOCKET_OBJ pSocket = new SOCKET_OBJ();
- if(pSocket != NULL)
- pSocket->s = s;
- return pSocket;
- }
- void FreeSocketObj(PSOCKET_OBJ pSocket)
- {
- if(pSocket == NULL)
- return;
- if(pSocket->s != INVALID_SOCKET)
- closesocket(pSocket->s);
- delete pSocket;
- }
- typedef struct _BUFFER_OBJ
- {
- OVERLAPPED ol;
- char* buff;
- int nLen;
- PSOCKET_OBJ pSocket;
- int nOperation;
- #define OP_ACCEPT 1
- #define OP_READ 2
- #define OP_WRITE 3
- SOCKET sAccept;
- _BUFFER_OBJ* pNext;
- }BUFFER_OBJ,*PBUFFER_OBJ;
- HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];
- int g_nBufferCount;
- PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
- BOOL g_bServerRunning;
- CRITICAL_SECTION g_cs;
- PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
- {
- if(g_nBufferCount > WSA_MAXIMUM_WAIT_EVENTS - 1)
- return NULL;
- PBUFFER_OBJ pBuffer = new BUFFER_OBJ();
- if(pBuffer != NULL)
- {
- pBuffer->buff = new char[nLen];
- pBuffer->nLen = nLen;
- pBuffer->ol.hEvent = WSACreateEvent();
- pBuffer->pSocket = pSocket;
- pBuffer->sAccept = INVALID_SOCKET;
- pBuffer->pNext = NULL;
- EnterCriticalSection(&g_cs);
- if(g_pBufferHeader == NULL)
- {
- g_pBufferHeader = g_pBufferTail = pBuffer;
- }
- else
- {
- g_pBufferTail->pNext = pBuffer;
- g_pBufferTail = pBuffer;
- }
- LeaveCriticalSection(&g_cs);
- g_events[++g_nBufferCount] = pBuffer->ol.hEvent;
- }
- return pBuffer;
- }
- void FreeBufferObj(PBUFFER_OBJ pBuffer)
- {
- EnterCriticalSection(&g_cs);
- PBUFFER_OBJ pTest = g_pBufferHeader;
- BOOL bFind = FALSE;
- if(pTest == pBuffer)
- {
- if(g_pBufferHeader == g_pBufferTail)
- g_pBufferHeader = g_pBufferTail = NULL;
- else
- g_pBufferHeader = g_pBufferHeader->pNext;
- bFind = TRUE;
- }
- else
- {
- while(pTest != NULL && pTest->pNext != pBuffer)
- pTest = pTest->pNext;
- if(pTest != NULL)
- {
- pTest->pNext = pBuffer->pNext;
- if(pTest->pNext == NULL)
- g_pBufferTail = pTest;
- bFind = TRUE;
- }
- }
- if(bFind)
- {
- g_nBufferCount--;
- WSACloseEvent(pBuffer->ol.hEvent);
- delete [] pBuffer->buff;
- delete pBuffer;
- }
- LeaveCriticalSection(&g_cs);
- }
- PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
- {
- if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)
- return NULL;
- EnterCriticalSection(&g_cs);
- PBUFFER_OBJ pTest = g_pBufferHeader;
- while(pTest != NULL && pTest->ol.hEvent != hEvent)
- pTest = pTest->pNext;
- LeaveCriticalSection(&g_cs);
- return pTest;
- }
- void RebuildArray()
- {
- EnterCriticalSection(&g_cs);
- PBUFFER_OBJ pBuffer = g_pBufferHeader;
- int i=1;
- while(pBuffer != NULL)
- {
- g_events[i++] = pBuffer->ol.hEvent;
- pBuffer = pBuffer->pNext;
- }
- LeaveCriticalSection(&g_cs);
- }
- BOOL PostAccept(PBUFFER_OBJ pBuffer)
- {
- PSOCKET_OBJ pSocket = pBuffer->pSocket;
- if(pSocket->lpfnAcceptEx != NULL)
- {
- pBuffer->nOperation = OP_ACCEPT;
- pSocket->nOutstandingOps++;
- DWORD dwBytes;
- pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
- BOOL b = pSocket->lpfnAcceptEx(pSocket->s,
- pBuffer->sAccept,pBuffer->buff,BUFFER_SIZE - ((sizeof(sockaddr_in) + 16)*2),
- sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16,&dwBytes,&pBuffer->ol);
- if(!b)
- {
- if(WSAGetLastError() != WSA_IO_PENDING)
- return FALSE;
- }
- return TRUE;
- }
- return FALSE;
- }
- BOOL PostRecv(PBUFFER_OBJ pBuffer)
- {
- pBuffer->nOperation = OP_READ;
- pBuffer->pSocket->nOutstandingOps++;
- DWORD dwBytes;
- DWORD dwFlags = 0;
- WSABUF buf;
- buf.buf = pBuffer->buff;
- buf.len = pBuffer->nLen;
- if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL))
- {
- if(WSAGetLastError() != WSA_IO_PENDING)
- return FALSE;
- }
- return TRUE;
- }
- BOOL PostSend(PBUFFER_OBJ pBuffer)
- {
- pBuffer->nOperation = OP_WRITE;
- pBuffer->pSocket->nOutstandingOps++;
- DWORD dwBytes;
- DWORD dwFlags = 0;
- WSABUF buf;
- buf.buf = pBuffer->buff;
- buf.len = pBuffer->nLen;
- if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL))
- {
- if(WSAGetLastError() != WSA_IO_PENDING)
- return FALSE;
- }
- return TRUE;
- }
- BOOL HandleIo(PBUFFER_OBJ pBuffer)
- {
- if(pBuffer == NULL)
- return FALSE;
- PSOCKET_OBJ pSocket = pBuffer->pSocket;
- pSocket->nOutstandingOps--;
- DWORD dwTrans;
- DWORD dwFlags;
- BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);
- if(!bRet)
- {
- if(pSocket->s != INVALID_SOCKET)
- {
- closesocket(pSocket->s);
- pSocket->s = INVALID_SOCKET;
- }
- if(pBuffer->nOperation == OP_ACCEPT && pBuffer->sAccept != INVALID_SOCKET)
- {
- closesocket(pBuffer->sAccept);
- pBuffer->sAccept = INVALID_SOCKET;
- }
- if(pSocket->nOutstandingOps == 0)
- {
- FreeSocketObj(pSocket);
- }
- FreeBufferObj(pBuffer);
- return FALSE;
- }
- switch(pBuffer->nOperation)
- {
- case OP_ACCEPT:
- {
- if(dwTrans > 0)
- {
- pBuffer->buff[dwTrans] = 0;
- printf("Accept收到数据:%s\n",pBuffer->buff);
- PSOCKET_OBJ pClient = CreateSocketObj(pBuffer->sAccept);
- PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);
- if(pRecv == NULL)
- {
- printf("Too much connections!\n");
- FreeSocketObj(pClient);
- return FALSE;
- }
- RebuildArray();
- if(!PostRecv(pRecv))
- {
- FreeSocketObj(pClient);
- FreeBufferObj(pBuffer);
- return FALSE;
- }
- }
- else
- {
- if(pSocket->s != INVALID_SOCKET)
- {
- closesocket(pSocket->s);
- pSocket->s = INVALID_SOCKET;
- }
- if(pBuffer->sAccept != INVALID_SOCKET)
- {
- closesocket(pBuffer->sAccept);
- pBuffer->sAccept = INVALID_SOCKET;
- }
- if(pSocket->nOutstandingOps == 0)
- {
- FreeSocketObj(pSocket);
- }
- FreeBufferObj(pBuffer);
- }
- // PBUFFER_OBJ pSend = CreateBufferObj(pClient,BUFFER_SIZE);
- //if(pSend == NULL)
- //{
- // printf("Too much connections!\n");
- // FreeSocketObj(pClient);
- // return FALSE;
- //}
- //RebuildArray();
- //pSend->nLen = dwTrans;
- //memcpy(pSend->buff,pBuffer->buff,dwTrans);
- //if(!PostSend(pSend))
- //{
- // FreeSocketObj(pSocket);
- // FreeBufferObj(pBuffer);
- // return FALSE;
- //}
- PostAccept(pBuffer);
- }break;
- case OP_READ:
- {
- if(dwTrans > 0)
- {
- pBuffer->buff[dwTrans] = 0;
- printf("Recv收到数据:%s\n",pBuffer->buff);
- PostRecv(pBuffer);
- }
- else
- {
- if(pSocket->s != INVALID_SOCKET)
- {
- closesocket(pSocket->s);
- pSocket->s = INVALID_SOCKET;
- }
- if(pSocket->nOutstandingOps == 0)
- {
- FreeSocketObj(pSocket);
- }
- FreeBufferObj(pBuffer);
- }
- }break;
- case OP_WRITE:
- {
- if(dwTrans > 0)
- {
- pBuffer->buff[dwTrans] = 0;
- printf("发送数据: %s 成功!\n",pBuffer->buff);
- FreeBufferObj(pBuffer);
- }
- else
- {
- if(pSocket->s != INVALID_SOCKET)
- {
- closesocket(pSocket->s);
- pSocket->s = INVALID_SOCKET;
- }
- if(pSocket->nOutstandingOps == 0)
- {
- FreeSocketObj(pSocket);
- }
- FreeBufferObj(pBuffer);
- }
- }break;
- }
- }
- DWORD WINAPI ControlThread(LPVOID lpParma)
- {
- char cmd[128];
- while(scanf("%s",cmd))
- {
- if(cmd[0] == 's')
- {
- g_bServerRunning = FALSE;
- EnterCriticalSection(&g_cs);
- PBUFFER_OBJ pBuffer = g_pBufferHeader;
- while(pBuffer != NULL)
- {
- if(pBuffer->pSocket != NULL && pBuffer->pSocket->s != INVALID_SOCKET)
- closesocket(pBuffer->pSocket->s);
- pBuffer = pBuffer->pNext;
- }
- LeaveCriticalSection(&g_cs);
- break;
- }
- }
- return 0;
- }
- int main()
- {
- InitializeCriticalSectionAndSpinCount(&g_cs,4000);
- WSAData wsaData;
- if(0 != WSAStartup(MAKEWORD(2,2),&wsaData))
- {
- printf("初始化失败!%d\n",WSAGetLastError());
- Sleep(5000);
- return -1;
- }
- USHORT nport = 3456;
- SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
- sockaddr_in sin;
- sin.sin_family = AF_INET;
- sin.sin_port = htons(nport);
- sin.sin_addr.S_un.S_addr = ADDR_ANY;
- if(SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin)))
- {
- printf("bind failed!%d\n",WSAGetLastError());
- Sleep(5000);
- return -1;
- }
- listen(sListen,200);
- g_bServerRunning = TRUE;
- PSOCKET_OBJ pListen = CreateSocketObj(sListen);
- GUID GuidAcceptEx = WSAID_ACCEPTEX;
- DWORD dwBytes;
- WSAIoctl(pListen->s,
- SIO_GET_EXTENSION_FUNCTION_POINTER,
- &GuidAcceptEx,
- sizeof(GuidAcceptEx),
- &pListen->lpfnAcceptEx,
- sizeof(pListen->lpfnAcceptEx),
- &dwBytes,
- NULL,
- NULL);
- g_events[0] = WSACreateEvent();
- for(int i=0;i<5;++i)
- {
- PostAccept(CreateBufferObj(pListen,BUFFER_SIZE));
- }
- HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);
- while(TRUE)
- {
- int nIndex = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);
- if(nIndex == WSA_WAIT_FAILED)
- {
- printf("WSAWaitForMultipleEvents Failed!\n");
- break;
- }
- nIndex = nIndex - WSA_WAIT_EVENT_0;
- for(int i=nIndex;i<= g_nBufferCount;i++)
- {
- int nRet = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);
- if(nRet == WSA_WAIT_TIMEOUT)
- continue;
- if(i == 0)
- {
- RebuildArray();
- continue;
- }
- PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);
- if(pBuffer != NULL)
- {
- if(!HandleIo(pBuffer))
- RebuildArray();
- }
- }
- if(!g_bServerRunning && g_nBufferCount == 0)
- break;
- }
- WSACloseEvent(g_events[0]);
- WaitForSingleObject(hThread,INFINITE);
- CloseHandle(hThread);
- closesocket(sListen);
- WSACleanup();
- DeleteCriticalSection(&g_cs);
- return 0;
- }
#include <WinSock2.h>
#include <Windows.h>
#include <MSWSock.h>
#include <stdio.h>
#pragma comment(lib,"Ws2_32.lib")
#define BUFFER_SIZE 4096

// Per-socket bookkeeping shared by every overlapped operation on a socket.
typedef struct _SOCKET_OBJ
{
    SOCKET s;
    int nOutstandingOps;          // operations posted and not yet handled
    LPFN_ACCEPTEX lpfnAcceptEx;   // loaded by main() for the listening socket
}SOCKET_OBJ,*PSOCKET_OBJ;

// Allocates a zero-initialised SOCKET_OBJ that wraps socket s.
PSOCKET_OBJ CreateSocketObj(SOCKET s)
{
    PSOCKET_OBJ pSocket = new SOCKET_OBJ();
    if(pSocket != NULL)
        pSocket->s = s;
    return pSocket;
}

// Closes the wrapped socket (when still open) and deletes the object.
void FreeSocketObj(PSOCKET_OBJ pSocket)
{
    if(pSocket == NULL)
        return;
    if(pSocket->s != INVALID_SOCKET)
        closesocket(pSocket->s);
    delete pSocket;
}

// Values stored in BUFFER_OBJ::nOperation.
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3

// Per-I/O record: one overlapped operation, its buffer and its completion event.
typedef struct _BUFFER_OBJ
{
    OVERLAPPED ol;
    char* buff;
    int nLen;
    PSOCKET_OBJ pSocket;
    int nOperation;
    SOCKET sAccept;       // socket handed to AcceptEx (accept operations only)
    _BUFFER_OBJ* pNext;
}BUFFER_OBJ,*PBUFFER_OBJ;

HANDLE g_events[WSA_MAXIMUM_WAIT_EVENTS];   // slot 0 reserved, 1..g_nBufferCount = buffer events
int g_nBufferCount;
PBUFFER_OBJ g_pBufferHeader,g_pBufferTail;
BOOL g_bServerRunning;
CRITICAL_SECTION g_cs;                      // guards the buffer list

// Creates a buffer object and registers its event in g_events.
// Returns NULL when every event slot is taken or the event cannot be created.
PBUFFER_OBJ CreateBufferObj(PSOCKET_OBJ pSocket,ULONG nLen)
{
    PBUFFER_OBJ pBuffer = NULL;
    EnterCriticalSection(&g_cs);
    // Slot 0 is reserved, so at most WSA_MAXIMUM_WAIT_EVENTS - 1 buffers fit.
    if(g_nBufferCount < WSA_MAXIMUM_WAIT_EVENTS - 1)
    {
        WSAEVENT hEvent = WSACreateEvent();
        if(hEvent != WSA_INVALID_EVENT)
        {
            pBuffer = new BUFFER_OBJ();
            pBuffer->buff = new char[nLen];
            pBuffer->nLen = nLen;
            pBuffer->ol.hEvent = hEvent;
            pBuffer->pSocket = pSocket;
            pBuffer->sAccept = INVALID_SOCKET;
            pBuffer->pNext = NULL;
            if(g_pBufferHeader == NULL)
            {
                g_pBufferHeader = g_pBufferTail = pBuffer;
            }
            else
            {
                g_pBufferTail->pNext = pBuffer;
                g_pBufferTail = pBuffer;
            }
            g_events[++g_nBufferCount] = hEvent;
        }
    }
    LeaveCriticalSection(&g_cs);
    return pBuffer;
}

// Unlinks pBuffer from the list and releases it. NULL or unknown buffers are ignored.
void FreeBufferObj(PBUFFER_OBJ pBuffer)
{
    if(pBuffer == NULL)
        return;
    EnterCriticalSection(&g_cs);
    PBUFFER_OBJ pPrev = NULL;
    PBUFFER_OBJ pCur = g_pBufferHeader;
    while(pCur != NULL && pCur != pBuffer)
    {
        pPrev = pCur;
        pCur = pCur->pNext;
    }
    if(pCur != NULL)
    {
        if(pPrev == NULL)
            g_pBufferHeader = pCur->pNext;
        else
            pPrev->pNext = pCur->pNext;
        if(g_pBufferTail == pCur)
            g_pBufferTail = pPrev;
        g_nBufferCount--;
        WSACloseEvent(pBuffer->ol.hEvent);
        delete [] pBuffer->buff;
        delete pBuffer;
    }
    LeaveCriticalSection(&g_cs);
}

// Returns the buffer object whose completion event is hEvent, or NULL.
PBUFFER_OBJ FindBufferObj(HANDLE hEvent)
{
    if(hEvent == NULL || hEvent == INVALID_HANDLE_VALUE)
        return NULL;
    EnterCriticalSection(&g_cs);
    PBUFFER_OBJ pTest = g_pBufferHeader;
    while(pTest != NULL && pTest->ol.hEvent != hEvent)
        pTest = pTest->pNext;
    LeaveCriticalSection(&g_cs);
    return pTest;
}

// Rewrites g_events[1..] from the buffer list, in list order.
void RebuildArray()
{
    EnterCriticalSection(&g_cs);
    PBUFFER_OBJ pBuffer = g_pBufferHeader;
    int i=1;
    while(pBuffer != NULL)
    {
        g_events[i++] = pBuffer->ol.hEvent;
        pBuffer = pBuffer->pNext;
    }
    LeaveCriticalSection(&g_cs);
}

// Posts an overlapped AcceptEx. On failure the accept socket is closed and the
// outstanding-operation counter is restored.
BOOL PostAccept(PBUFFER_OBJ pBuffer)
{
    if(pBuffer == NULL || pBuffer->pSocket == NULL)
        return FALSE;
    PSOCKET_OBJ pSocket = pBuffer->pSocket;
    if(pSocket->lpfnAcceptEx == NULL)
        return FALSE;
    const DWORD dwAddrLen = sizeof(sockaddr_in) + 16;
    if(pBuffer->nLen < (int)(dwAddrLen * 2))
        return FALSE;
    pBuffer->nOperation = OP_ACCEPT;
    pBuffer->sAccept = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if(pBuffer->sAccept == INVALID_SOCKET)
        return FALSE;
    pSocket->nOutstandingOps++;
    DWORD dwBytes = 0;
    BOOL b = pSocket->lpfnAcceptEx(pSocket->s,
        pBuffer->sAccept,pBuffer->buff,pBuffer->nLen - dwAddrLen*2,
        dwAddrLen,dwAddrLen,&dwBytes,&pBuffer->ol);
    if(!b && WSAGetLastError() != WSA_IO_PENDING)
    {
        pSocket->nOutstandingOps--;
        closesocket(pBuffer->sAccept);
        pBuffer->sAccept = INVALID_SOCKET;
        return FALSE;
    }
    return TRUE;
}

// Posts an overlapped WSARecv; the counter is restored on immediate failure.
BOOL PostRecv(PBUFFER_OBJ pBuffer)
{
    if(pBuffer == NULL || pBuffer->pSocket == NULL)
        return FALSE;
    pBuffer->nOperation = OP_READ;
    pBuffer->pSocket->nOutstandingOps++;
    DWORD dwBytes = 0;
    DWORD dwFlags = 0;
    WSABUF buf;
    buf.buf = pBuffer->buff;
    buf.len = pBuffer->nLen;
    if(WSARecv(pBuffer->pSocket->s,&buf,1,&dwBytes,&dwFlags,&pBuffer->ol,NULL) != 0
        && WSAGetLastError() != WSA_IO_PENDING)
    {
        pBuffer->pSocket->nOutstandingOps--;
        return FALSE;
    }
    return TRUE;
}

// Posts an overlapped WSASend; the counter is restored on immediate failure.
BOOL PostSend(PBUFFER_OBJ pBuffer)
{
    if(pBuffer == NULL || pBuffer->pSocket == NULL)
        return FALSE;
    pBuffer->nOperation = OP_WRITE;
    pBuffer->pSocket->nOutstandingOps++;
    DWORD dwBytes = 0;
    DWORD dwFlags = 0;
    WSABUF buf;
    buf.buf = pBuffer->buff;
    buf.len = pBuffer->nLen;
    if(WSASend(pBuffer->pSocket->s,&buf,1,&dwBytes,dwFlags,&pBuffer->ol,NULL) != 0
        && WSAGetLastError() != WSA_IO_PENDING)
    {
        pBuffer->pSocket->nOutstandingOps--;
        return FALSE;
    }
    return TRUE;
}

// Closes the socket of a finished connection, frees the socket object once no
// operation is outstanding on it, and frees the buffer.
static void CloseAndRelease(PSOCKET_OBJ pSocket,PBUFFER_OBJ pBuffer)
{
    if(pSocket->s != INVALID_SOCKET)
    {
        closesocket(pSocket->s);
        pSocket->s = INVALID_SOCKET;
    }
    if(pSocket->nOutstandingOps == 0)
    {
        FreeSocketObj(pSocket);
    }
    FreeBufferObj(pBuffer);
}

// Processes the completed operation described by pBuffer.
// Returns TRUE when pBuffer is still in use, FALSE when it is NULL or was
// released (the caller then rebuilds the event array).
BOOL HandleIo(PBUFFER_OBJ pBuffer)
{
    if(pBuffer == NULL)
        return FALSE;
    PSOCKET_OBJ pSocket = pBuffer->pSocket;
    pSocket->nOutstandingOps--;
    DWORD dwTrans = 0;
    DWORD dwFlags = 0;
    BOOL bRet = WSAGetOverlappedResult(pSocket->s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags);
    if(!bRet)
    {
        if(pBuffer->nOperation == OP_ACCEPT)
        {
            if(pBuffer->sAccept != INVALID_SOCKET)
            {
                closesocket(pBuffer->sAccept);
                pBuffer->sAccept = INVALID_SOCKET;
            }
            // A single failed accept must not take the listening socket down.
            if(g_bServerRunning && pSocket->s != INVALID_SOCKET && PostAccept(pBuffer))
                return TRUE;
        }
        CloseAndRelease(pSocket,pBuffer);
        return FALSE;
    }
    switch(pBuffer->nOperation)
    {
    case OP_ACCEPT:
        {
            // pSocket is the LISTENING socket; the new connection is sAccept.
            SOCKET sClient = pBuffer->sAccept;
            pBuffer->sAccept = INVALID_SOCKET;
            if(dwTrans > 0)
            {
                printf("Accept收到数据:%.*s\n",(int)dwTrans,pBuffer->buff);
                setsockopt(sClient,SOL_SOCKET,SO_UPDATE_ACCEPT_CONTEXT,
                    (char*)&pSocket->s,sizeof(pSocket->s));
                PSOCKET_OBJ pClient = CreateSocketObj(sClient);
                PBUFFER_OBJ pRecv = CreateBufferObj(pClient,BUFFER_SIZE);
                if(pRecv == NULL)
                {
                    printf("Too much connections!\n");
                    FreeSocketObj(pClient);
                }
                else
                {
                    RebuildArray();
                    if(!PostRecv(pRecv))
                    {
                        // Release the client's receive buffer, not the accept buffer.
                        FreeBufferObj(pRecv);
                        FreeSocketObj(pClient);
                        RebuildArray();
                    }
                }
            }
            else
            {
                // The client connected and left without sending data.
                closesocket(sClient);
            }
            if(!PostAccept(pBuffer))
            {
                if(pSocket->nOutstandingOps == 0)
                {
                    FreeSocketObj(pSocket);
                }
                FreeBufferObj(pBuffer);
                return FALSE;
            }
            return TRUE;
        }
    case OP_READ:
        {
            if(dwTrans > 0)
            {
                // dwTrans may equal the buffer size; print by length.
                printf("Recv收到数据:%.*s\n",(int)dwTrans,pBuffer->buff);
                if(PostRecv(pBuffer))
                    return TRUE;
            }
            CloseAndRelease(pSocket,pBuffer);
            return FALSE;
        }
    case OP_WRITE:
        {
            if(dwTrans > 0)
            {
                printf("发送数据: %.*s 成功!\n",(int)dwTrans,pBuffer->buff);
                FreeBufferObj(pBuffer);
            }
            else
            {
                CloseAndRelease(pSocket,pBuffer);
            }
            return FALSE;
        }
    }
    return TRUE;
}

// Console thread: a command starting with 's' stops the server.
DWORD WINAPI ControlThread(LPVOID lpParma)
{
    char cmd[128];
    // Bounded read; stop on EOF or error instead of looping on it.
    while(scanf("%127s",cmd) == 1)
    {
        if(cmd[0] != 's')
            continue;
        g_bServerRunning = FALSE;
        EnterCriticalSection(&g_cs);
        for(PBUFFER_OBJ pBuffer = g_pBufferHeader;pBuffer != NULL;pBuffer = pBuffer->pNext)
        {
            PSOCKET_OBJ pSocket = pBuffer->pSocket;
            if(pSocket != NULL && pSocket->s != INVALID_SOCKET)
            {
                closesocket(pSocket->s);
                // Buffers may share a socket object; do not close it twice.
                pSocket->s = INVALID_SOCKET;
            }
        }
        LeaveCriticalSection(&g_cs);
        break;
    }
    return 0;
}

// Entry point: listens on TCP port 3456, keeps up to five AcceptEx requests
// pending and dispatches completions to HandleIo() until shutdown.
int main()
{
    WSAData wsaData;
    int nStartup = WSAStartup(MAKEWORD(2,2),&wsaData);
    if(0 != nStartup)
    {
        printf("初始化失败!%d\n",nStartup);
        Sleep(5000);
        return -1;
    }
    InitializeCriticalSectionAndSpinCount(&g_cs,4000);

    USHORT nport = 3456;
    SOCKET sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    sockaddr_in sin = {0};
    sin.sin_family = AF_INET;
    sin.sin_port = htons(nport);
    sin.sin_addr.S_un.S_addr = ADDR_ANY;
    if(sListen == INVALID_SOCKET
        || SOCKET_ERROR == bind(sListen,(sockaddr*)&sin,sizeof(sin))
        || SOCKET_ERROR == listen(sListen,200))
    {
        printf("bind failed!%d\n",WSAGetLastError());
        Sleep(5000);
        if(sListen != INVALID_SOCKET)
            closesocket(sListen);
        DeleteCriticalSection(&g_cs);
        WSACleanup();
        return -1;
    }

    g_bServerRunning = TRUE;
    PSOCKET_OBJ pListen = CreateSocketObj(sListen);
    GUID GuidAcceptEx = WSAID_ACCEPTEX;
    DWORD dwBytes = 0;
    int nIoctl = WSAIoctl(pListen->s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &GuidAcceptEx,
        sizeof(GuidAcceptEx),
        &pListen->lpfnAcceptEx,
        sizeof(pListen->lpfnAcceptEx),
        &dwBytes,
        NULL,
        NULL);

    g_events[0] = WSACreateEvent();
    if(nIoctl != SOCKET_ERROR && pListen->lpfnAcceptEx != NULL)
    {
        for(int i=0;i<5;++i)
        {
            PBUFFER_OBJ pAccept = CreateBufferObj(pListen,BUFFER_SIZE);
            if(pAccept == NULL)
                break;
            if(!PostAccept(pAccept))
                FreeBufferObj(pAccept);
        }
    }
    if(g_nBufferCount == 0)
    {
        printf("PostAccept failed!%d\n",WSAGetLastError());
        WSACloseEvent(g_events[0]);
        FreeSocketObj(pListen);   // also closes sListen
        DeleteCriticalSection(&g_cs);
        WSACleanup();
        return -1;
    }
    RebuildArray();

    HANDLE hThread = CreateThread(NULL,0,ControlThread,NULL,0,NULL);
    while(TRUE)
    {
        DWORD dwWait = WSAWaitForMultipleEvents(g_nBufferCount+1,g_events,FALSE,WSA_INFINITE,FALSE);
        if(dwWait == WSA_WAIT_FAILED)
        {
            printf("WSAWaitForMultipleEvents Failed!\n");
            break;
        }
        // Poll the events after the reported index too, so higher slots are not starved.
        int nFirst = (int)(dwWait - WSA_WAIT_EVENT_0);
        for(int i=nFirst;i<= g_nBufferCount;i++)
        {
            DWORD dwOne = WSAWaitForMultipleEvents(1,&g_events[i],FALSE,0,FALSE);
            if(dwOne == WSA_WAIT_TIMEOUT || dwOne == WSA_WAIT_FAILED)
                continue;
            if(i == 0)
            {
                WSAResetEvent(g_events[0]);
                RebuildArray();
                continue;
            }
            PBUFFER_OBJ pBuffer = FindBufferObj(g_events[i]);
            if(pBuffer != NULL)
            {
                if(!HandleIo(pBuffer))
                    RebuildArray();
            }
        }
        if(!g_bServerRunning && g_nBufferCount == 0)
            break;
    }

    WSACloseEvent(g_events[0]);
    if(hThread != NULL)
    {
        WaitForSingleObject(hThread,INFINITE);
        CloseHandle(hThread);
    }
    closesocket(sListen);
    WSACleanup();
    DeleteCriticalSection(&g_cs);
    return 0;
}
七,IOCP。
大框架沿用书中的例子,在此基础上强化了发送操作,补充了部分异常处理,并加入了连接超时处理。
注意:当一个投递完成,且对应socket上已经没有未决的投递时,必须再投递一个请求或者关闭连接,否则socket对应的数据结构无法被释放,对应socket的连接断开时也无法被检测到。所以如果业务逻辑已经结束,就要关闭连接;如果需要等客户端来断开连接,可以在业务逻辑结束后再投递一个接收请求(客户端断开时,该接收请求返回且接收的字节数为0,此类中的异常处理逻辑便会将资源清理掉)。
头文件
- ////////////////////////////////////////
- // IOCP.h文件
- #ifndef __IOCP_H__
- #define __IOCP_H__
- #include <winsock2.h>
- #include <windows.h>
- #include <Mswsock.h>
// The expression is parenthesised so that the macro expands correctly inside
// larger expressions such as `x / BUFFER_SIZE` or `x % BUFFER_SIZE`.
#define BUFFER_SIZE (1024*4) // Size of the buffer used by one I/O request
#define MAX_THREAD 1 // Number of I/O service threads
- // 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息
- struct CIOCPBuffer
- {
- CIOCPBuffer()
- {
- memset(&ol,0,sizeof(WSAOVERLAPPED));
- sClient = INVALID_SOCKET;
- memset(buff,0,BUFFER_SIZE);
- nLen = 0;
- nSequenceNumber = 0;
- bIsReleased = FALSE;
- nOperation = 0;
- pNext = NULL;
- }
- WSAOVERLAPPED ol;
- SOCKET sClient; // AcceptEx接收的客户方套节字
- char buff[BUFFER_SIZE]; // I/O操作使用的缓冲区
- int nLen; // buff缓冲区(使用的)大小
- ULONG nSequenceNumber; // 此I/O的序列号
- BOOL bIsReleased;
- int nOperation; // 操作类型
- #define OP_ACCEPT 1
- #define OP_WRITE 2
- #define OP_READ 3
- CIOCPBuffer *pNext;
- };
- struct CIOCPNextToSend;
- struct CIOCPTimerData;
- // 这是per-Handle数据。它包含了一个套节字的信息
- struct CIOCPContext
- {
- CIOCPContext()
- {
- s = INVALID_SOCKET;
- memset(&addrLocal,0,sizeof(SOCKADDR_IN));
- memset(&addrRemote,0,sizeof(SOCKADDR_IN));
- bClosing = FALSE;
- nOutstandingRecv = 0;
- nOutstandingSend = 0;
- nReadSequence = 0;
- nCurrentReadSequence = 0;
- nCurrentStep = 0;
- pOutOfOrderReads = NULL;
- pNextToSend = NULL;
- bIsReleased = FALSE;
- pNext = NULL;
- pPreData = NULL;
- strcpy(szClientName,"");
- hTimer = NULL;
- hCompletion = NULL;
- }
- CIOCPBuffer m_pBuffer;
- SOCKET s; // 套节字句柄
- SOCKADDR_IN addrLocal; // 连接的本地地址
- SOCKADDR_IN addrRemote; // 连接的远程地址
- BOOL bClosing; // 套节字是否关闭
- int nOutstandingRecv; // 此套节字上抛出的重叠操作的数量
- int nOutstandingSend;
- ULONG nReadSequence; // 安排给接收的下一个序列号
- ULONG nCurrentReadSequence; // 当前要读的序列号
- CIOCPBuffer *pOutOfOrderReads; // 记录没有按顺序完成的读I/O
- CIOCPNextToSend *pNextToSend; //xss,按顺序发送的下一个要发送的。
- LPVOID pPreData; //xss,用于2个过程之间的数据交流。
- ULONG nCurrentStep;//xss,用于记录当前处于的过程步骤数。
- BOOL bIsReleased;
- CRITICAL_SECTION Lock; // 保护这个结构
- CIOCPContext *pNext;
- char szClientName[256];//xss
- HANDLE hTimer;//xss
- HANDLE hCompletion;//xss
- };
- struct CIOCPNextToSend//xss
- {
- CIOCPBuffer * pBuffer;
- CIOCPNextToSend * pNext;
- };
- struct CIOCPTimerData
- {
- CIOCPContext* pContext;
- HANDLE hCompletion;
- };
- class CIOCPServer // 处理线程
- {
- public:
- CIOCPServer();
- ~CIOCPServer();
- // 开始服务
- BOOL Start(int nPort = 3456, int nMaxConnections = 2000,
- int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);
- // 停止服务
- void Shutdown();
- // 关闭一个连接和关闭所有连接
- void CloseAConnection(CIOCPContext *pContext);
- void CloseAllConnections();
- // 取得当前的连接数量
- ULONG GetCurrentConnection() { return m_nCurrentConnection; }
- // 向指定客户发送文本
- BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);
- protected:
- // 申请和释放缓冲区对象
- CIOCPBuffer *AllocateBuffer(int nLen);
- void ReleaseBuffer(CIOCPBuffer *pBuffer);
- // 申请和释放套节字上下文
- CIOCPContext *AllocateContext(SOCKET s);
- void ReleaseContext(CIOCPContext *pContext);
- // 释放空闲缓冲区对象列表和空闲上下文对象列表
- void FreeBuffers();
- void FreeContexts();
- // 向连接列表中添加一个连接
- BOOL AddAConnection(CIOCPContext *pContext);
- // 插入和移除未决的接受请求
- BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);
- BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);
- //xss,把要发送的数据加入队列,按顺序发送
- BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- //xss,发送下一个需要发送的
- BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- // 取得下一个要读取的
- CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理
- // 投递接受I/O、发送I/O、接收I/O
- BOOL PostAccept(CIOCPBuffer *pBuffer);
- BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);
- // 事件通知函数
- // 建立了一个新的连接
- virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- // 一个连接关闭
- virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- // 在一个连接上发生了错误
- virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);
- // 一个连接上的读操作完成
- virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- // 一个连接上的写操作完成
- virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);
- protected:
- // 记录空闲结构信息
- CIOCPBuffer *m_pFreeBufferList;
- CIOCPContext *m_pFreeContextList;
- int m_nFreeBufferCount;
- int m_nFreeContextCount;
- CRITICAL_SECTION m_FreeBufferListLock;
- CRITICAL_SECTION m_FreeContextListLock;
- CRITICAL_SECTION m_HeapLock;
- CRITICAL_SECTION m_RepostLock;
- // 记录抛出的Accept请求
- CIOCPBuffer *m_pPendingAccepts; // 抛出请求列表。
- long m_nPendingAcceptCount;
- CRITICAL_SECTION m_PendingAcceptsLock;
- // 记录连接列表
- CIOCPContext *m_pConnectionList;
- int m_nCurrentConnection;
- CRITICAL_SECTION m_ConnectionListLock;
- // 用于投递Accept请求
- HANDLE m_hAcceptEvent;
- HANDLE m_hRepostEvent;
- LONG m_nRepostCount;
- int m_nPort; // 服务器监听的端口
- int m_nInitialAccepts;
- int m_nInitialReads;
- int m_nMaxAccepts;
- int m_nMaxSends;
- int m_nMaxFreeBuffers;
- int m_nMaxFreeContexts;
- int m_nMaxConnections;
- HANDLE m_hListenThread; // 监听线程
- HANDLE m_hCompletion; // 完成端口句柄
- SOCKET m_sListen; // 监听套节字句柄
- LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数地址
- LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址
- BOOL m_bShutDown; // 用于通知监听线程退出
- BOOL m_bServerStarted; // 记录服务是否启动
- HANDLE m_hTimerQueue;//xss
- private: // 线程函数
- static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);
- static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);
- };
- #endif // __IOCP_H__
//////////////////////////////////////// // IOCP.h文件 #ifndef __IOCP_H__ #define __IOCP_H__ #include <winsock2.h> #include <windows.h> #include <Mswsock.h> #define BUFFER_SIZE 1024*4 // I/O请求的缓冲区大小 #define MAX_THREAD 1 // I/O服务线程的数量 // 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息 struct CIOCPBuffer { CIOCPBuffer() { memset(&ol,0,sizeof(WSAOVERLAPPED)); sClient = INVALID_SOCKET; memset(buff,0,BUFFER_SIZE); nLen = 0; nSequenceNumber = 0; bIsReleased = FALSE; nOperation = 0; pNext = NULL; } WSAOVERLAPPED ol; SOCKET sClient; // AcceptEx接收的客户方套节字 char buff[BUFFER_SIZE]; // I/O操作使用的缓冲区 int nLen; // buff缓冲区(使用的)大小 ULONG nSequenceNumber; // 此I/O的序列号 BOOL bIsReleased; int nOperation; // 操作类型 #define OP_ACCEPT 1 #define OP_WRITE 2 #define OP_READ 3 CIOCPBuffer *pNext; }; struct CIOCPNextToSend; struct CIOCPTimerData; // 这是per-Handle数据。它包含了一个套节字的信息 struct CIOCPContext { CIOCPContext() { s = INVALID_SOCKET; memset(&addrLocal,0,sizeof(SOCKADDR_IN)); memset(&addrRemote,0,sizeof(SOCKADDR_IN)); bClosing = FALSE; nOutstandingRecv = 0; nOutstandingSend = 0; nReadSequence = 0; nCurrentReadSequence = 0; nCurrentStep = 0; pOutOfOrderReads = NULL; pNextToSend = NULL; bIsReleased = FALSE; pNext = NULL; pPreData = NULL; strcpy(szClientName,""); hTimer = NULL; hCompletion = NULL; } CIOCPBuffer m_pBuffer; SOCKET s; // 套节字句柄 SOCKADDR_IN addrLocal; // 连接的本地地址 SOCKADDR_IN addrRemote; // 连接的远程地址 BOOL bClosing; // 套节字是否关闭 int nOutstandingRecv; // 此套节字上抛出的重叠操作的数量 int nOutstandingSend; ULONG nReadSequence; // 安排给接收的下一个序列号 ULONG nCurrentReadSequence; // 当前要读的序列号 CIOCPBuffer *pOutOfOrderReads; // 记录没有按顺序完成的读I/O CIOCPNextToSend *pNextToSend; //xss,按顺序发送的下一个要发送的。 LPVOID pPreData; //xss,用于2个过程之间的数据交流。 ULONG nCurrentStep;//xss,用于记录当前处于的过程步骤数。 BOOL bIsReleased; CRITICAL_SECTION Lock; // 保护这个结构 CIOCPContext *pNext; char szClientName[256];//xss HANDLE hTimer;//xss HANDLE hCompletion;//xss }; struct CIOCPNextToSend//xss { CIOCPBuffer * pBuffer; CIOCPNextToSend * pNext; }; struct CIOCPTimerData { CIOCPContext* pContext; 
HANDLE hCompletion; }; class CIOCPServer // 处理线程 { public: CIOCPServer(); ~CIOCPServer(); // 开始服务 BOOL Start(int nPort = 3456, int nMaxConnections = 2000, int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4); // 停止服务 void Shutdown(); // 关闭一个连接和关闭所有连接 void CloseAConnection(CIOCPContext *pContext); void CloseAllConnections(); // 取得当前的连接数量 ULONG GetCurrentConnection() { return m_nCurrentConnection; } // 向指定客户发送文本 BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen); protected: // 申请和释放缓冲区对象 CIOCPBuffer *AllocateBuffer(int nLen); void ReleaseBuffer(CIOCPBuffer *pBuffer); // 申请和释放套节字上下文 CIOCPContext *AllocateContext(SOCKET s); void ReleaseContext(CIOCPContext *pContext); // 释放空闲缓冲区对象列表和空闲上下文对象列表 void FreeBuffers(); void FreeContexts(); // 向连接列表中添加一个连接 BOOL AddAConnection(CIOCPContext *pContext); // 插入和移除未决的接受请求 BOOL InsertPendingAccept(CIOCPBuffer *pBuffer); BOOL RemovePendingAccept(CIOCPBuffer *pBuffer); //xss,把要发送的数据加入队列,按顺序发送 BOOL PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer); //xss,发送下一个需要发送的 BOOL PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 取得下一个要读取的 CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer); void ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer);//xss,错误集中处理 // 投递接受I/O、发送I/O、接收I/O BOOL PostAccept(CIOCPBuffer *pBuffer); BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer); BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer); BOOL PostRecv2(CIOCPContext *pContext, CIOCPBuffer *pBuffer); void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError); // 事件通知函数 // 建立了一个新的连接 virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 一个连接关闭 virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer); // 在一个连接上发生了错误 virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError); // 一个连接上的读操作完成 virtual void OnReadCompleted(CIOCPContext *pContext, 
CIOCPBuffer *pBuffer); // 一个连接上的写操作完成 virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer); protected: // 记录空闲结构信息 CIOCPBuffer *m_pFreeBufferList; CIOCPContext *m_pFreeContextList; int m_nFreeBufferCount; int m_nFreeContextCount; CRITICAL_SECTION m_FreeBufferListLock; CRITICAL_SECTION m_FreeContextListLock; CRITICAL_SECTION m_HeapLock; CRITICAL_SECTION m_RepostLock; // 记录抛出的Accept请求 CIOCPBuffer *m_pPendingAccepts; // 抛出请求列表。 long m_nPendingAcceptCount; CRITICAL_SECTION m_PendingAcceptsLock; // 记录连接列表 CIOCPContext *m_pConnectionList; int m_nCurrentConnection; CRITICAL_SECTION m_ConnectionListLock; // 用于投递Accept请求 HANDLE m_hAcceptEvent; HANDLE m_hRepostEvent; LONG m_nRepostCount; int m_nPort; // 服务器监听的端口 int m_nInitialAccepts; int m_nInitialReads; int m_nMaxAccepts; int m_nMaxSends; int m_nMaxFreeBuffers; int m_nMaxFreeContexts; int m_nMaxConnections; HANDLE m_hListenThread; // 监听线程 HANDLE m_hCompletion; // 完成端口句柄 SOCKET m_sListen; // 监听套节字句柄 LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数地址 LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址 BOOL m_bShutDown; // 用于通知监听线程退出 BOOL m_bServerStarted; // 记录服务是否启动 HANDLE m_hTimerQueue;//xss private: // 线程函数 static DWORD WINAPI _ListenThreadProc(LPVOID lpParam); static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam); }; #endif // __IOCP_H__
cpp文件
- //////////////////////////////////////////////////
- // IOCP.cpp文件
- #define _WIN32_WINNT 0x0500 //xss
- #include "iocp.h"
- #pragma comment(lib, "WS2_32.lib")
- #include <stdio.h>
- #include "httpFun.h"
- static int iBufferCount = 0; // diagnostic counter: ++ in AllocateBuffer, -- in ReleaseBuffer (under m_HeapLock); printed by the worker thread
- static int iContextCount = 0; // diagnostic counter: ++ in AllocateContext, -- in ReleaseContext (under m_HeapLock); printed by the worker thread
- // Builds a stopped server: default limits, empty pools and lists, the locks,
- // events and timer queue used by the other methods, and Winsock 2.2 start-up.
- CIOCPServer::CIOCPServer()
- {
-     // Nothing is listening or running yet.
-     m_sListen = INVALID_SOCKET;
-     m_hCompletion = NULL;
-     m_hListenThread = NULL;
-     m_lpfnAcceptEx = NULL;
-     m_lpfnGetAcceptExSockaddrs = NULL;
-     m_bServerStarted = FALSE;
-     m_bShutDown = FALSE;
-     // Default tuning values (Start() overwrites several of them).
-     m_nPort = 8888;
-     m_nMaxConnections = 2000;
-     m_nMaxFreeContexts = 100;
-     m_nMaxFreeBuffers = 200;
-     m_nMaxSends = 20;
-     m_nMaxAccepts = 100;
-     m_nInitialReads = 4;
-     m_nInitialAccepts = 10;
-     m_nRepostCount = 0;
-     // Empty pools, pending-accept list and connection list.
-     m_pConnectionList = NULL;
-     m_nCurrentConnection = 0;
-     m_pPendingAccepts = NULL;
-     m_nPendingAcceptCount = 0;
-     m_pFreeContextList = NULL;
-     m_nFreeContextCount = 0;
-     m_pFreeBufferList = NULL;
-     m_nFreeBufferCount = 0;
-     // One critical section per shared structure.
-     ::InitializeCriticalSection(&m_FreeBufferListLock);
-     ::InitializeCriticalSection(&m_FreeContextListLock);
-     ::InitializeCriticalSection(&m_PendingAcceptsLock);
-     ::InitializeCriticalSection(&m_ConnectionListLock);
-     ::InitializeCriticalSection(&m_HeapLock);
-     ::InitializeCriticalSection(&m_RepostLock);
-     // Auto-reset, initially non-signalled events waited on by the listen thread.
-     m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
-     m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
-     // Timer queue that hosts the per-connection timers.
-     m_hTimerQueue = ::CreateTimerQueue();
-     // Initialise WS2_32.dll, requesting version 2.2.
-     WSADATA wsaData;
-     ::WSAStartup(MAKEWORD(2, 2), &wsaData);
- }
- // Stops the service if it is running, then releases every handle, lock and
- // the Winsock reference acquired by the constructor.
- CIOCPServer::~CIOCPServer()
- {
-     Shutdown();
-     if(INVALID_SOCKET != m_sListen)
-     {
-         ::closesocket(m_sListen);
-     }
-     if(NULL != m_hListenThread)
-     {
-         ::CloseHandle(m_hListenThread);
-     }
-     ::CloseHandle(m_hRepostEvent);
-     ::CloseHandle(m_hAcceptEvent);
-     // Destroy the critical sections created in the constructor.
-     ::DeleteCriticalSection(&m_FreeBufferListLock);
-     ::DeleteCriticalSection(&m_FreeContextListLock);
-     ::DeleteCriticalSection(&m_PendingAcceptsLock);
-     ::DeleteCriticalSection(&m_ConnectionListLock);
-     ::DeleteCriticalSection(&m_HeapLock);
-     ::DeleteCriticalSection(&m_RepostLock);
-     ::DeleteTimerQueue(m_hTimerQueue); // xss
-     ::WSACleanup();
- }
- ///////////////////////////////////////
- // Timer-queue callback. lpParam is the CIOCPContext the timer was created
- // for; unless that context is already closing, a completion packet with a
- // byte count of -2 and the context as key is posted to its completion port.
- static VOID CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired)
- {
-     CIOCPContext* pTimedOut = (CIOCPContext*)lpParam;
-     if(pTimedOut == NULL || pTimedOut->bClosing != FALSE)
-     {
-         return;
-     }
-     EnterCriticalSection(&pTimedOut->Lock);
-     HANDLE hPort = pTimedOut->hCompletion;
-     if(hPort != NULL)
-     {
-         PostQueuedCompletionStatus(hPort,-2,(ULONG_PTR)pTimedOut,NULL);
-     }
-     LeaveCriticalSection(&pTimedOut->Lock);
- }
- ///////////////////////////////////
- // 自定义帮助函数
- // Hands out an I/O buffer whose nLen is set to the requested length.
- // Returns NULL when nLen exceeds BUFFER_SIZE. A buffer is taken from the
- // free list when one is available, otherwise a new one is created.
- CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
- {
-     if(nLen > BUFFER_SIZE)
-     {
-         return NULL;
-     }
-     ::EnterCriticalSection(&m_FreeBufferListLock);
-     CIOCPBuffer *pResult = m_pFreeBufferList;
-     if(pResult != NULL)
-     {
-         // Pop the head of the pool.
-         m_pFreeBufferList = pResult->pNext;
-         pResult->pNext = NULL;
-         m_nFreeBufferCount --;
-     }
-     else
-     {
-         // Pool is empty: create a fresh buffer.
-         pResult = new CIOCPBuffer();
-     }
-     ::LeaveCriticalSection(&m_FreeBufferListLock);
-     // Diagnostic count of buffers currently handed out.
-     EnterCriticalSection(&m_HeapLock);
-     iBufferCount++;
-     LeaveCriticalSection(&m_HeapLock);
-     if(pResult != NULL)
-     {
-         pResult->nLen = nLen;
-         pResult->bIsReleased = FALSE;
-     }
-     return pResult;
- }
- // Returns a buffer to the pool, or deletes it when the pool already holds
- // more than m_nMaxFreeBuffers entries. NULL and already-released buffers are
- // ignored.
- void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
- {
-     if(pBuffer == NULL)
-         return;
-     if(pBuffer->bIsReleased)
-         return;
-     ::EnterCriticalSection(&m_FreeBufferListLock);
-     const BOOL bKeepInPool = (m_nFreeBufferCount <= m_nMaxFreeBuffers);
-     if(!bKeepInPool)
-     {
-         // Pool is full: really free the memory.
-         delete pBuffer;
-     }
-     else
-     {
-         // Wipe the buffer and push it onto the head of the free list.
-         memset(pBuffer, 0, sizeof(CIOCPBuffer));
-         pBuffer->pNext = m_pFreeBufferList;
-         m_pFreeBufferList = pBuffer;
-         m_nFreeBufferCount ++ ;
-         pBuffer->bIsReleased = TRUE;
-     }
-     ::LeaveCriticalSection(&m_FreeBufferListLock);
-     // Diagnostic count of buffers currently handed out.
-     EnterCriticalSection(&m_HeapLock);
-     iBufferCount--;
-     LeaveCriticalSection(&m_HeapLock);
- }
- // Hands out a connection context bound to socket s.
- // A context is taken from the free list when one is available; otherwise a
- // new one is created and its Lock is initialised here. Pooled contexts keep
- // the Lock they were created with.
- CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
- {
-     CIOCPContext *pContext;
-     ::EnterCriticalSection(&m_FreeContextListLock);
-     if(m_pFreeContextList == NULL)
-     {
-         pContext = new CIOCPContext();
-         ::InitializeCriticalSection(&pContext->Lock);
-     }
-     else
-     {
-         // Pop the head of the context pool.
-         pContext = m_pFreeContextList;
-         m_pFreeContextList = m_pFreeContextList->pNext;
-         pContext->pNext = NULL;
-         // The context pool shrank, so it is the context counter that must be
-         // decremented (the original decremented m_nFreeBufferCount here,
-         // corrupting the buffer pool's bookkeeping).
-         m_nFreeContextCount --;
-     }
-     ::LeaveCriticalSection(&m_FreeContextListLock);
-     // Diagnostic count of contexts currently handed out.
-     EnterCriticalSection(&m_HeapLock);
-     iContextCount++;
-     LeaveCriticalSection(&m_HeapLock);
-     if(pContext != NULL)
-     {
-         pContext->s = s;
-         pContext->bIsReleased = FALSE;
-     }
-     return pContext;
- }
- // Tears down a connection context: closes its socket, releases parked
- // out-of-order reads and queued sends, deletes its timer, then either returns
- // the context to the pool or deletes it. NULL and already-released contexts
- // are ignored.
- void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
- {
-     if(pContext == NULL || pContext->bIsReleased)
-         return;
-     printf("\n%s释放了Context\n\n",pContext->szClientName);
-     if(pContext->s != INVALID_SOCKET)
-         ::closesocket(pContext->s);
-     // Release the reads that completed out of order and were parked here.
-     CIOCPBuffer *pNext;
-     while(pContext->pOutOfOrderReads != NULL)
-     {
-         pNext = pContext->pOutOfOrderReads->pNext;
-         ReleaseBuffer(pContext->pOutOfOrderReads);
-         pContext->pOutOfOrderReads = pNext;
-     }
-     // xss: release the buffers still waiting in the ordered send queue.
-     CIOCPNextToSend* pSend = NULL;
-     while(pContext->pNextToSend != NULL)
-     {
-         pSend = pContext->pNextToSend->pNext;
-         if(pContext->pNextToSend->pBuffer != NULL && pContext->pNextToSend->pBuffer->bIsReleased == FALSE)
-         {
-             ReleaseBuffer(pContext->pNextToSend->pBuffer);
-         }
-         delete pContext->pNextToSend;
-         pContext->pNextToSend = pSend;
-     }
-     if(pContext->hTimer != NULL)
-     {
-         DeleteTimerQueueTimer(m_hTimerQueue,pContext->hTimer,NULL);
-         pContext->hTimer = NULL;
-     }
-     ::EnterCriticalSection(&m_FreeContextListLock);
-     if(m_nFreeContextCount <= m_nMaxFreeContexts) // keep it in the pool
-     {
-         // Zero every byte of the context except Lock. A CRITICAL_SECTION must
-         // not be copied or overwritten, so instead of saving it to a
-         // temporary and restoring it after a whole-struct memset (as the
-         // original did), the bytes before and after it are cleared separately.
-         char *pBase = (char *)pContext;
-         size_t nLockBegin = (size_t)((char *)&pContext->Lock - pBase);
-         size_t nLockEnd = nLockBegin + sizeof(CRITICAL_SECTION);
-         memset(pBase, 0, nLockBegin);
-         memset(pBase + nLockEnd, 0, sizeof(CIOCPContext) - nLockEnd);
-         // Push onto the head of the free list.
-         pContext->pNext = m_pFreeContextList;
-         m_pFreeContextList = pContext;
-         m_nFreeContextCount ++;
-         pContext->bIsReleased = TRUE;
-     }
-     else
-     {
-         ::DeleteCriticalSection(&pContext->Lock);
-         delete pContext;
-     }
-     ::LeaveCriticalSection(&m_FreeContextListLock);
-     // Diagnostic count of contexts currently handed out.
-     EnterCriticalSection(&m_HeapLock);
-     iContextCount--;
-     LeaveCriticalSection(&m_HeapLock);
- }
- // Deletes every buffer held in the free-buffer pool and resets the pool.
- void CIOCPServer::FreeBuffers()
- {
-     ::EnterCriticalSection(&m_FreeBufferListLock);
-     // Pop and delete the head until the list is empty.
-     while(m_pFreeBufferList != NULL)
-     {
-         CIOCPBuffer *pDoomed = m_pFreeBufferList;
-         m_pFreeBufferList = pDoomed->pNext;
-         delete pDoomed;
-     }
-     m_nFreeBufferCount = 0;
-     ::LeaveCriticalSection(&m_FreeBufferListLock);
- }
- // Deletes every context held in the free-context pool (destroying each
- // context's Lock first) and resets the pool.
- void CIOCPServer::FreeContexts()
- {
-     ::EnterCriticalSection(&m_FreeContextListLock);
-     // Pop and delete the head until the list is empty.
-     while(m_pFreeContextList != NULL)
-     {
-         CIOCPContext *pDoomed = m_pFreeContextList;
-         m_pFreeContextList = pDoomed->pNext;
-         ::DeleteCriticalSection(&pDoomed->Lock);
-         delete pDoomed;
-     }
-     m_nFreeContextCount = 0;
-     ::LeaveCriticalSection(&m_FreeContextListLock);
- }
- // Links pContext at the head of the connection list.
- // Returns FALSE, leaving the list untouched, when m_nMaxConnections
- // connections are already registered.
- BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
- {
-     BOOL bAdded = FALSE;
-     ::EnterCriticalSection(&m_ConnectionListLock);
-     // Strictly below the limit: the original "<=" admitted one connection
-     // more than m_nMaxConnections.
-     if(m_nCurrentConnection < m_nMaxConnections)
-     {
-         pContext->pNext = m_pConnectionList;
-         m_pConnectionList = pContext;
-         m_nCurrentConnection ++;
-         bAdded = TRUE;
-     }
-     ::LeaveCriticalSection(&m_ConnectionListLock);
-     return bAdded;
- }
- // Unlinks pContext from the connection list (if it is there), closes its
- // socket and marks it as closing. Does nothing for NULL or for a context
- // whose bClosing is already TRUE. The context itself is not released here.
- void CIOCPServer::CloseAConnection(CIOCPContext *pContext)
- {
-     if(pContext == NULL || pContext->bClosing == TRUE)
-         return;
-     // Step 1: remove the context from the connection list.
-     ::EnterCriticalSection(&m_ConnectionListLock);
-     CIOCPContext **ppLink = &m_pConnectionList;
-     while(*ppLink != NULL && *ppLink != pContext)
-     {
-         ppLink = &(*ppLink)->pNext;
-     }
-     if(*ppLink == pContext)
-     {
-         *ppLink = pContext->pNext;
-         m_nCurrentConnection --;
-     }
-     ::LeaveCriticalSection(&m_ConnectionListLock);
-     // Step 2: close the client socket and flag the context.
-     ::EnterCriticalSection(&pContext->Lock);
-     if(INVALID_SOCKET != pContext->s)
-     {
-         ::closesocket(pContext->s);
-         pContext->s = INVALID_SOCKET;
-     }
-     pContext->bClosing = TRUE;
-     ::LeaveCriticalSection(&pContext->Lock);
- }
- // Closes the socket of every context in the connection list, marks each one
- // as closing, then empties the list. The contexts are not released here.
- void CIOCPServer::CloseAllConnections()
- {
-     ::EnterCriticalSection(&m_ConnectionListLock);
-     for(CIOCPContext *pConn = m_pConnectionList; pConn != NULL; pConn = pConn->pNext)
-     {
-         ::EnterCriticalSection(&pConn->Lock);
-         if(INVALID_SOCKET != pConn->s)
-         {
-             ::closesocket(pConn->s);
-             pConn->s = INVALID_SOCKET;
-         }
-         pConn->bClosing = TRUE;
-         ::LeaveCriticalSection(&pConn->Lock);
-     }
-     m_nCurrentConnection = 0;
-     m_pConnectionList = NULL;
-     ::LeaveCriticalSection(&m_ConnectionListLock);
- }
- // Pushes pBuffer onto the head of the pending-accept list and bumps the
- // pending count. Always returns TRUE.
- BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
- {
-     ::EnterCriticalSection(&m_PendingAcceptsLock);
-     // pBuffer->pNext is only rewritten when there is an existing head to chain to.
-     if(m_pPendingAccepts != NULL)
-     {
-         pBuffer->pNext = m_pPendingAccepts;
-     }
-     m_pPendingAccepts = pBuffer;
-     ++m_nPendingAcceptCount;
-     ::LeaveCriticalSection(&m_PendingAcceptsLock);
-     return TRUE;
- }
- // Unlinks pBuffer from the pending-accept list.
- // Returns TRUE (and decrements the pending count) when the buffer was found,
- // FALSE otherwise.
- BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
- {
-     BOOL bFound = FALSE;
-     ::EnterCriticalSection(&m_PendingAcceptsLock);
-     // Walk the links (starting with the list head) until one points at pBuffer.
-     CIOCPBuffer **ppLink = &m_pPendingAccepts;
-     while(*ppLink != NULL && *ppLink != pBuffer)
-     {
-         ppLink = &(*ppLink)->pNext;
-     }
-     if(*ppLink == pBuffer)
-     {
-         *ppLink = pBuffer->pNext;
-         bFound = TRUE;
-         m_nPendingAcceptCount --;
-     }
-     ::LeaveCriticalSection(&m_PendingAcceptsLock);
-     return bFound;
- }
- void CIOCPServer::ErrorHandle(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
- CloseAConnection(pContext);
- }
- // xss: queues pBuffer on the context's ordered send list.
- // When the list is empty the buffer becomes the head and is posted at once;
- // otherwise it is appended to the tail and left for a later write completion.
- // Returns FALSE only when the immediate PostSend fails (the new node then
- // stays at the head of the list).
- BOOL CIOCPServer::PostSendToList(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
- {
-     BOOL bAccepted = TRUE;
-     ::EnterCriticalSection(&pContext->Lock);
-     CIOCPNextToSend *pTail = pContext->pNextToSend;
-     CIOCPNextToSend *pNode = new CIOCPNextToSend();
-     pNode->pNext = NULL;
-     pNode->pBuffer = pBuffer;
-     if(pTail != NULL)
-     {
-         printf("数据:%10.10s ...,被放入链表结尾。\n",pBuffer->buff);
-         // Find the last node and append the new request after it.
-         for(; pTail->pNext != NULL; pTail = pTail->pNext)
-         {
-         }
-         pTail->pNext = pNode;
-     }
-     else
-     {
-         printf("数据:%10.10s ...,被直接发送。\n",pBuffer->buff);
-         pContext->pNextToSend = pNode;
-         // Nothing is waiting ahead of this buffer, so send it right away.
-         if(!PostSend(pContext,pBuffer))
-         {
-             bAccepted = FALSE;
-         }
-     }
-     ::LeaveCriticalSection(&pContext->Lock);
-     return bAccepted;
- }
- // xss: advances the ordered send queue after a write has completed.
- // The head node (taken to belong to the write that just finished) is
- // removed; if a following node carries a buffer, that buffer is posted
- // first. Returns FALSE when posting it fails - the head is removed in that
- // case as well. pBuffer is not used here; the caller releases it.
- BOOL CIOCPServer::PostNextWriteBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)//xss
- {
-     BOOL bPosted = TRUE;
-     ::EnterCriticalSection(&pContext->Lock);
-     CIOCPNextToSend* pHead = pContext->pNextToSend;
-     if(pHead != NULL)
-     {
-         CIOCPNextToSend* pFollowing = pHead->pNext;
-         if(pFollowing != NULL && pFollowing->pBuffer != NULL)
-         {
-             printf("数据:%10.10s ...从链表中弹出被发送。\n",pFollowing->pBuffer->buff);
-             bPosted = PostSend(pContext,pFollowing->pBuffer) ? TRUE : FALSE;
-         }
-         // Drop the finished head whether or not the next send was posted.
-         delete pHead;
-         pContext->pNextToSend = pFollowing;
-     }
-     ::LeaveCriticalSection(&pContext->Lock);
-     return bPosted;
- }
- // Returns the buffer that must be consumed next to keep reads in posting order.
- // pBuffer, when non-NULL, is a read that has just completed: it is returned
- // directly if its sequence number is the expected one, otherwise it is parked
- // in pOutOfOrderReads (kept sorted by ascending sequence number).
- // After that, the head of the parked list is detached and returned if it
- // carries the expected sequence number; otherwise NULL is returned.
- CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
-     if(pBuffer != NULL)
-     {
-         if(pContext->nCurrentReadSequence == pBuffer->nSequenceNumber)
-         {
-             return pBuffer;
-         }
-         // Out of order: insert in front of the first parked buffer with a
-         // larger sequence number (or at the tail when there is none).
-         pBuffer->pNext = NULL;
-         CIOCPBuffer **ppSlot = &pContext->pOutOfOrderReads;
-         while(*ppSlot != NULL && !(pBuffer->nSequenceNumber < (*ppSlot)->nSequenceNumber))
-         {
-             ppSlot = &(*ppSlot)->pNext;
-         }
-         pBuffer->pNext = *ppSlot;
-         *ppSlot = pBuffer;
-     }
-     // Hand out the parked head if it is the one expected next.
-     CIOCPBuffer *pHead = pContext->pOutOfOrderReads;
-     if(pHead == NULL || pHead->nSequenceNumber != pContext->nCurrentReadSequence)
-     {
-         return NULL;
-     }
-     pContext->pOutOfOrderReads = pHead->pNext;
-     return pHead;
- }
- // Posts one overlapped AcceptEx on the listening socket using pBuffer.
- // A new overlapped socket is created and stored in pBuffer->sClient. The
- // receive area passed to AcceptEx is pBuffer->nLen minus the room reserved
- // for the two addresses.
- // Returns TRUE when the accept completed or is pending. Returns FALSE when
- // AcceptEx was never loaded, the socket could not be created, or AcceptEx
- // failed; in the last case the socket is closed again so it does not leak
- // and pBuffer->sClient is left as INVALID_SOCKET.
- BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer) // post an accept request on the listening socket
- {
-     if(m_lpfnAcceptEx == NULL)
-         return FALSE;
-     // Mark the I/O type.
-     pBuffer->nOperation = OP_ACCEPT;
-     pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
-     if(pBuffer->sClient == INVALID_SOCKET)
-         return FALSE;
-     // Post the overlapped accept.
-     DWORD dwBytes = 0;
-     BOOL b = m_lpfnAcceptEx(m_sListen,
-         pBuffer->sClient,
-         pBuffer->buff,
-         pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2), // xss: the first receive is always one cmd_header
-         sizeof(sockaddr_in) + 16,
-         sizeof(sockaddr_in) + 16,
-         &dwBytes,
-         &pBuffer->ol);
-     if(!b && ::WSAGetLastError() != WSA_IO_PENDING)
-     {
-         ::closesocket(pBuffer->sClient);
-         pBuffer->sClient = INVALID_SOCKET;
-         return FALSE;
-     }
-     return TRUE;
- }
- // Posts one overlapped WSARecv on the connection using pBuffer.
- // The buffer is stamped with the context's next read sequence number; on
- // success the outstanding-receive count and the read sequence number are
- // both advanced. Returns FALSE when WSARecv fails with anything other than
- // WSA_IO_PENDING.
- BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
-     pBuffer->nOperation = OP_READ;
-     ::EnterCriticalSection(&pContext->Lock);
-     pBuffer->nSequenceNumber = pContext->nReadSequence;
-     WSABUF wsaBuf;
-     wsaBuf.len = pBuffer->nLen;
-     wsaBuf.buf = pBuffer->buff;
-     DWORD dwReceived;
-     DWORD dwRecvFlags = 0;
-     const int nRet = ::WSARecv(pContext->s, &wsaBuf, 1, &dwReceived, &dwRecvFlags, &pBuffer->ol, NULL);
-     if(nRet != NO_ERROR && ::WSAGetLastError() != WSA_IO_PENDING)
-     {
-         printf("WSARecv出错:%d\n",WSAGetLastError());
-         ::LeaveCriticalSection(&pContext->Lock);
-         return FALSE;
-     }
-     // The read is in flight (or already done): account for it.
-     pContext->nOutstandingRecv ++;
-     pContext->nReadSequence ++;
-     ::LeaveCriticalSection(&pContext->Lock);
-     return TRUE;
- }
- // Posts one overlapped WSASend of pBuffer on the connection.
- // Returns FALSE without sending when more than m_nMaxSends sends are already
- // outstanding on the context, or when WSASend fails with anything other than
- // WSA_IO_PENDING.
- // The limit check, the send and the counter update all happen under the
- // context lock (as PostRecv does), so a completion handled on another thread
- // cannot decrement nOutstandingSend before it has been incremented. Callers
- // in this file that already hold the lock re-enter it on the same thread.
- BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
-     ::EnterCriticalSection(&pContext->Lock);
-     // Cap the number of outstanding sends so that a client which never reads
-     // cannot make the server queue an unbounded number of them.
-     if(pContext->nOutstandingSend > m_nMaxSends)
-     {
-         ::LeaveCriticalSection(&pContext->Lock);
-         return FALSE;
-     }
-     // Mark the I/O type.
-     pBuffer->nOperation = OP_WRITE;
-     // Post the overlapped send.
-     DWORD dwBytes = 0;
-     DWORD dwFlags = 0;
-     WSABUF buf;
-     buf.buf = pBuffer->buff;
-     buf.len = pBuffer->nLen;
-     if(::WSASend(pContext->s,
-         &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)
-     {
-         int nErr = ::WSAGetLastError();
-         if(nErr != WSA_IO_PENDING)
-         {
-             printf("发送失败!错误码:%d",nErr);
-             ::LeaveCriticalSection(&pContext->Lock);
-             return FALSE;
-         }
-     }
-     // The send is in flight (or already done): account for it.
-     pContext->nOutstandingSend ++;
-     ::LeaveCriticalSection(&pContext->Lock);
-     return TRUE;
- }
- // Starts the service: stores the settings, creates the listening socket and
- // the completion port, loads AcceptEx / GetAcceptExSockaddrs, registers for
- // FD_ACCEPT and launches the listen thread.
- // Returns FALSE if the service is already started or if any step fails; on
- // failure everything created here is released again and the server stays in
- // the stopped state, so Start() can be retried.
- BOOL CIOCPServer::Start(int nPort, int nMaxConnections,
-     int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)
- {
-     if(m_bServerStarted)
-         return FALSE;
-     // Store the caller's settings.
-     m_nPort = nPort;
-     m_nMaxConnections = nMaxConnections;
-     m_nMaxFreeBuffers = nMaxFreeBuffers;
-     m_nMaxFreeContexts = nMaxFreeContexts;
-     m_nInitialReads = nInitialReads;
-     m_bShutDown = FALSE;
-     BOOL bReady = FALSE;
-     do
-     {
-         // Listening socket bound to the requested port on all interfaces.
-         m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
-         if(m_sListen == INVALID_SOCKET)
-             break;
-         SOCKADDR_IN si;
-         memset(&si, 0, sizeof(si));
-         si.sin_family = AF_INET;
-         si.sin_port = ::htons((u_short)m_nPort); // host-to-network order (the original called ntohs)
-         si.sin_addr.S_un.S_addr = INADDR_ANY;
-         if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)
-             break;
-         if(::listen(m_sListen, 200) == SOCKET_ERROR)
-             break;
-         // Completion port.
-         m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
-         if(m_hCompletion == NULL)
-             break;
-         // Load the AcceptEx extension function.
-         GUID GuidAcceptEx = WSAID_ACCEPTEX;
-         DWORD dwBytes = 0;
-         if(::WSAIoctl(m_sListen,
-             SIO_GET_EXTENSION_FUNCTION_POINTER,
-             &GuidAcceptEx,
-             sizeof(GuidAcceptEx),
-             &m_lpfnAcceptEx,
-             sizeof(m_lpfnAcceptEx),
-             &dwBytes,
-             NULL,
-             NULL) == SOCKET_ERROR)
-             break;
-         // Load the GetAcceptExSockaddrs extension function.
-         GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
-         if(::WSAIoctl(m_sListen,
-             SIO_GET_EXTENSION_FUNCTION_POINTER,
-             &GuidGetAcceptExSockaddrs,
-             sizeof(GuidGetAcceptExSockaddrs),
-             &m_lpfnGetAcceptExSockaddrs,
-             sizeof(m_lpfnGetAcceptExSockaddrs),
-             &dwBytes,
-             NULL,
-             NULL) == SOCKET_ERROR)
-             break;
-         // Associate the listening socket with the port; its completion key is 0.
-         if(::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0) == NULL)
-             break;
-         // Register for FD_ACCEPT: the event fires when the posted AcceptEx
-         // requests are not enough, telling the listen thread to post more.
-         if(WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT) == SOCKET_ERROR)
-             break;
-         // Launch the listen thread.
-         m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);
-         if(m_hListenThread == NULL)
-             break;
-         bReady = TRUE;
-     } while(FALSE);
-     if(!bReady)
-     {
-         // Undo whatever was created before the failing step.
-         if(m_hCompletion != NULL)
-         {
-             ::CloseHandle(m_hCompletion);
-             m_hCompletion = NULL;
-         }
-         if(m_sListen != INVALID_SOCKET)
-         {
-             ::closesocket(m_sListen);
-             m_sListen = INVALID_SOCKET;
-         }
-         m_lpfnAcceptEx = NULL;
-         m_lpfnGetAcceptExSockaddrs = NULL;
-         return FALSE;
-     }
-     m_bServerStarted = TRUE;
-     return TRUE;
- }
- // Stops a running service: raises the shutdown flag, wakes the listen thread
- // through the accept event, waits for that thread to exit and closes its
- // handle. Does nothing when the service is not started.
- void CIOCPServer::Shutdown()
- {
-     if(m_bServerStarted)
-     {
-         // Tell the listen thread to stop and wake it up.
-         m_bShutDown = TRUE;
-         ::SetEvent(m_hAcceptEvent);
-         // Wait until it has exited.
-         ::WaitForSingleObject(m_hListenThread, INFINITE);
-         ::CloseHandle(m_hListenThread);
-         m_hListenThread = NULL;
-         m_bServerStarted = FALSE;
-     }
- }
- // Listen thread. lpParam is the owning CIOCPServer.
- // Posts the initial AcceptEx requests, starts the worker threads, then loops
- // on the accept event, the repost event and the worker thread handles:
- //  - shutdown flag set or wait failure: close everything, stop the workers,
- //    free the pools and exit the thread;
- //  - 60 s timeout: drop pending accepts whose client has been connected for
- //    more than 50 s;
- //  - accept event / repost event: post more AcceptEx requests;
- //  - a worker thread handle signalled: request shutdown.
- DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
- {
-     CIOCPServer *pThis = (CIOCPServer*)lpParam;
-     // Post the initial accept requests on the listening socket.
-     CIOCPBuffer *pBuffer;
-     for(int i=0; i<pThis->m_nInitialAccepts; i++)
-     {
-         pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);//xss,BUFFER_SIZE
-         if(pBuffer == NULL)
-             return -1;
-         pThis->InsertPendingAccept(pBuffer);
-         pThis->PostAccept(pBuffer);
-     }
-     // Handles waited on by WSAWaitForMultipleEvents: two events, then the workers.
-     HANDLE hWaitEvents[2 + MAX_THREAD];
-     int nEventCount = 0;
-     hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;
-     hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;
-     // Start the worker threads that service the completion port.
-     for(int i=0; i<MAX_THREAD; i++)
-     {
-         hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
-     }
-     while(TRUE)
-     {
-         int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);
-         // First check whether the service must stop.
-         if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)
-         {
-             // Close every connection.
-             pThis->CloseAllConnections();
-             ::Sleep(0); // give the I/O worker threads a chance to run
-             // Close the listening socket.
-             ::closesocket(pThis->m_sListen);
-             pThis->m_sListen = INVALID_SOCKET;
-             ::Sleep(0); // give the I/O worker threads a chance to run
-             // Tell every worker thread to exit (byte count -1 is the exit signal).
-             for(int i=2; i<MAX_THREAD + 2; i++)
-             {
-                 ::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);
-             }
-             // Wait up to 5 s for the worker threads to exit.
-             ::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);
-             for(int i=2; i<MAX_THREAD + 2; i++)
-             {
-                 ::CloseHandle(hWaitEvents[i]);
-             }
-             ::CloseHandle(pThis->m_hCompletion);
-             pThis->FreeBuffers();
-             pThis->FreeContexts();
-             ::ExitThread(0);
-         }
-         // 1) Timeout: check how long each still-pending AcceptEx client has been connected.
-         if(nIndex == WSA_WAIT_TIMEOUT)
-         {
-             // Worker threads unlink entries from this list under
-             // m_PendingAcceptsLock, so the walk must hold the same lock.
-             ::EnterCriticalSection(&pThis->m_PendingAcceptsLock);
-             for(pBuffer = pThis->m_pPendingAccepts; pBuffer != NULL; pBuffer = pBuffer->pNext)
-             {
-                 // Starts at -1 so a failed query is never mistaken for an old connection.
-                 int nSeconds = -1;
-                 int nLen = sizeof(nSeconds);
-                 // Query how long the connection has been established.
-                 if(::getsockopt(pBuffer->sClient,
-                     SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen) != 0)
-                 {
-                     continue;
-                 }
-                 // Connected for more than 50 s without sending its initial data: drop the client.
-                 if(nSeconds != -1 && nSeconds > /*2*60*/50)
-                 {
-                     closesocket(pBuffer->sClient);
-                     pBuffer->sClient = INVALID_SOCKET;
-                 }
-             }
-             ::LeaveCriticalSection(&pThis->m_PendingAcceptsLock);
-         }
-         else
-         {
-             nIndex = nIndex - WAIT_OBJECT_0;
-             WSANETWORKEVENTS ne;
-             int nLimit=0;
-             if(nIndex == 0) // 2) accept event signalled: not enough accepts posted, add more
-             {
-                 ::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
-                 if(ne.lNetworkEvents & FD_ACCEPT)
-                 {
-                     nLimit = 50; // number of requests to add
-                 }
-             }
-             else if(nIndex == 1) // 3) repost event signalled: worker threads accepted new clients
-             {
-                 nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
-             }
-             else if(nIndex > 1) // a worker thread exited, which means an error occurred: shut down
-             {
-                 pThis->m_bShutDown = TRUE;
-                 continue;
-             }
-             // Post up to nLimit AcceptEx requests, bounded by m_nMaxAccepts.
-             int i = 0;
-             while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)
-             {
-                 pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
-                 if(pBuffer != NULL)
-                 {
-                     pThis->InsertPendingAccept(pBuffer);
-                     pThis->PostAccept(pBuffer);
-                 }
-             }
-         }
-     }
-     return 0;
- }
- // Worker thread. lpParam is the owning CIOCPServer.
- // Dequeues completion packets from the completion port and passes them to
- // HandleIO. Two byte counts are reserved for posted packets: -1 asks the
- // thread to exit, -2 is the per-connection timer notification (it carries no
- // buffer). The thread also ends when the port can no longer be read, or when
- // a failed non-accept operation arrives with a zero completion key.
- DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
- {
-     #ifdef _DEBUG
-     ::OutputDebugString(" WorkerThread 启动... \n");
-     #endif // _DEBUG
-     CIOCPServer *pThis = (CIOCPServer*)lpParam;
-     CIOCPBuffer *pBuffer = NULL;
-     DWORD dwKey;
-     DWORD dwTrans;
-     LPOVERLAPPED lpol;
-     while(TRUE)
-     {
-         // Reset the outputs: when no packet is dequeued the call leaves them untouched.
-         dwKey = 0;
-         dwTrans = 0;
-         lpol = NULL;
-         // Wait for an I/O completion on any socket associated with the port.
-         BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,
-             &dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);
-         // Failure with no OVERLAPPED means nothing was dequeued (for example
-         // the port handle is no longer valid). There is no buffer to recover
-         // and no byte count to inspect, so stop instead of dereferencing a
-         // pointer derived from NULL.
-         if(!bOK && lpol == NULL)
-             break;
-         if(dwTrans == -1) // exit request posted by the listen thread
-         {
-             #ifdef _DEBUG
-             ::OutputDebugString(" WorkerThread 退出 \n");
-             #endif // _DEBUG
-             ::ExitThread(0);
-         }
-         if(dwTrans != -2)
-             pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);
-         else
-             pBuffer = NULL; // timer packet: do not pass along the previous iteration's buffer
-         int nError = NO_ERROR;
-         if(!bOK) // an error occurred on this socket
-         {
-             printf("完成端口套接字上有错误:%d\n",GetLastError());
-             SOCKET s;
-             if(pBuffer->nOperation == OP_ACCEPT)
-             {
-                 s = pThis->m_sListen;
-             }
-             else
-             {
-                 if(dwKey == 0)
-                     break;
-                 s = ((CIOCPContext*)dwKey)->s;
-             }
-             // Fetch the Winsock error code for the failed operation.
-             DWORD dwFlags = 0;
-             if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
-             {
-                 nError = ::WSAGetLastError();
-             }
-         }
-         pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
-         printf("Buffer:%d Context:%d\n",iBufferCount,iContextCount);
-     }
-     #ifdef _DEBUG
-     ::OutputDebugString(" WorkerThread 退出 \n");
-     #endif // _DEBUG
-     return 0;
- }
- int g_x = 0; // NOTE(review): not referenced anywhere in the visible part of this file
- // Processes one completion packet dequeued by a worker thread.
- //   dwKey   - completion key: the CIOCPContext of the connection, or 0 for
- //             the listening socket (accept completions)
- //   pBuffer - the I/O buffer of the completed operation (unused for timer packets)
- //   dwTrans - bytes transferred; -2 marks a timer packet
- //   nError  - Winsock error of the operation, or NO_ERROR
- // Releases pBuffer on every path on which nothing else still refers to it.
- void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
- {
-     CIOCPContext *pContext = (CIOCPContext *)dwKey;
-     #ifdef _DEBUG
-     ::OutputDebugString(" HandleIO... \n");
-     #endif // _DEBUG
-     // Timer packet: the connection saw no completed I/O for the timer period, close it.
-     if(dwTrans == -2)
-     {
-         CloseAConnection(pContext);
-         return;
-     }
-     // 1) Decrement the outstanding I/O count of the socket.
-     if(pContext != NULL)
-     {
-         ::EnterCriticalSection(&pContext->Lock);
-         if(pBuffer->nOperation == OP_READ)
-             pContext->nOutstandingRecv --;
-         else if(pBuffer->nOperation == OP_WRITE)
-             pContext->nOutstandingSend --;
-         ::LeaveCriticalSection(&pContext->Lock);
-         // 2) Check whether we have already closed the socket.
-         if(pContext->bClosing)
-         {
-             #ifdef _DEBUG
-             ::OutputDebugString(" 检查到套节字已经被我们关闭 \n");
-             #endif // _DEBUG
-             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-             }
-             // Release the pending I/O of the closed socket.
-             ReleaseBuffer(pBuffer);
-             pBuffer = NULL;
-             return;
-         }
-     }
-     else
-     {
-         RemovePendingAccept(pBuffer);
-     }
-     // 3) If an error occurred, notify the user and close the socket.
-     if(nError != NO_ERROR)
-     {
-         if(pBuffer->nOperation != OP_ACCEPT)
-         {
-             OnConnectionError(pContext, pBuffer, nError);
-             CloseAConnection(pContext);
-             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-             }
-             #ifdef _DEBUG
-             ::OutputDebugString(" 检查到客户套节字上发生错误 \n");
-             #endif // _DEBUG
-         }
-         else // error on the listening socket, i.e. the client being accepted failed
-         {
-             // Close the client socket of the failed accept.
-             if(pBuffer->sClient != INVALID_SOCKET)
-             {
-                 ::closesocket(pBuffer->sClient);
-                 pBuffer->sClient = INVALID_SOCKET;
-             }
-             #ifdef _DEBUG
-             ::OutputDebugString(" 检查到监听套节字上发生错误 \n");
-             #endif // _DEBUG
-         }
-         ReleaseBuffer(pBuffer);
-         pBuffer = NULL;
-         return;
-     }
-     // Normal processing.
-     if(pBuffer->nOperation == OP_ACCEPT)
-     {
-         if(dwTrans == 0)
-         {
-             #ifdef _DEBUG
-             ::OutputDebugString(" 监听套节字上客户端关闭 \n");
-             #endif // _DEBUG
-             if(pBuffer->sClient != INVALID_SOCKET)
-             {
-                 ::closesocket(pBuffer->sClient);
-                 pBuffer->sClient = INVALID_SOCKET;
-             }
-         }
-         else
-         {
-             // Allocate a context for the newly accepted connection.
-             CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
-             if(pClient != NULL)
-             {
-                 if(AddAConnection(pClient))
-                 {
-                     // Retrieve the local and remote addresses.
-                     int nLocalLen, nRmoteLen;
-                     LPSOCKADDR pLocalAddr, pRemoteAddr;
-                     m_lpfnGetAcceptExSockaddrs(
-                         pBuffer->buff,
-                         pBuffer->nLen - (sizeof(sockaddr_in) + 16) * 2/*sizeof(cmd_header)*/,
-                         sizeof(sockaddr_in) + 16,
-                         sizeof(sockaddr_in) + 16,
-                         (SOCKADDR **)&pLocalAddr,
-                         &nLocalLen,
-                         (SOCKADDR **)&pRemoteAddr,
-                         &nRmoteLen);
-                     memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
-                     memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);
-                     // Associate the new connection with the completion port;
-                     // the context pointer is its completion key.
-                     ::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);
-                     // Notify the user; nLen becomes the number of bytes received with the accept.
-                     pBuffer->nLen = dwTrans;
-                     OnConnectionEstablished(pClient, pBuffer);
-                     if(pClient->bClosing && pClient->nOutstandingRecv == 0 && pClient->nOutstandingSend == 0)
-                     {
-                         ReleaseContext(pClient);
-                         pContext = NULL;
-                     }
-                     else if(pClient->hTimer == NULL) // create the one-shot 60 s I/O timeout timer for this client
-                     {
-                         pClient->hCompletion = m_hCompletion;
-                         CreateTimerQueueTimer(&pClient->hTimer,m_hTimerQueue,(WAITORTIMERCALLBACK)TimerRoutine,(PVOID)pClient,60*1000,0,0);
-                     }
-                     // No read is posted here: the code that did so is disabled in the original.
-                     // CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
-                     // if(p != NULL)
-                     // {
-                     // if(!PostRecv(pClient, p))
-                     // {
-                     // CloseAConnection(pClient);
-                     // }
-                     // }
-                 }
-                 else // connection limit reached: close the connection
-                 {
-                     CloseAConnection(pClient);
-                     ReleaseContext(pClient);
-                     pContext = NULL;
-                 }
-             }
-             else
-             {
-                 // Out of resources: just close the client connection.
-                 ::closesocket(pBuffer->sClient);
-                 pBuffer->sClient = INVALID_SOCKET;
-             }
-         }
-         // The accept request is finished: release its buffer.
-         ReleaseBuffer(pBuffer);
-         pBuffer = NULL;
-         // Ask the listen thread to post one more accept request.
-         ::InterlockedIncrement(&m_nRepostCount);
-         ::SetEvent(m_hRepostEvent);
-     }
-     else if(pBuffer->nOperation == OP_READ)
-     {
-         if(dwTrans == 0) // the peer closed the socket
-         {
-             // Notify the user first.
-             pBuffer->nLen = 0;
-             OnConnectionClosing(pContext, pBuffer);
-             // Then close the connection.
-             CloseAConnection(pContext);
-             // Release the context (if idle) and the buffer.
-             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-             }
-             ReleaseBuffer(pBuffer);
-             pBuffer = NULL;
-         }
-         else
-         {
-             pBuffer->nLen = dwTrans;
-             // Deliver received data in the order in which the reads were posted.
-             CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
-             while(p != NULL)
-             {
-                 // Notify the user.
-                 OnReadCompleted(pContext, p);
-                 // Advance the sequence number expected next.
-                 ::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
-                 // Release this completed I/O.
-                 ReleaseBuffer(p);
-                 p = GetNextReadBuffer(pContext, NULL);
-             }
-             if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-             }
-             else if(pContext->hTimer != NULL)
-             {
-                 // Re-arm the timer: the connection is dropped after 60 s without a completed I/O.
-                 ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);
-             }
-             // No new read is posted here: the code that did so is disabled in the original.
-             // pBuffer = AllocateBuffer(BUFFER_SIZE);
-             //if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
-             //{
-             // CloseAConnection(pContext);
-             //}
-         }
-     }
-     else if(pBuffer->nOperation == OP_WRITE)
-     {
-         if(dwTrans == 0) // the peer closed the socket
-         {
-             // Notify the user first.
-             pBuffer->nLen = 0;
-             OnConnectionClosing(pContext, pBuffer);
-             // Then close the connection.
-             CloseAConnection(pContext);
-             // Release the context (if idle) and the buffer.
-             if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-             }
-             ReleaseBuffer(pBuffer);
-             pBuffer = NULL;
-         }
-         else
-         {
-             if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-                 ReleaseBuffer(pBuffer);
-                 pBuffer = NULL;
-                 return;
-             }
-             else if(pContext->hTimer != NULL)
-             {
-                 ChangeTimerQueueTimer(m_hTimerQueue,pContext->hTimer,60*1000,0);
-             }
-             if(dwTrans < (DWORD)pBuffer->nLen) // partial send: post the remainder (repeated if that is partial too)
-             {
-                 printf("send未发送完全,发送:%d,总长度:%d\n",dwTrans,pBuffer->nLen);
-                 CIOCPBuffer* p = AllocateBuffer(pBuffer->nLen - dwTrans);
-                 if(p != NULL)
-                     memcpy(p->buff,pBuffer->buff + dwTrans,pBuffer->nLen - dwTrans);
-                 if(p == NULL || !PostSend(pContext,p))
-                 {
-                     // The remainder buffer was never posted and nothing else
-                     // refers to it, so give it back (ReleaseBuffer ignores NULL).
-                     ReleaseBuffer(p);
-                     // pBuffer is left alone on this path: the head of the send
-                     // queue may still point at it, and ReleaseContext releases
-                     // the buffers that are still queued.
-                     CloseAConnection(pContext);
-                     return;
-                 }
-             }
-             else
-             {
-                 if(!PostNextWriteBuffer(pContext,pBuffer))
-                 {
-                     CloseAConnection(pContext);
-                     // PostNextWriteBuffer has already removed the queue head
-                     // for this completed write, so nothing else will release
-                     // pBuffer; do it here instead of leaking it.
-                     ReleaseBuffer(pBuffer);
-                     pBuffer = NULL;
-                     return;
-                 }
-             }
-             // Write finished: notify the user.
-             pBuffer->nLen = dwTrans;
-             OnWriteCompleted(pContext, pBuffer);
-             if(pContext->bClosing && pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
-             {
-                 ReleaseContext(pContext);
-                 pContext = NULL;
-             }
-             // Release the buffer that was allocated for the send.
-             ReleaseBuffer(pBuffer);
-             pBuffer = NULL;
-         }
-     }
- }
- // Copies nLen bytes from pszText into a newly allocated I/O buffer and hands
- // that buffer to PostSend for the connection pContext.
- //   pContext - connection to send on.
- //   pszText  - source bytes; only read (copied with memcpy), never stored.
- //   nLen     - number of bytes to copy and send; must be positive.
- // Returns FALSE when an argument is invalid or AllocateBuffer fails;
- // otherwise returns whatever PostSend returns.
- BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
- {
- // Reject unusable arguments before allocating anything. A negative nLen
- // would be converted to a huge unsigned size by memcpy, and a zero-length
- // send would complete with zero bytes transferred, which the OP_WRITE
- // completion branch treats as "peer closed" and closes the connection.
- if(pContext == NULL || pszText == NULL || nLen <= 0)
- return FALSE;
- CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
- if(pBuffer == NULL)
- return FALSE;
- memcpy(pBuffer->buff, pszText, nLen);
- // NOTE(review): whether PostSend releases pBuffer when it fails is not
- // visible from here - confirm, otherwise a failed send leaks the buffer.
- return PostSend(pContext, pBuffer);
- }
- //投递接收请求示例
- //CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
- //if(p != NULL)
- //{
- // if(!PostRecv(pContext, p))
- // {
- // CloseAConnection(pContext);
- // }
- //}
- //投递发送请求示例
- //CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
- //if(p != NULL)
- //{
- // if(!PostSendToList(pContext, p))
- // {
- // CloseAConnection(pContext);
- // }
- //}
- void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
- // Notification hook: the connection has been established and the first receive on it has completed. The body is intentionally empty.
- // Depending on business logic, the next step can be PostRecv to receive, PostSendToList to send, or CloseAConnection(pContext) to close the connection.
- }
- void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer) // Notification hook: the completion handler calls this when a read or write completes with zero bytes transferred.
- { // The callers set pBuffer->nLen to 0 beforehand and call CloseAConnection(pContext) right after this returns. The body is intentionally empty.
- }
- void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
- // Notification hook: one receive has completed; pBuffer is the completed read buffer handed over by the completion handler. The body is intentionally empty.
- // Depending on business logic, the next step can be PostRecv to receive, PostSendToList to send, or CloseAConnection(pContext) to close the connection.
- }
- void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
- {
- // Notification hook: one send has completed; the completion handler sets pBuffer->nLen to the number of bytes transferred before calling. The body is intentionally empty.
- // Depending on business logic, the next step can be PostRecv to receive, PostSendToList to send, or CloseAConnection(pContext) to close the connection.
- }
- void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError) // Notification hook with an empty body; receives the connection context, the I/O buffer and an error code.
- { // NOTE(review): no call site is visible in this part of the file - presumably invoked when an I/O operation on the connection fails; confirm against the caller.
- }