现在的位置: 首页 > 综合 > 正文

IOCP–Windows服务器编程之服务端代码

2013年05月08日 ⁄ 综合 ⁄ 共 19098字 ⁄ 字号 评论关闭

/*
==========================================================================

Purpose:

This sample code demonstrates the following:

* Use of the I/O Completion ports with WinSock.  The idea is to create a
  simple application that will use IOCP, highlight how to use IOCP.

Notes:

* The server creates the IOCP and the worker threads, and associates every
  incoming client socket with the IOCP. When a client sends a message, the
  server displays it and then sends the same message back as an
  acknowledgement.

Author:

* Swarajya Pendharkar

Date:

* 10th March 2006

Updates:

* Implemented IOCP with Overlapped I/O - 24th March 2006
* More updates pertaining to Overlapped I/O - 19th April 2006
* Updates pertaining to IOCP implementation and proper shutdown
  of application - 28th May 2006
* Minor tweaks and comments - 9th June 2006
* Updates to allow clients to send multiple messages and other
  code improvements - 19th August 2006
* Minor updates - 22nd August 2006
==========================================================================
*/

#include "stdafx.h"

#include <stdio.h>
#include <stdlib.h>
#include <conio.h>
#include <string.h>
#include <winsock2.h>
#include <vector>

//Op codes stored in CClientContext::m_nOpCode.
//In this file the stored code names the step the worker thread takes when the
//pending operation completes, not the operation that was posted:
//  OP_WRITE - a receive was posted; on completion the worker prints the data
//             and sends it back.
//  OP_READ  - a send was posted; on completion the worker continues a partial
//             send or posts the next receive.
#define OP_READ     0
#define OP_WRITE    1

//Number of worker threads servicing the completion port.
//The norm is to create two worker threads per processor.
//Many programs find out how many processors the host has and multiply that
//by two; this sample uses a fixed count.
#define MAX_WORKER_THREADS 2

//Size in bytes of each client's I/O buffer and of the console message buffers
#define MAX_BUFFER_LEN 256

//Graceful shutdown event (manual-reset, created in Initialize()).
//A global flag would do for this simple implementation;
//the event is used to demonstrate event-based shutdown.
HANDLE g_hShutdownEvent = NULL;

//Handles of the worker threads created by InitializeIOCP()
HANDLE g_hWorkerThreads[MAX_WORKER_THREADS];

//Handle of the thread running AcceptThread()
HANDLE g_hAcceptThread = NULL;

//Network event registered for FD_ACCEPT on the listening socket
WSAEVENT    g_hAcceptEvent;

CRITICAL_SECTION g_csConsole; //Serializes console output from multiple threads
CRITICAL_SECTION g_csClientList; //Protects g_ClientContext

//Global I/O completion port handle
HANDLE g_hIOCompletionPort = NULL;

class CClientContext  //To store and manage client related information
{
public:
    
     OVERLAPPED        m_ol;
     WSABUF            m_wbuf;
    
     int               m_nTotalBytes;
     int               m_nSentBytes;
    
     SOCKET            m_Socket;  //accepted socket
     char              m_szBuffer[MAX_BUFFER_LEN];
     int               m_nOpCode; //will be used by the worker thread to decide what operation to perform
    
     //Get/Set calls
     void SetOpCode(int n)
     {
          m_nOpCode = n;
     }
    
     int GetOpCode()
     {
          return m_nOpCode;
     }
    
     void SetSocket(SOCKET s)
     {
          m_Socket = s;
     }
    
     SOCKET GetSocket()
     {
          return m_Socket;
     }
    
     void SetBuffer(char *szBuffer)
     {
          strcpy(m_szBuffer, szBuffer);
     }
    
     void GetBuffer(char *szBuffer)
     {
          strcpy(szBuffer, m_szBuffer);
     }
    
     void ZeroBuffer()
     {
          ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
     }
    
     void SetWSABUFLength(int nLength)
     {
          m_wbuf.len = nLength;
     }
    
     int GetWSABUFLength()
     {
          return m_wbuf.len;
     }
    
     void ResetWSABUF()
     {
          ZeroBuffer();
          m_wbuf.buf = m_szBuffer;
          m_wbuf.len = MAX_BUFFER_LEN;
     }
    
