该模型与WSAAsyncSelect模型类似,允许应用程序在一个或多个套接字上接受基于事件的网络通知。它与WSAAsyncSelect模型类似是因为它也接收FD_XXX类型的网络事件,不过并不是依靠Windows的消息驱动机制,而是经由事件对象句柄通知。
使用这个模型的基本思路是为感兴趣的一组网络事件创建一个事件对象,再调用WSAEventSelect函数将网络事件和事件对象、套接字关联起来。当网络事件发生时,Winsock使相应的事件对象受信,在事件对象上的等待函数就会返回。之后,调用WSAEnumNetworkEvents函数便可获取到底发生了什么网络事件。
WSAEVENT WSACreateEvent(void); // 返回一个手工重置的事件对象句柄(如果自动重置,可用CreateEvent)
int WSAEventSelect(
SOCKET s, // 套接字句柄
WSAEVENT hEventObject, // 上述描述的创建的事件对象句柄
long lNetworkEvents // 感兴趣的FD_XXX网络事件的组合
);
DWORD WSAWaitForMultipleEvents(
DOWRD cEvents, // 指定下面lphEvents所指的数组中事件对象句柄的个数
const WSAEVENT* lphEvents, // 指向一个事件对象句柄数组
BOOL fWaitAll, // 指定是否等待所有事件对象都变成受信状态
DWORD dwTimeout, // 指定要等待的时间,WSA_INFINITE为无穷大(0,函数立即返回,马上知道事件对象此刻是否受信)
BOOL fAlertable // 在使用WSAEventSelect模型时可以忽略,应设为FALSE
);
WSAWaitForMultipleEvents最多支持WSA_MAXIMUM_WAIT_EVENTS个对象,WSA_MAXIMUM_WAIT_EVENTS被定义为64。因此,这个I/O模型在一个线程中同一时间最多能支持64个套接字,如果需要使用这个模型管理更多套接字,就需要创建额外的工作线程了。
WSAWaitForMultipleEvents的返回值:1. WSA_WAIT_TIMEOUT(超时);2. 如果指定时间内有网络事件发生,返回值指明哪个事件对象促使函数返回,WSA_WAIT_EVENT_0 to (WSA_WAIT_EVENT_0 + cEvents - 1) ;3. WSA_WAIT_FAILED;4. WAIT_IO_COMPLETION,此模型不用。
注意,将fWaitAll参数设为FALSE以后,如果同时有几个事件对象受信,WSAWaitForMultipleEvents函数的返回值也仅能指明一个,就是句柄数组中最前面的那个。如果指明的这个事件对象总有网络事件发生,那么后面其他事件对象所关联的网络事件就得不到处理了。解决办法是,WSAWaitForMultipleEvents函数返回后,对每个事件都再次调用WSAWaitForMultipleEvents函数,以便确定其状态。
一旦事件对象受信,那么找到与之对应的套接字,然后调用WSAEnumNetworkEvents函数即可查看发生了什么网络事件。
int WSAEnumNetworkEvents(
SOCKET s, // 套接字句柄
WSAEVENT hEventObject, // 对应的事件对象句柄。如果提供了此参数,本函数会重置这个事件对象的状态
LPWSANETWORKEVENTS lpNetworkEvents // 指向一个WSANETWORKEVENTS结构
);
最后一个参数用来取得在套接字上发生的网络事件和相关的出错代码,
typedef struct WSANETWORKEVENTS{
long lNetworkEvents; // 指定已发生的网络事件(如FD_ACCEPT、FD_READ等)
int iErrorCode[FD_MAX_EVENTS]; // 与lNetworkEvents相关的出错代码
} WSANETWORKEVENTS, *LPWSANETWORKEVENTS;
iErrorCode的每个成员对应着一个网络事件的出错代码,可以用预定义标识FD_READ_BIT、FD_WRITE_BIT等来索引FD_READ、FD_WRITE等事件发生时的出错代码。如:
if(event.lNetworkEvent & FD_READ) // 处理FD_READ通知消息
{
if(event.iErrorCode[FD_READ_BIT] != 0)
{
...... // FD_READ出错,错误代码为event.iErrorCode[FD_READ_BIT]
}
}
给出实例,仍旧为TCP服务器程序,打印出接受数据。步骤如下:
(1)创建一个事件句柄表和一个对应的套接字句柄表。
(2)每创建一个套接字,就创建一个事件对象,把它们的句柄分别放入上面的两个表中,并调用WSAEventSelect添加它们的关联。
(3)调用WSAWaitForMultipleEvents在所有事件对象上等待,此函数返回后,我们对事件句柄表中的每个事件调用WSAWaitForMultipleEvents函数,以便确认在哪些套接字上发生了网络事件。
(4)处理发生的网络事件,继续在事件对象上等待。
// initsock.h #include <winsock2.h> #pragma comment(lib, "WS2_32") class CInitSock { public: CInitSock(); ~CInitSock(); }; inline CInitSock::CInitSock() { // 初始化WS2_32.dll WSADATA wsaData; WORD sockVersion = MAKEWORD(2, 2); if(::WSAStartup(sockVersion, &wsaData) != 0) { exit(0); } } inline CInitSock::~CInitSock() { ::WSACleanup(); } ////////////////////////////////////////////////// // WSAEventSelect文件 #include "initsock.h" #include <stdio.h> #include <iostream> #include <windows.h> using namespace std; // 初始化Winsock库 CInitSock theSock; int main() { // 事件句柄和套节字句柄表 WSAEVENT eventArray[WSA_MAXIMUM_WAIT_EVENTS]; SOCKET sockArray[WSA_MAXIMUM_WAIT_EVENTS]; int nEventTotal = 0; USHORT nPort = 4567; // 此服务器监听的端口号 // 创建监听套节字 SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nPort); sin.sin_addr.S_un.S_addr = INADDR_ANY; if(::bind(sListen, (sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR) { printf(" Failed bind() \n"); return -1; } ::listen(sListen, 5); // 创建事件对象,并关联到新的套节字 WSAEVENT event = ::WSACreateEvent(); ::WSAEventSelect(sListen, event, FD_ACCEPT|FD_CLOSE); // 添加到表中 eventArray[nEventTotal] = event; sockArray[nEventTotal] = sListen; nEventTotal++; // 处理网络事件 while(TRUE) { // 在所有事件对象上等待 int nIndex = ::WSAWaitForMultipleEvents(nEventTotal, eventArray, FALSE, WSA_INFINITE, FALSE); // 对每个事件调用WSAWaitForMultipleEvents函数,以便确定它的状态 nIndex = nIndex - WSA_WAIT_EVENT_0; for(int i=nIndex; i<nEventTotal; i++) { nIndex = ::WSAWaitForMultipleEvents(1, &eventArray[i], TRUE, 1000, FALSE); if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT) { continue; } else { // 获取到来的通知消息,WSAEnumNetworkEvents函数会自动重置受信事件 WSANETWORKEVENTS event; ::WSAEnumNetworkEvents(sockArray[i], eventArray[i], &event); if(event.lNetworkEvents & FD_ACCEPT) // 处理FD_ACCEPT通知消息 { if(event.iErrorCode[FD_ACCEPT_BIT] == 0) { if(nEventTotal > WSA_MAXIMUM_WAIT_EVENTS) { printf(" Too many connections! \n"); continue; } SOCKET sNew = ::accept(sockArray[i], NULL, NULL); WSAEVENT event = ::WSACreateEvent(); ::WSAEventSelect(sNew, event, FD_READ|FD_CLOSE|FD_WRITE); // 添加到表中 eventArray[nEventTotal] = event; sockArray[nEventTotal] = sNew; nEventTotal++; } } else if(event.lNetworkEvents & FD_READ) // 处理FD_READ通知消息 { if(event.iErrorCode[FD_READ_BIT] == 0) { char szText[256]; int nRecv = ::recv(sockArray[i], szText, strlen(szText), 0); if(nRecv > 0) { szText[nRecv] = '\0'; printf("接收到数据:%s \n", szText); } } } else if(event.lNetworkEvents & FD_CLOSE) // 处理FD_CLOSE通知消息 { if(event.iErrorCode[FD_CLOSE_BIT] == 0) { ::closesocket(sockArray[i]); ::CloseHandle(eventArray[i]); for(int j=i; j<nEventTotal-1; j++) { sockArray[j] = sockArray[j+1]; /* sockArray[j] = sockArray[j+1]; */ eventArray[j] = eventArray[j+1]; } nEventTotal--; } } else if(event.lNetworkEvents & FD_WRITE) // 处理FD_WRITE通知消息 { } } } } return 0; }
WSAEventSelect模型简单易用,也不需要窗口坏境。该模型唯一缺点是有最多等待64个事件对象的限制,当套接字连接数量增加时,就必须创建多个线程来处理I/O,也就是使用所谓的线程池。
接下来采用该模型设计一个可以处理大量客户I/O请求的服务器程序。
设计的总体思路比较简单,程序的主线程负责监听客户端的连接请求,接受到新连接之后,将新套接字安排给工作线程处理I/O。每个工作线程最多处理64个套接字,如果再有新的套接字,就再创建新的工作线程。
程序用下面的SOCKET_OBJ结构来记录每个客户端套接字的信息。
// 套节字对象 typedef struct _SOCKET_OBJ { SOCKET s; // 套节字句柄 HANDLE event; // 与此套节字相关联的事件对象句柄 sockaddr_in addrRemote; // 客户端地址信息 _SOCKET_OBJ *pNext; // 指向下一个SOCKET_OBJ对象,为的是连成一个表 } SOCKET_OBJ, *PSOCKET_OBJ;
每接受到一个新的连接,便为新连接申请一个SOCKET_OBJ结构,初始化该结构的成员。当连接关闭或者出错时,再释放内存空间。下面的GetSocketObj和FreeSocketObj函数分别用于申请和释放一个SOCKET_OBJ对象。
// 申请一个套节字对象,初始化它的成员 PSOCKET_OBJ GetSocketObj(SOCKET s) { PSOCKET_OBJ pSocket = (PSOCKET_OBJ)::GlobalAlloc(GPTR, sizeof(SOCKET_OBJ)); if(pSocket != NULL) { pSocket->s = s; pSocket->event = ::WSACreateEvent(); } return pSocket; } // 释放一个套节字对象 void FreeSocketObj(PSOCKET_OBJ pSocket) { ::CloseHandle(pSocket->event); if(pSocket->s != INVALID_SOCKET) { ::closesocket(pSocket->s); } ::GlobalFree(pSocket); }
下面的THREAD_OBJ结构来记录每个线程的信息。
// 线程对象 typedef struct _THREAD_OBJ { HANDLE events[WSA_MAXIMUM_WAIT_EVENTS]; // 记录当前线程要等待的事件对象的句柄 int nSocketCount; // 记录当前线程处理的套节字的数量 <= WSA_MAXIMUM_WAIT_EVENTS PSOCKET_OBJ pSockHeader; // 当前线程处理的套节字对象列表,pSockHeader指向表头 PSOCKET_OBJ pSockTail; // pSockTail指向表尾 CRITICAL_SECTION cs; // 关键代码段变量,为的是同步对本结构的访问 _THREAD_OBJ *pNext; // 指向下一个THREAD_OBJ对象,为的是连成一个表 } THREAD_OBJ, *PTHREAD_OBJ;
套接字对象列表记录在它所属的线程对象内,线程对象列表记录在全局变量中,下面定义全局变量g_pThreadList做这件事。
// 线程列表 PTHREAD_OBJ g_pThreadList; // 指向线程对象列表表头 CRITICAL_SECTION g_cs; // 同步对此全局变量的访问
当客户数量增加时,服务器就要创建额外的线程去处理I/O。每创建一个线程,便为新线程申请一个THREAD_OBJ结构,初始化该结构的成员。当客户数量减小,处理I/O的线程关闭时,再释放内存空间。下面的GetThreadObj和FreeThreadObj函数分别用于申请和释放一个THREAD_OBJ对象。
// 申请一个线程对象,初始化它的成员,并将它添加到线程对象列表中 PTHREAD_OBJ GetThreadObj() { PTHREAD_OBJ pThread = (PTHREAD_OBJ)::GlobalAlloc(GPTR, sizeof(THREAD_OBJ)); if(pThread != NULL) { ::InitializeCriticalSection(&pThread->cs); // 创建一个事件对象,用于指示该线程的句柄数组需要重组 pThread->events[0] = ::WSACreateEvent(); // 将新申请的线程对象添加到列表中 ::EnterCriticalSection(&g_cs); pThread->pNext = g_pThreadList; g_pThreadList = pThread; ::LeaveCriticalSection(&g_cs); } return pThread; } // 释放一个线程对象,并将它从线程对象列表中移除 void FreeThreadObj(PTHREAD_OBJ pThread) { // 在线程对象列表中查找pThread所指的对象,如果找到就从中移除 ::EnterCriticalSection(&g_cs); PTHREAD_OBJ p = g_pThreadList; if(p == pThread) // 是第一个? { g_pThreadList = p->pNext; } else { while(p != NULL && p->pNext != pThread) { p = p->pNext; } if(p != NULL) { // 此时,p是pThread的前一个,即“p->pNext == pThread” p->pNext = pThread->pNext; } } ::LeaveCriticalSection(&g_cs); // 释放资源 ::CloseHandle(pThread->events[0]); ::DeleteCriticalSection(&pThread->cs); ::GlobalFree(pThread); }
线程启动之后,要在events数组记录的事件上等待。下面的RebuildArray函数将与套接字相关联的事件对象的句柄写入这个数组。
// 重新建立线程对象的events数组 void RebuildArray(PTHREAD_OBJ pThread) { ::EnterCriticalSection(&pThread->cs); PSOCKET_OBJ pSocket = pThread->pSockHeader; int n = 1; // 从第1个开始写,第0个用于指示需要重建了 while(pSocket != NULL) { pThread->events[n++] = pSocket->event; pSocket = pSocket->pNext; } ::LeaveCriticalSection(&pThread->cs); }
在线程运行期间,如果有新的套接字对象添加到这个线程,就使events[0]事件对象受信,通知线程重新调用RebuildArray函数建立events数组
程序的主线程负责接受客户的连接请求,创建处理客户请求的线程,打印服务器的状态信息。该服务器程序仅维护下面两个状态,它们被定义为全局变量。
// 状态信息 LONG g_nTatolConnections; // 总共连接数量 LONG g_nCurrentConnections; // 当前连接数量
主线程在接受到客户请求之后,使用下面两个函数将新的连接安排给其他线程处理。
// 向一个线程的套节字列表中插入一个套节字 BOOL InsertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket) { BOOL bRet = FALSE; ::EnterCriticalSection(&pThread->cs); if(pThread->nSocketCount < WSA_MAXIMUM_WAIT_EVENTS - 1) { if(pThread->pSockHeader == NULL) { pThread->pSockHeader = pThread->pSockTail = pSocket; } else { pThread->pSockTail->pNext = pSocket; pThread->pSockTail = pSocket; } pThread->nSocketCount ++; bRet = TRUE; } ::LeaveCriticalSection(&pThread->cs); // 插入成功,说明成功处理了客户的连接请求 if(bRet) { ::InterlockedIncrement(&g_nTatolConnections); ::InterlockedIncrement(&g_nCurrentConnections); } return bRet; } // 将一个套节字对象安排给空闲的线程处理 void AssignToFreeThread(PSOCKET_OBJ pSocket) { pSocket->pNext = NULL; ::EnterCriticalSection(&g_cs); PTHREAD_OBJ pThread = g_pThreadList; // 试图插入到现存线程 while(pThread != NULL) { if(InsertSocketObj(pThread, pSocket)) break; pThread = pThread->pNext; } // 没有空闲线程,为这个套节字创建新的线程 if(pThread == NULL) { pThread = GetThreadObj(); InsertSocketObj(pThread, pSocket); ::CreateThread(NULL, 0, ServerThread, pThread, 0, NULL); } ::LeaveCriticalSection(&g_cs); // 指示线程重建句柄数组 ::WSASetEvent(pThread->events[0]); }
连接中断之后,要先将中断的套接字对应的套接字对象从线程对象中移除,再释放套接字对象。下面的RemoveSocketObj函数从给定线程的套接字对象列表中移除一个套接字对象。
// 从给定线程的套节字对象列表中移除一个套节字对象 void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket) { ::EnterCriticalSection(&pThread->cs); // 在套节字对象列表中查找指定的套节字对象,找到后将之移除 PSOCKET_OBJ pTest = pThread->pSockHeader; if(pTest == pSocket) { if(pThread->pSockHeader == pThread->pSockTail) pThread->pSockTail = pThread->pSockHeader = pTest->pNext; else pThread->pSockHeader = pTest->pNext; } else { while(pTest != NULL && pTest->pNext != pSocket) pTest = pTest->pNext; if(pTest != NULL) { if(pThread->pSockTail == pSocket) pThread->pSockTail = pTest; pTest->pNext = pSocket->pNext; } } pThread->nSocketCount --; ::LeaveCriticalSection(&pThread->cs); // 指示线程重建句柄数组 ::WSASetEvent(pThread->events[0]); // 说明一个连接中断 ::InterlockedDecrement(&g_nCurrentConnections); }
工作线程ServerThread负责处理客户的I/O请求。该线程运行之后,取得本线程对象的指针,进入无限循环并在events数组所记录的事件对象上等待,处理网络事件。具体如下:
DWORD WINAPI ServerThread(LPVOID lpParam) { // 取得本线程对象的指针 PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam; while(TRUE) { // 等待网络事件 int nIndex = ::WSAWaitForMultipleEvents( pThread->nSocketCount + 1, pThread->events, FALSE, WSA_INFINITE, FALSE); nIndex = nIndex - WSA_WAIT_EVENT_0; // 查看受信的事件对象 for(int i=nIndex; i<pThread->nSocketCount + 1; i++) { nIndex = ::WSAWaitForMultipleEvents(1, &pThread->events[i], TRUE, 1000, FALSE); if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT) { continue; } else { if(i == 0) // events[0]受信,重建数组 { RebuildArray(pThread); // 如果没有客户I/O要处理了,则本线程退出 if(pThread->nSocketCount == 0) { FreeThreadObj(pThread); return 0; } ::WSAResetEvent(pThread->events[0]); } else // 处理网络事件 { // 查找对应的套节字对象指针,调用HandleIO处理网络事件 PSOCKET_OBJ pSocket = (PSOCKET_OBJ)FindSocketObj(pThread, i); if(pSocket != NULL) { if(!HandleIO(pThread, pSocket)) RebuildArray(pThread); } else printf(" Unable to find socket object \n "); } } } } return 0; }
FindSocketObj函数根据事件对象在events数组中的索引查找相应的套接字对象。
PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex) // nIndex从1开始 { // 在套节字列表中查找 PSOCKET_OBJ pSocket = pThread->pSockHeader; while(--nIndex) { if(pSocket == NULL) return NULL; pSocket = pSocket->pNext; } retu
HandleIO函数调用WSAEnumNetworkEvents函数获取具体发生的网络事件,然后按照程序要实现的功能进行处理。
BOOL HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket) { // 获取具体发生的网络事件 WSANETWORKEVENTS event; ::WSAEnumNetworkEvents(pSocket->s, pSocket->event, &event); do { if(event.lNetworkEvents & FD_READ) // 套节字可读 { if(event.iErrorCode[FD_READ_BIT] == 0) { char szText[256]; int nRecv = ::recv(pSocket->s, szText, strlen(szText), 0); if(nRecv > 0) { szText[nRecv] = '\0'; printf("接收到数据:%s \n", szText); } } else break; } else if(event.lNetworkEvents & FD_CLOSE) // 套节字关闭 { break; } else if(event.lNetworkEvents & FD_WRITE) // 套节字可写 { if(event.iErrorCode[FD_WRITE_BIT] == 0) { } else break; } return TRUE; } while(FALSE); // 套节字关闭,或者有错误发生,程序都会转到这里来执行 RemoveSocketObj(pThread, pSocket); FreeSocketObj(pSocket); return FALSE; }
主线程创建监听套接字,初始化全局变量,处理客户端的连接请求,定时打印状态信息等。具体如下:
int main() { USHORT nPort = 4567; // 此服务器监听的端口号 // 创建监听套节字 SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nPort); sin.sin_addr.S_un.S_addr = INADDR_ANY; if(::bind(sListen, (sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR) { printf(" Failed bind() \n"); return -1; } ::listen(sListen, 200); // 创建事件对象,并关联到监听的套节字 WSAEVENT event = ::WSACreateEvent(); ::WSAEventSelect(sListen, event, FD_ACCEPT|FD_CLOSE); ::InitializeCriticalSection(&g_cs); // 处理客户连接请求,打印状态信息 while(TRUE) { int nRet = ::WaitForSingleObject(event, 5*1000); if(nRet == WAIT_FAILED) { printf(" Failed WaitForSingleObject() \n"); break; } else if(nRet == WSA_WAIT_TIMEOUT) // 定时显式状态信息 { printf(" \n"); printf(" TatolConnections: %d \n", g_nTatolConnections); printf(" CurrentConnections: %d \n", g_nCurrentConnections); continue; } else // 有新的连接未决 { ::ResetEvent(event); // 循环处理所有未决的连接请求 while(TRUE) { sockaddr_in si; int nLen = sizeof(si); SOCKET sNew = ::accept(sListen, (sockaddr*)&si, &nLen); if(sNew == SOCKET_ERROR) break; PSOCKET_OBJ pSocket = GetSocketObj(sNew); pSocket->addrRemote = si; ::WSAEventSelect(pSocket->s, pSocket->event, FD_READ|FD_CLOSE|FD_WRITE); AssignToFreeThread(pSocket); } } } ::DeleteCriticalSection(&g_cs); return 0; }
下面附上完整代码:
////////////////////////////////////////////////////////// // initsock.h文件 #include <winsock2.h> #pragma comment(lib, "WS2_32") // 链接到WS2_32.lib class CInitSock { public: CInitSock(BYTE minorVer = 2, BYTE majorVer = 2) { // 初始化WS2_32.dll WSADATA wsaData; WORD sockVersion = MAKEWORD(minorVer, majorVer); if(::WSAStartup(sockVersion, &wsaData) != 0) { exit(0); } } ~CInitSock() { ::WSACleanup(); } }; ///////////////////////////////////////////////////// // EventSelectServer.h文件 DWORD WINAPI ServerThread(LPVOID lpParam); // 套节字对象 typedef struct _SOCKET_OBJ { SOCKET s; // 套节字句柄 HANDLE event; // 与此套节字相关联的事件对象句柄 sockaddr_in addrRemote; // 客户端地址信息 _SOCKET_OBJ *pNext; // 指向下一个SOCKET_OBJ对象,为的是连成一个表 } SOCKET_OBJ, *PSOCKET_OBJ; // 线程对象 typedef struct _THREAD_OBJ { HANDLE events[WSA_MAXIMUM_WAIT_EVENTS]; // 记录当前线程要等待的事件对象的句柄 int nSocketCount; // 记录当前线程处理的套节字的数量 <= WSA_MAXIMUM_WAIT_EVENTS PSOCKET_OBJ pSockHeader; // 当前线程处理的套节字对象列表,pSockHeader指向表头 PSOCKET_OBJ pSockTail; // pSockTail指向表尾 CRITICAL_SECTION cs; // 关键代码段变量,为的是同步对本结构的访问 _THREAD_OBJ *pNext; // 指向下一个THREAD_OBJ对象,为的是连成一个表 } THREAD_OBJ, *PTHREAD_OBJ; // 线程列表 PTHREAD_OBJ g_pThreadList; // 指向线程对象列表表头 CRITICAL_SECTION g_cs; // 同步对此全局变量的访问 // 状态信息 LONG g_nTatolConnections; // 总共连接数量 LONG g_nCurrentConnections; // 当前连接数量 // 申请一个套节字对象,初始化它的成员 PSOCKET_OBJ GetSocketObj(SOCKET s) { PSOCKET_OBJ pSocket = (PSOCKET_OBJ)::GlobalAlloc(GPTR, sizeof(SOCKET_OBJ)); if(pSocket != NULL) { pSocket->s = s; pSocket->event = ::WSACreateEvent(); } return pSocket; } // 释放一个套节字对象 void FreeSocketObj(PSOCKET_OBJ pSocket) { ::CloseHandle(pSocket->event); if(pSocket->s != INVALID_SOCKET) { ::closesocket(pSocket->s); } ::GlobalFree(pSocket); } // 申请一个线程对象,初始化它的成员,并将它添加到线程对象列表中 PTHREAD_OBJ GetThreadObj() { PTHREAD_OBJ pThread = (PTHREAD_OBJ)::GlobalAlloc(GPTR, sizeof(THREAD_OBJ)); if(pThread != NULL) { ::InitializeCriticalSection(&pThread->cs); // 创建一个事件对象,用于指示该线程的句柄数组需要重组 pThread->events[0] = ::WSACreateEvent(); // 将新申请的线程对象添加到列表中 ::EnterCriticalSection(&g_cs); pThread->pNext = g_pThreadList; g_pThreadList = pThread; ::LeaveCriticalSection(&g_cs); } return pThread; } // 释放一个线程对象,并将它从线程对象列表中移除 void FreeThreadObj(PTHREAD_OBJ pThread) { // 在线程对象列表中查找pThread所指的对象,如果找到就从中移除 ::EnterCriticalSection(&g_cs); PTHREAD_OBJ p = g_pThreadList; if(p == pThread) // 是第一个? { g_pThreadList = p->pNext; } else { while(p != NULL && p->pNext != pThread) { p = p->pNext; } if(p != NULL) { // 此时,p是pThread的前一个,即“p->pNext == pThread” p->pNext = pThread->pNext; } } ::LeaveCriticalSection(&g_cs); // 释放资源 ::CloseHandle(pThread->events[0]); ::DeleteCriticalSection(&pThread->cs); ::GlobalFree(pThread); } // 重新建立线程对象的events数组 void RebuildArray(PTHREAD_OBJ pThread) { ::EnterCriticalSection(&pThread->cs); PSOCKET_OBJ pSocket = pThread->pSockHeader; int n = 1; // 从第1个开始写,第0个用于指示需要重建了 while(pSocket != NULL) { pThread->events[n++] = pSocket->event; pSocket = pSocket->pNext; } ::LeaveCriticalSection(&pThread->cs); } ///////////////////////////////////////////////////////////////////// // 向一个线程的套节字列表中插入一个套节字 BOOL InsertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket) { BOOL bRet = FALSE; ::EnterCriticalSection(&pThread->cs); if(pThread->nSocketCount < WSA_MAXIMUM_WAIT_EVENTS - 1) { if(pThread->pSockHeader == NULL) { pThread->pSockHeader = pThread->pSockTail = pSocket; } else { pThread->pSockTail->pNext = pSocket; pThread->pSockTail = pSocket; } pThread->nSocketCount ++; bRet = TRUE; } ::LeaveCriticalSection(&pThread->cs); // 插入成功,说明成功处理了客户的连接请求 if(bRet) { ::InterlockedIncrement(&g_nTatolConnections); ::InterlockedIncrement(&g_nCurrentConnections); } return bRet; } // 将一个套节字对象安排给空闲的线程处理 void AssignToFreeThread(PSOCKET_OBJ pSocket) { pSocket->pNext = NULL; ::EnterCriticalSection(&g_cs); PTHREAD_OBJ pThread = g_pThreadList; // 试图插入到现存线程 while(pThread != NULL) { if(InsertSocketObj(pThread, pSocket)) break; pThread = pThread->pNext; } // 没有空闲线程,为这个套节字创建新的线程 if(pThread == NULL) { pThread = GetThreadObj(); InsertSocketObj(pThread, pSocket); ::CreateThread(NULL, 0, ServerThread, pThread, 0, NULL); } ::LeaveCriticalSection(&g_cs); // 指示线程重建句柄数组 ::WSASetEvent(pThread->events[0]); } // 从给定线程的套节字对象列表中移除一个套节字对象 void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket) { ::EnterCriticalSection(&pThread->cs); // 在套节字对象列表中查找指定的套节字对象,找到后将之移除 PSOCKET_OBJ pTest = pThread->pSockHeader; if(pTest == pSocket) { if(pThread->pSockHeader == pThread->pSockTail) pThread->pSockTail = pThread->pSockHeader = pTest->pNext; else pThread->pSockHeader = pTest->pNext; } else { while(pTest != NULL && pTest->pNext != pSocket) pTest = pTest->pNext; if(pTest != NULL) { if(pThread->pSockTail == pSocket) pThread->pSockTail = pTest; pTest->pNext = pSocket->pNext; } } pThread->nSocketCount --; ::LeaveCriticalSection(&pThread->cs); // 指示线程重建句柄数组 ::WSASetEvent(pThread->events[0]); // 说明一个连接中断 ::InterlockedDecrement(&g_nCurrentConnections); } BOOL HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket) { // 获取具体发生的网络事件 WSANETWORKEVENTS event; ::WSAEnumNetworkEvents(pSocket->s, pSocket->event, &event); do { if(event.lNetworkEvents & FD_READ) // 套节字可读 { if(event.iErrorCode[FD_READ_BIT] == 0) { char szText[256]; int nRecv = ::recv(pSocket->s, szText, strlen(szText), 0); if(nRecv > 0) { szText[nRecv] = '\0'; printf("接收到数据:%s \n", szText); } } else break; } else if(event.lNetworkEvents & FD_CLOSE) // 套节字关闭 { break; } else if(event.lNetworkEvents & FD_WRITE) // 套节字可写 { if(event.iErrorCode[FD_WRITE_BIT] == 0) { } else break; } return TRUE; } while(FALSE); // 套节字关闭,或者有错误发生,程序都会转到这里来执行 RemoveSocketObj(pThread, pSocket); FreeSocketObj(pSocket); return FALSE; } PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex) // nIndex从1开始 { // 在套节字列表中查找 PSOCKET_OBJ pSocket = pThread->pSockHeader; while(--nIndex) { if(pSocket == NULL) return NULL; pSocket = pSocket->pNext; } return pSocket; } DWORD WINAPI ServerThread(LPVOID lpParam) { // 取得本线程对象的指针 PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam; while(TRUE) { // 等待网络事件 int nIndex = ::WSAWaitForMultipleEvents( pThread->nSocketCount + 1, pThread->events, FALSE, WSA_INFINITE, FALSE); nIndex = nIndex - WSA_WAIT_EVENT_0; // 查看受信的事件对象 for(int i=nIndex; i<pThread->nSocketCount + 1; i++) { nIndex = ::WSAWaitForMultipleEvents(1, &pThread->events[i], TRUE, 1000, FALSE); if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT) { continue; } else { if(i == 0) // events[0]受信,重建数组 { RebuildArray(pThread); // 如果没有客户I/O要处理了,则本线程退出 if(pThread->nSocketCount == 0) { FreeThreadObj(pThread); return 0; } ::WSAResetEvent(pThread->events[0]); } else // 处理网络事件 { // 查找对应的套节字对象指针,调用HandleIO处理网络事件 PSOCKET_OBJ pSocket = (PSOCKET_OBJ)FindSocketObj(pThread, i); if(pSocket != NULL) { if(!HandleIO(pThread, pSocket)) RebuildArray(pThread); } else printf(" Unable to find socket object \n "); } } } } return 0; } /////////////////////////////////////////// // EventSelectServer.cpp文件 #include "../common/initsock.h" #include <stdio.h> #include <windows.h> #include "EventSelectServer.h" // 初始化Winsock库 CInitSock theSock; int main() { USHORT nPort = 4567; // 此服务器监听的端口号 // 创建监听套节字 SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(nPort); sin.sin_addr.S_un.S_addr = INADDR_ANY; if(::bind(sListen, (sockaddr*)&sin, sizeof(sin)) == SOCKET_ERROR) { printf(" Failed bind() \n"); return -1; } ::listen(sListen, 200); // 创建事件对象,并关联到监听的套节字 WSAEVENT event = ::WSACreateEvent(); ::WSAEventSelect(sListen, event, FD_ACCEPT|FD_CLOSE); ::InitializeCriticalSection(&g_cs); // 处理客户连接请求,打印状态信息 while(TRUE) { int nRet = ::WaitForSingleObject(event, 5*1000); if(nRet == WAIT_FAILED) { printf(" Failed WaitForSingleObject() \n"); break; } else if(nRet == WSA_WAIT_TIMEOUT) // 定时显式状态信息 { printf(" \n"); printf(" TatolConnections: %d \n", g_nTatolConnections); printf(" CurrentConnections: %d \n", g_nCurrentConnections); continue; } else // 有新的连接未决 { ::ResetEvent(event); // 循环处理所有未决的连接请求 while(TRUE) { sockaddr_in si; int nLen = sizeof(si); SOCKET sNew = ::accept(sListen, (sockaddr*)&si, &nLen); if(sNew == SOCKET_ERROR) break; PSOCKET_OBJ pSocket = GetSocketObj(sNew); pSocket->addrRemote = si; ::WSAEventSelect(pSocket->s, pSocket->event, FD_READ|FD_CLOSE|FD_WRITE); AssignToFreeThread(pSocket); } } } ::DeleteCriticalSection(&g_cs); return 0; }
参考及引用:《Windows网络与通信程序设计》 王艳平 张越
转载请注明!