现在的位置: 首页 > 综合 > 正文

【转】一个简单的线程池(c++版)

2012年12月12日 ⁄ 综合 ⁄ 共 7697字 ⁄ 字号 评论关闭
#ifndef _ThreadPool_H_
#define _ThreadPool_H_
#pragma warning(disable: 4530)
#pragma warning(disable: 4786)
#include 
<cassert>
#include 
<vector>
#include 
<queue>
#include 
<windows.h>
class ThreadJob  //工作基类
{
public:
    
//供线程池调用的虚函数
    virtual void DoJob(void *pPara) = 0;
};
class ThreadPool
{
public:
    
//dwNum 线程池规模
    ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
    {
        InitializeCriticalSection(
&_csThreadVector);
        InitializeCriticalSection(
&_csWorkQueue);
        _EventComplete 
= CreateEvent(0falsefalse, NULL);
        _EventEnd 
= CreateEvent(0truefalse, NULL);
        _SemaphoreCall 
= CreateSemaphore(00,  0x7FFFFFFF, NULL);
        _SemaphoreDel 
=  CreateSemaphore(00,  0x7FFFFFFF, NULL);
        assert(_SemaphoreCall 
!= INVALID_HANDLE_VALUE);
        assert(_EventComplete 
!= INVALID_HANDLE_VALUE);
        assert(_EventEnd 
!= INVALID_HANDLE_VALUE);
        assert(_SemaphoreDel 
!= INVALID_HANDLE_VALUE);
        AdjustSize(dwNum 
<= 0 ? 4 : dwNum);
    }
    
~ThreadPool()
    {
        DeleteCriticalSection(
&_csWorkQueue);
        CloseHandle(_EventEnd);
        CloseHandle(_EventComplete);
        CloseHandle(_SemaphoreCall);
        CloseHandle(_SemaphoreDel);
        vector
<ThreadItem*>::iterator iter;
        
for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
        {
            
if(*iter)
                delete 
*iter;
        }
        DeleteCriticalSection(
&_csThreadVector);
    }
    
//调整线程池规模
    int AdjustSize(int iNum)
    {
        
if(iNum > 0)
        {
            ThreadItem 
*pNew;
            EnterCriticalSection(
&_csThreadVector);
            
for(int _i=0; _i<iNum; _i++)
            {
                _ThreadVector.push_back(pNew 
= new ThreadItem(this));
                assert(pNew);
                pNew
->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
                assert(pNew
->_Handle);
            }
            LeaveCriticalSection(
&_csThreadVector);
        }
        
else
        {
            iNum 
*= -1;
            ReleaseSemaphore(_SemaphoreDel,  iNum 
> _lThreadNum ? _lThreadNum : iNum, NULL);
        }
        
return (int)_lThreadNum;
    }
    
//调用线程池
    void Call(void (*pFunc)(void  *), void *pPara = NULL)
    {
        assert(pFunc);
        EnterCriticalSection(
&_csWorkQueue);
        _JobQueue.push(
new JobItem(pFunc, pPara));
        LeaveCriticalSection(
&_csWorkQueue);
        ReleaseSemaphore(_SemaphoreCall, 
1, NULL);
    }
    
//调用线程池
    inline void Call(ThreadJob * p, void *pPara = NULL)
    {
        Call(CallProc, 
new CallProcPara(p, pPara));
    }
    
//结束线程池, 并同步等待
    bool EndAndWait(DWORD dwWaitTime = INFINITE)
    {
        SetEvent(_EventEnd);
        
return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
    }
    
//结束线程池
    inline void End()
    {
        SetEvent(_EventEnd);
    }
    inline DWORD Size()
    {
        
return (DWORD)_lThreadNum;
    }
    inline DWORD GetRunningSize()
    {
        
return (DWORD)_lRunningNum;
    }
    
bool IsRunning()
    {
        
return _lRunningNum > 0;
    }
protected:
    
//工作线程
    static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
    {
        ThreadItem 
*pThread = static_cast<ThreadItem*>(lpParameter);
        assert(pThread);
        ThreadPool 
*pThreadPoolObj = pThread->_pThis;
        assert(pThreadPoolObj);
        InterlockedIncrement(
&pThreadPoolObj->_lThreadNum);
        HANDLE hWaitHandle[
3];
        hWaitHandle[
0= pThreadPoolObj->_SemaphoreCall;
        hWaitHandle[
1= pThreadPoolObj->_SemaphoreDel;
        hWaitHandle[
2= pThreadPoolObj->_EventEnd;
        JobItem 
*pJob;
        
bool fHasJob;
        
for(;;)
        {
            DWORD wr 
= WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
            
//响应删除线程信号
            if(wr == WAIT_OBJECT_0 + 1
                
break;
            
//从队列里取得用户作业
            EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
            
if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
            {
                pJob 
= pThreadPoolObj->_JobQueue.front();
                pThreadPoolObj
->_JobQueue.pop();
                assert(pJob);
            }
            LeaveCriticalSection(
&pThreadPoolObj->_csWorkQueue);
            
//受到结束线程信号确定是否结束线程(结束线程信号&& 是否还有工作)
            if(wr == WAIT_OBJECT_0 + 2 && !fHasJob) 
                
break;
            
if(fHasJob && pJob)
            {
                InterlockedIncrement(
&pThreadPoolObj->_lRunningNum);
                pThread
->_dwLastBeginTime = GetTickCount();
                pThread
->_dwCount++;
                pThread
->_fIsRunning = true;
                pJob
->_pFunc(pJob->_pPara); //运行用户作业
                delete pJob;
                pThread
->_fIsRunning = false;
                InterlockedDecrement(
&pThreadPoolObj->_lRunningNum);
            }
        }
        
//删除自身结构
        EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
    pThreadPoolObj
->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
        LeaveCriticalSection(
&pThreadPoolObj->_csThreadVector);
        delete pThread;
        InterlockedDecrement(
&pThreadPoolObj->_lThreadNum);
        
if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
            SetEvent(pThreadPoolObj->_EventComplete);
        
return 0;
    }
    
//调用用户对象虚函数
    static void CallProc(void *pPara)
    {
        CallProcPara 
*cp = static_cast<CallProcPara *>(pPara);
        assert(cp);
        
if(cp)
        {
            cp
->_pObj->DoJob(cp->_pPara);
            delete cp;
        }
    }
    
//用户对象结构
    struct CallProcPara 
    {
        ThreadJob
* _pObj;//用户对象
        void *_pPara;//用户参数
        CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
    };
    
//用户函数结构
    struct JobItem
    {
        
void (*_pFunc)(void  *);//函数
        void *_pPara; //参数
        JobItem(void (*pFunc)(void  *= NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
    };
    
//线程池中的线程结构
    struct ThreadItem
    {
        HANDLE _Handle; 
//线程句柄
        ThreadPool *_pThis;  //线程池的指针
        DWORD _dwLastBeginTime; //最后一次运行开始时间
        DWORD _dwCount; //运行次数
        bool _fIsRunning;
        ThreadItem(ThreadPool 
*pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
        
~ThreadItem()
        {
            
if(_Handle)
            {
                CloseHandle(_Handle);
                _Handle 
= NULL;
            }
        }
    };
    std::queue
<JobItem *> _JobQueue;  //工作队列
    std::vector<ThreadItem *>  _ThreadVector; //线程数据
    CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
    HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号,删除线程信号
    long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
};
#endif //_ThreadPool_H_

使用说明1

调用方法使用时注意几点:

void threadfunc(void *p)
{
     YourClass
* yourObject = (YourClass*)    p;
 
//
}
 ThreadPool tp;
 
for(i=0; i<100; i++)
  tp.Call(threadfunc);
ThreadPool tp(
20);//20为初始线程池规模
 tp.Call(threadfunc, lpPara);

 

1. ThreadJob  没什么用,直接写线程函数吧。 

2. 线程函数(threadfunc)的入口参数void* 可以转成自定义的类型对象,这个对象可以记录下线程运行中的数据,并设置线程当前状态,以此与线程进行交互。

3. 线程池有一个EndAndWait函数,用于让线程池中所有计算正常结束。有时线程池中的一个线程可能要运行很长时间,怎么办?可以通过线程函数threadfunc的入口参数对象来处理,比如:在主线程中设置yourClass->cmd = 1,该线程就会自然结束。

class YourClass 
{
  
int cmd; // cmd = 1是上线程停止计算,正常退出。
};
threadfunc(
void* p) {
  YourClass
* yourObject = (YourClass*)p;
  
while (true) {
    
// do some calculation
    if (yourClass->cmd == 1)
      
break;
  }
}

 

使用说明2 


void threadfunc(void *p)
{
 
//
}
 ThreadPool tp;
 
for(i=0; i<100; i++)
  tp.Call(threadfunc);
 ThreadPool tp(
20);//20为初始线程池规模
 tp.Call(threadfunc, lpPara);
 tp.AdjustSize(
50);//增加50
 tp.AdjustSize(-30);//减少30

 

class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展
{
public:
 
virtual void DoJob(void *p)//自定义的虚函数
 {
  
//.
 }
};
 MyThreadJob mt[
10];
 ThreadPool tp;
 
for(i=0; i<100 i++)
  tp.Call(mt 
+ i);//tp.Call(mt + i, para);

抱歉!评论已关闭.