     //Constructor
     CClientContext()
     {
          ZeroMemory(&m_ol, sizeof(OVERLAPPED));
         
          m_Socket =  SOCKET_ERROR;
         
          ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
         
          m_wbuf.buf = m_szBuffer;
          m_wbuf.len = MAX_BUFFER_LEN;
         
          m_nOpCode = 0;
          m_nTotalBytes = 0;
          m_nSentBytes = 0;
     }
};

//Vector of pointers to the dynamically allocated CClientContext objects,
//one per connected client. Access is guarded by g_csClientList.
//A map or a linked list could be used instead.
std::vector<CClientContext *> g_ClientContext;

//global functions
bool InitializeIOCP();   //creates the completion port and the worker threads
bool Initialize();       //critical sections, shutdown event, Winsock, IOCP
void CleanUp();          //signals shutdown, waits for the threads, frees the clients
void DeInitialize();     //releases what Initialize() set up
DWORD WINAPI AcceptThread(LPVOID lParam);   //lParam carries the listening SOCKET
void AcceptConnection(SOCKET ListenSocket); //accepts one client and posts its first receive
bool AssociateWithIOCP(CClientContext   *pClientContext); //frees the context on failure
DWORD WINAPI WorkerThread(LPVOID lpParam);  //lpParam carries the 1-based thread number
void WriteToConsole(char *szBuffer);        //printf guarded by g_csConsole
void AddToClientList(CClientContext   *pClientContext);
void RemoveFromClientListAndFreeMemory(CClientContext   *pClientContext);
void CleanClientList();

