现在的位置: 首页 > 综合 > 正文

[C#]I/O完成端口的类定义和测试实例

2012年06月18日 ⁄ 综合 ⁄ 共 5279字 ⁄ 字号 评论关闭
整理者:郑昀@UltraPower

日期:2005-04-13

从William Kennedy那里整理过来的,不同之处在于他自己定义了一个Overlapped,而我们这里直接使用

System.Threading.NativeOverlapped:。

附一段我以前的Win32下的IOCP文档,如果您了解IOCP也可以直接跳过看后面的C#测试示范:

我们采用的是I/O Complete Port(以下简称IOCP)处理机制。

简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/CPU,再多就没有意义了)来处理发送到IOCP端口的消息。实现步骤大致如下:

1     先在主线程中调用CreateIoCompletionPort创建IOCP

CreateIoCompletionPort的前三个参数只在把设备同Complete Port相关联时才有用。

此时我们只需传递INVALID_HANDLE_VALUE,NULL0即可。

第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。

2     我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。

在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。

如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT1秒。

 

当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息:       传输的字节数、完成键和OVERLAPPED结构的地址。

 

在我们的程序中可以用智能指针或者BSTR或者int来接受这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。

GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传送先前从CreateIoCompletionPort返回的端口句柄。

 

需要注意的是:

第一,   线程池的数目是有限制的,和CPU数目有关系。

第二,   IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;

第三,   最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。

 

测试代码:



using System;

using System.Threading;  // Included for the Thread.Sleep call

using Continuum.Threading;

using System.Runtime.InteropServices;


namespace IOCPDemo

{

    
//=============================================================================

    /// <summary> Sample class for the threading class </summary>

    public class UtilThreadingSample

    
{

        
//*****************************************************************************   

        /// <summary> Test Method </summary>

        static void Main()

        
{

            
// Create the MSSQL IOCP Thread Pool

            IOCPThreadPool pThreadPool = new IOCPThreadPool(01020new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

      

            
//for(int i =1;i<10000;i++)

            {

                pThreadPool.PostEvent(
1234);

            }


      

            Thread.Sleep(
100);

      

            pThreadPool.Dispose();

        }


    

        
//********************************************************************

        /// <summary> Function to be called by the IOCP thread pool.  Called when

        
///           a command is posted for processing by the SocketManager </summary>

        
/// <param name="iValue"> The value provided by the thread posting the event </param>


        static public void IOCPThreadFunction(int iValue)

        
{

            
try

            
{

                Console.WriteLine(
"Value: {0}", iValue.ToString());

                Thread.Sleep(
3000);

            }


      

            
catch (Exception pException)

            
{

                Console.WriteLine(pException.Message);

            }


        }


    }



}


类代码:

using System;

using System.Threading;

using System.Runtime.InteropServices;


namespace IOCPThreading

{

    [StructLayout(LayoutKind.Sequential, CharSet
=CharSet.Auto)]


    
public sealed class IOCPThreadPool

    
{

        [DllImport(
"Kernel32", CharSet=CharSet.Auto)]

        
private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);


        [DllImport(
"Kernel32", CharSet=CharSet.Auto)]

        
private unsafe static extern Boolean CloseHandle(UInt32 hObject);


        [DllImport(
"Kernel32", CharSet=CharSet.Auto)]

        
private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);


        [DllImport(
"Kernel32", CharSet=CharSet.Auto)]

        
private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);


        
private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;

        
private const UInt32 INIFINITE = 0xffffffff;

        
private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;

        
public delegate void USER_FUNCTION(int iValue);

        
private UInt32 m_hHandle;

        
private UInt32 GetHandle get return m_hHandle; } set { m_hHandle = value; } }


        
private Int32 m_uiMaxConcurrency;


        
private Int32 GetMaxConcurrency get return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }



        
private Int32 m_iMinThreadsInPool;


        
private Int32 GetMinThreadsInPool get return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }


        
private Int32 m_iMaxThreadsInPool;


        
private Int32 GetMaxThreadsInPool get return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }



        
private Object m_pCriticalSection;


        
private Object GetCriticalSection get return m_pCriticalSection; } set { m_pCriticalSection = value; } }



        
private USER_FUNCTION m_pfnUserFunction;


        
private USER_FUNCTION GetUserFunction get return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }



        
private Boolean m_bDisposeFlag;


        
/// <summary> SimType: Flag to indicate if the class is disposing </summary>



        
private Boolean IsDisposed get return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }


        
private Int32 m_iCurThreadsInPool;


        

抱歉!评论已关闭.