现在的位置: 首页 > 综合 > 正文

高性能 IO Completion Port 简明代码示例

2013年11月02日 ⁄ 综合 ⁄ 共 4102字 ⁄ 字号 评论关闭

IO Completion Port, Windows上提供的最有效实现高性能server的方式(无论是file server, web server还是别的任何类似大量并发io请求的server),IIS本身就是基于此的。可惜,到目前为止没有一个真正简单的示例。今日便让我打响这第一炮吧。

 

没有一个简明例程的根源,可以说是因为IoCompletionPort本身的API设计非常糟糕,一个CreateIoCompletionPort包含了太多功能,名称又很confusing,让人云里雾里。所有的的例程,为了便于理解,都把这些让人迷惑的API封装,构造自己的class,但是呢,这样虽然从软件设计角度来说清晰了,但是对于了解IoCompletionPort的使用来说,反而更迷惑了(因为调用被分散到各个class中)。

 

本文的目的是用最简明的例子来介绍如何使用IO Completion Port。

 

在此之前,先要说IO Completion Port到底是什么东西-----就是threads pool,一个由Windows自动管理的threads pool. 好,你就需要了解这么多,再说多了就违背了本文的宗旨---提供简明例程。

 

1. IO Completion Port的程序,大致上可以划分为以下步骤:

 

2. CreateIOCompletionPort (可以理解为初始化threads pool)

 

3. 建立threads (就是一般的CreateThread或者_beginthreadex,将第一步所得到的HANDLE作为参数传进去,这个跟一般的thread没任何差别)

 

4. 开始IO 操作,比如建立SOCKET, bind...

 

5. 在第一个Async IO之前,将上一步建立的HANDLE(比如socket)绑定到第一步得到的IO Completion Port的HANDLE 上

 

6. 根据具体情况操作IO

 

 

好吧,还是用代码来看比较直接:

 

先来看主程序:

  1. int _tmain(int argc, _TCHAR* argv[])  
  2. {  
  3.   
  4.         // argv[1]为ip, argv[2]为port  
  5.         // CTcpServer只是一个对socket的简单封装,代码  
  6.          // 后面给出  
  7.     CTcpServer server(argv[1], argv[2]);  
  8.     if (!server.StartListening())  
  9.     {  
  10.         printf("failed listening/n");  
  11.         return 1;  
  12.     }  
  13.   
  14.     // initialize IoCompletionPort  
  15.     SOCKET& listeningSocket = server.Socket();  
  16.     //1. 初始化IO Completion Port  
  17.     HANDLE hIocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);  
  18.       
  19.     if(hIocp == NULL)  
  20.     {  
  21.         printf("cannot create io completion port/n");  
  22.         return 1;  
  23.     }  
  24.   
  25.         //2. 绑定socket到completion port  
  26.         //这意思是说,后面所有在这个socket上的async   
  27.         // io 操作,都会notify这个completion port的  
  28.          //threads  
  29.     if (0 == ::CreateIoCompletionPort((HANDLE)listeningSocket, hIocp, (ULONG_PTR)0, 0))  
  30.     {  
  31.         printf("cannot associate listening socket to io completion port/n");  
  32.         return 1;  
  33.     }  
  34.   
  35.         //3. 创建threads,将io completion port  
  36.         // HANDLE 作为参数传入,这样每个thread  
  37.         // 都能在有IO请求时query其status,参见  
  38.          // thread的具体代码  
  39.     int threadPoolSize = 4;  
  40.   
  41.     HANDLE hWorker;  
  42.   
  43.     for(int i=0;i<threadPoolSize; ++i)  
  44.     {  
  45.         hWorker = CreateThread(NULL, 0, WorkerThreadFunc, (LPVOID)hIocp, 0, NULL);  
  46.         CloseHandle(hWorker);  
  47.     }  
  48.   
  49.         //4. 等待新连接,因为我们不要busy loop, 所以  
  50.          // 每个TCP连接需要等待并检查  
  51.          // FD_ACCEPT event,然后用AcceptEx  
  52.     while(true)  
  53.     {  
  54.             printf("waiting for connection/n");  
  55.             if (server.WaitForAcceptEvent(10000))  
  56.             {  
  57.         // 只管Accept了,至于Recv/Send由上面建立的  
  58.     // thread来负责,后面会说thread的功能  
  59.               server.AcceptNewConnection();  
  60.             }  
  61.     }  
  62.   
  63.          return 1;  
  64. }  

 

 

 

 

 

然后先来看thread的实现. IO Completion Port的 thread因为是放入一个thread pool中,所以每个thread是“通用”的,换句话说,每个thread要能够完成多种功能,用伪代码来说是这样:

 

Wait For IO Notification;  ---> 等待比如Socket上的一个event,至于是什么event先不管。无妨想象成interrupt,也比较类似WaitForSingleObject,总之thread在这时候是sleep的

 

Check IO status Operation;  ----> 检查IO状态,更关键是看到底是什么Event

 

switch (event.status)

{

  case ACCEPT: ...

  case READ: ...

  case WRITE: ...

  case WHATEVER: ...
}

 

所以要清楚,io completion port的thread并不是去给每个read或者write建一个thread(也不是不可以,不过就是画蛇添足多此一举),而是依靠自定义的Overlapped结构来判断到底对IO进行什么操作。还是看下面的源代码吧。

 

  1. DWORD WINAPI WorkerThreadFunc(LPVOID lpParam)  
  2. {      
  3.     ULONG_PTR       *PerHandleKey;  
  4.     WSAOVERLAPPED      *pOverlap;  
  5.   
  6.     OVERLAPPEDPLUS  *pOverlapPlus,  
  7.                     *newolp;  
  8.     DWORD           dwBytesXfered;  
  9.   
  10.     int ret;  
  11.   
  12.     HANDLE hIocp = (HANDLE)lpParam;  
  13.   
  14.     while (true)  
  15.     {  
  16.         //这里查询IO 状态  
  17.         ret = GetQueuedCompletionStatus(  
  18.             hIocp,  
  19.             &dwBytesXfered,  
  20.             (PULONG_PTR)&PerHandleKey,  
  21.             &pOverlap,  
  22.             INFINITE);  
  23.         if (ret == 0)  
  24.         {  
  25.             // Operation failed  
  26.             printf("cannot get queued completion status/n");  
  27.             continue;  
  28.         }  
  29.   
  30. //OVERLAPPEDPLUS是我们自己定义的data structure,一般把正常的  
  31. //OVERLAPPED作为第一个field,所以指针是指向同样地址,后面会给出具  
  32. //体定义. CONTAINING_RECORD是一个标准win32 macro  
  33.   
  34.         pOverlapPlus = CONTAINING_RECORD(pOverlap, OVERLAPPEDPLUS, overlapped);  
  35.       
  36. // OP_ACCEPT也是我们自己定义的 value,只是一个标识:  
  37. // #define OP_ACCEPT 1  
  38.   
  39.     switch (pOverlapPlus->OpCode)  
  40.     {  
  41.     case OP_ACCEPT:  

抱歉!评论已关闭.