int main(int argc, char *argv[])
{
     //Validate the input
     if (argc < 2)
     {
          printf("/nUsage: %s port.", argv[0]);
          return 1;
     }
    
     if (false == Initialize())
     {
          return 1;
     }
    
     SOCKET ListenSocket;
    
     struct sockaddr_in ServerAddress;
    
     //Overlapped I/O follows the model established in Windows and can be performed only on
     //sockets created through the WSASocket function
     ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    
     if (INVALID_SOCKET == ListenSocket)
     {
          printf("/nError occurred while opening socket: %d.", WSAGetLastError());
          goto error;
     }
     else
     {
          printf("/nWSASocket() successful.");
     }
    
     //Cleanup and Init with 0 the ServerAddress
     ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
    
     //Port number will be supplied as a command line argument
     int nPortNo;
     nPortNo = atoi(argv[1]);
    
     //Fill up the address structure
     ServerAddress.sin_family = AF_INET;
     ServerAddress.sin_addr.s_addr = INADDR_ANY; //WinSock will supply address
     ServerAddress.sin_port = htons(nPortNo);    //comes from commandline
    
     //Assign local address and port number
     if (SOCKET_ERROR == bind(ListenSocket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
     {
          closesocket(ListenSocket);
          printf("/nError occurred while binding.");
          goto error;
     }
     else
     {
          printf("/nbind() successful.");
     }
    
     //Make the socket a listening socket
     if (SOCKET_ERROR == listen(ListenSocket,SOMAXCONN))
     {
          closesocket(ListenSocket);
          printf("/nError occurred while listening.");
          goto error;
     }
     else
     {
          printf("/nlisten() successful.");
     }
    
     g_hAcceptEvent = WSACreateEvent();
    
     if (WSA_INVALID_EVENT == g_hAcceptEvent)
     {
          printf("/nError occurred while WSACreateEvent().");
          goto error;
     }
    
     if (SOCKET_ERROR == WSAEventSelect(ListenSocket, g_hAcceptEvent, FD_ACCEPT))
     {
          printf("/nError occurred while WSAEventSelect().");
          WSACloseEvent(g_hAcceptEvent);
          goto error;
     }
    
     printf("/nTo exit this server, hit a key at any time on this console...");
    
     DWORD nThreadID;
     g_hAcceptThread = CreateThread(0, 0, AcceptThread, (void *)ListenSocket, 0, &nThreadID);
    
     //Hang in there till a key is hit
     while(!_kbhit())
     {
          Sleep(0);  //switch to some other thread
     }
    
     WriteToConsole("Server is shutting down...");
    
     //Start cleanup
     CleanUp();
    
     //Close open sockets
     closesocket(ListenSocket);
    
     DeInitialize();
    
     return 0; //success
    
error:
     closesocket(ListenSocket);
     DeInitialize();
     return 1;
}

bool Initialize()
{
     //Initialize the Console Critical Section
     InitializeCriticalSection(&g_csConsole);
    
     //Initialize the Client List Critical Section
     InitializeCriticalSection(&g_csClientList);
    
     //Create shutdown event
     g_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    
     // Initialize Winsock
     WSADATA wsaData;
    
     int nResult;
     nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
    
     if (NO_ERROR != nResult)
     {
          printf("/nError occurred while executing WSAStartup().");
          return false; //error
     }
     else
     {
          printf("/nWSAStartup() successful.");
     }
    
     if (false == InitializeIOCP())
     {
          printf("/nError occurred while initializing IOCP");
          return false;
     }
     else
     {
          printf("/nIOCP initialization successful.");
     }
    
     return true;
}

//Function to Initialize IOCP
bool InitializeIOCP()
{
     //Create I/O completion port
     g_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
    
     if ( NULL == g_hIOCompletionPort)
     {
          printf("/nError occurred while creating IOCP: %d.", WSAGetLastError());
          return false;
     }
    
     DWORD nThreadID;
    
     //Create worker threads
     for (int ii = 0; ii < MAX_WORKER_THREADS; ii++)
     {
          g_hWorkerThreads[ii] = CreateThread(0, 0, WorkerThread, (void *)(ii+1), 0, &nThreadID);
     }
    
     return true;
}

void CleanUp()
{
     //Ask all threads to start shutting down
     SetEvent(g_hShutdownEvent);
    
     //Let Accept thread go down
     WaitForSingleObject(g_hAcceptThread, INFINITE);
    
     for (int i = 0; i < MAX_WORKER_THREADS; i++)
     {
          //Help threads get out of blocking - GetQueuedCompletionStatus()
          PostQueuedCompletionStatus(g_hIOCompletionPort, 0, (DWORD) NULL, NULL);
     }
    
     //Let Worker Threads shutdown
     WaitForMultipleObjects(MAX_WORKER_THREADS, g_hWorkerThreads, TRUE, INFINITE);
    
     //We are done with this event
     WSACloseEvent(g_hAcceptEvent);
    
     //Cleanup dynamic memory allocations, if there are any.
     CleanClientList();
}

void DeInitialize()
{
     //Delete the Console Critical Section.
     DeleteCriticalSection(&g_csConsole);
    
     //Delete the Client List Critical Section.
     DeleteCriticalSection(&g_csClientList);
    
     //Cleanup IOCP.
     CloseHandle(g_hIOCompletionPort);
    
     //Clean up the event.
     CloseHandle(g_hShutdownEvent);
    
     //Cleanup Winsock
     WSACleanup();
}

//This thread will look for accept event
DWORD WINAPI AcceptThread(LPVOID lParam)
{
     SOCKET ListenSocket = (SOCKET)lParam;
    
     WSANETWORKEVENTS WSAEvents;
    
     //Accept thread will be around to look for accept event, until a Shutdown event is not Signaled.
     while(WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0))
     {
          if (WSA_WAIT_TIMEOUT != WSAWaitForMultipleEvents(1, &g_hAcceptEvent, FALSE, 0, FALSE))
          {
               WSAEnumNetworkEvents(ListenSocket, g_hAcceptEvent, &WSAEvents);
               if ((WSAEvents.lNetworkEvents & FD_ACCEPT) && (0 == WSAEvents.iErrorCode[FD_ACCEPT_BIT]))
               {
                    //Process it
                    AcceptConnection(ListenSocket);
               }
          }
     }
    
     return 0;
}

//This function will process the accept event
void AcceptConnection(SOCKET ListenSocket)
{
     sockaddr_in ClientAddress;
     int nClientLength = sizeof(ClientAddress);
     char szConsole[MAX_BUFFER_LEN];
    
     //Accept remote connection attempt from the client
     SOCKET Socket = accept(ListenSocket, (sockaddr*)&ClientAddress, &nClientLength);
    
     if (INVALID_SOCKET == Socket)
     {
          sprintf(szConsole, "Error occurred while accepting socket: %ld.", WSAGetLastError());
          WriteToConsole(szConsole);
     }
    
     //Display Client's IP
     sprintf(szConsole, "Client connected from: %s", inet_ntoa(ClientAddress.sin_addr));
     WriteToConsole(szConsole);
    
     //Create a new ClientContext for this newly accepted client
     CClientContext   *pClientContext  = new CClientContext;
    
     pClientContext->SetOpCode(OP_READ);
     pClientContext->SetSocket(Socket);
    
     //Store this object
     AddToClientList(pClientContext);
    
     if (true == AssociateWithIOCP(pClientContext))
     {
          //Once the data is successfully received, we will print it.
          pClientContext->SetOpCode(OP_WRITE);
         
          //Get data.
          DWORD dwFlags = 0;
          DWORD dwBytes = 0;
         
          //Post initial Recv
          //This is a right place to post a initial Recv
          //Posting a initial Recv in WorkerThread will create scalability issues.
          int nBytesRecv = WSARecv(pClientContext->GetSocket(), &pClientContext->m_wbuf, 1,
               &dwBytes, &dwFlags, &pClientContext->m_ol, NULL);
         
          if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
          {
               printf("/nError in Initial Post.");
          }
     }
}

bool AssociateWithIOCP(CClientContext   *pClientContext)
{
     char szConsole[MAX_BUFFER_LEN];
    
     //Associate the socket with IOCP
     HANDLE hTemp = CreateIoCompletionPort((HANDLE)pClientContext->GetSocket(), g_hIOCompletionPort, (DWORD)pClientContext, 0);
    
     if (NULL == hTemp)
     {
          sprintf(szConsole, "Error occurred while executing CreateIoCompletionPort().");
          WriteToConsole(szConsole);
         
          //Let's not work with this client
          RemoveFromClientListAndFreeMemory(pClientContext);
         
          return false;
     }
    
     return true;
}

//Worker thread will service IOCP requests
DWORD WINAPI WorkerThread(LPVOID lpParam)
{   
     int nThreadNo = (int)lpParam;
    
     void *lpContext = NULL;
     //DWORD            *lpContext = NULL;
     OVERLAPPED       *pOverlapped = NULL;
     CClientContext   *pClientContext = NULL;
     DWORD            dwBytesTransfered = 0;
     char szConsole[MAX_BUFFER_LEN];
     int nBytesRecv = 0;
     int nBytesSent = 0;
     DWORD             dwBytes = 0, dwFlags = 0;
    
     //Worker thread will be around to process requests, until a Shutdown event is not Signaled.
     while (WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0))
     {
          BOOL bReturn = GetQueuedCompletionStatus(
               g_hIOCompletionPort,
               &dwBytesTransfered,
               (LPDWORD)&lpContext,
               &pOverlapped,
               INFINITE);
         
          if (NULL == lpContext)
          {
               //We are shutting down
               break;
          }
         
          if ((FALSE == bReturn) || ((TRUE == bReturn) && (0 == dwBytesTransfered)))
          {
               //Client connection gone, remove it.
               RemoveFromClientListAndFreeMemory(pClientContext);
               continue;
          }
         
          //Get the client context
          //The statement below will also work because OVERLAPPED is embedded in CClientContext
          //pClientContext = (CClientContext *)pOverlapped; 
          pClientContext = (CClientContext *)lpContext;
         
          switch (pClientContext->GetOpCode())
          {
          case OP_READ:
              
               pClientContext->m_nSentBytes += dwBytesTransfered;
              
               //Write operation was finished, see if all the data was sent.
               //Else post another write.
               if(pClientContext->m_nSentBytes < pClientContext->m_nTotalBytes)
               {
                    pClientContext->SetOpCode(OP_READ);
                   
                    pClientContext->m_wbuf.buf += pClientContext->m_nSentBytes;
                    pClientContext->m_wbuf.len = pClientContext->m_nTotalBytes - pClientContext->m_nSentBytes;
                   
                    dwFlags = 0;
                   
                    //Overlapped send
                    nBytesSent = WSASend(pClientContext->GetSocket(), &pClientContext->m_wbuf, 1,
                         &dwBytes, dwFlags, &pClientContext->m_ol, NULL);
                   
                    if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError()))
                    {
                         //Let's not work with this client
                         RemoveFromClientListAndFreeMemory(pClientContext);
                    }
               }
               else
               {
                    //Once the data is successfully received, we will print it.
                    pClientContext->SetOpCode(OP_WRITE);
                    pClientContext->ResetWSABUF();
                   
                    dwFlags = 0;
                   
                    //Get the data.
                    nBytesRecv = WSARecv(pClientContext->GetSocket(), &pClientContext->m_wbuf, 1,
                         &dwBytes, &dwFlags, &pClientContext->m_ol, NULL);
                   
                    if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
                    {
                         sprintf(szConsole, "Thread %d: Error occurred while executing WSARecv().", nThreadNo);
                         WriteToConsole(szConsole);
                        
                         //Let's not work with this client
                         RemoveFromClientListAndFreeMemory(pClientContext);
                    }
               }
              
               break;
              
          case OP_WRITE:
              
               char szBuffer[MAX_BUFFER_LEN];
              
               //Display the message we recevied
               pClientContext->GetBuffer(szBuffer);
              
               sprintf(szConsole, "Thread %d: The following message was received: %s", nThreadNo, szBuffer);
               WriteToConsole(szConsole);
              
               //Send the message back to the client.
               pClientContext->SetOpCode(OP_READ);
              
               pClientContext->m_nTotalBytes = dwBytesTransfered;
               pClientContext->m_nSentBytes  = 0;
               pClientContext->m_wbuf.len  = dwBytesTransfered;
              
               dwFlags = 0;
              
               //Overlapped send
               nBytesSent = WSASend(pClientContext->GetSocket(), &pClientContext->m_wbuf, 1,
                    &dwBytes, dwFlags, &pClientContext->m_ol, NULL);
              
               if ((SOCKET_ERROR == nBytesSent) && (WSA_IO_PENDING != WSAGetLastError()))
               {
                    sprintf(szConsole, "Thread %d: Error occurred while executing WSASend().", nThreadNo);
                    WriteToConsole(szConsole);
                   
                    //Let's not work with this client
                    RemoveFromClientListAndFreeMemory(pClientContext);
               }
              
               break;
              
          default:
               //We should never be reaching here, under normal circumstances.
               break;
          } // switch
     } // while
    
     return 0;
}

//Function to synchronize console output
//Threads need to be synchronized while they write to console.
//WriteConsole() API can be used, it is thread-safe, I think.
//I have created my own function.
void WriteToConsole(char *szBuffer)
{
     EnterCriticalSection(&g_csConsole);
    
     printf("/n%s", szBuffer);
    
     LeaveCriticalSection(&g_csConsole);
}

//Store client related information in a vector
void AddToClientList(CClientContext   *pClientContext)
{
     EnterCriticalSection(&g_csClientList);
    
     //Store these structures in vectors
     g_ClientContext.push_back(pClientContext);
    
     LeaveCriticalSection(&g_csClientList);
}

//This function will allow to remove one single client out of the list
void RemoveFromClientListAndFreeMemory(CClientContext   *pClientContext)
{
     EnterCriticalSection(&g_csClientList);
    
     std::vector <CClientContext *>::iterator IterClientContext;
    
     //Remove the supplied ClientContext from the list and release the memory
     for (IterClientContext = g_ClientContext.begin(); IterClientContext != g_ClientContext.end(); IterClientContext++)
     {
          if (pClientContext == *IterClientContext)
          {
               g_ClientContext.erase(IterClientContext);
               //In case any pending I/O request cancel it
               //If you want to wait for them to complete
               //You can use HasOverlappedIoCompleted()
               //to wait for the operations to complete
               //while (!HasOverlappedIoCompleted((LPOVERLAPPED)&pClientContext->m_ol))
               //{
               //  Sleep(0);
               //}
              
               //ATTN:
               //Not a right place to make a call to CancelIo()
               //Need to use CancelIoEx(), once it is available
               CancelIo((HANDLE)(pClientContext->GetSocket()));
               closesocket(pClientContext->GetSocket());
               delete pClientContext;
               break;
          }
     }
    
     LeaveCriticalSection(&g_csClientList);
}

//Clean up the list, this function will be executed at the time of shutdown
void CleanClientList()
{
     EnterCriticalSection(&g_csClientList);
    
     std::vector <CClientContext *>::iterator IterClientContext;
    
     for (IterClientContext = g_ClientContext.begin(); IterClientContext != g_ClientContext.end( ); IterClientContext++)
     {
          //Cancel the pending I/O operations
         
          //ATTN:
          //Not a right place to make a call to CancelIo()
          //Need to use CancelIoEx(), once it is available
          CancelIo((HANDLE)(*IterClientContext)->GetSocket());
          closesocket((*IterClientContext)->GetSocket());
          delete *IterClientContext;
     }
    
     g_ClientContext.clear();
    
     LeaveCriticalSection(&g_csClientList);
}

抱歉!评论已关闭.