现在的位置: 首页 > 综合 > 正文

利用 ACE 自己实现的线程池

2019年01月10日 ⁄ 综合 ⁄ 共 10173字 ⁄ 字号 评论关闭

1: 线程池组件的配置文件:

 

; Logger settings. Each key is preceded by commented "NAME = value" lines that
; appear to enumerate the values it accepts.
[log]
;DEBUG = 0
;INFO  = 1
;WARN  = 2
;FAULT = 3
level=0

;SCREENOUT = 0
;FILEOUT   = 1
;BOTH      = 2
director = 2

;TEST = 0
;RUN  = 1
mode = 0

;ONE  = 0x00
;FOUR = 0x01
split = 0

;AUTODEL   = 0x00
;MANUALDEL = 0x01
manager=0

; Thread pool sizing.
; NOTE(review): these keys presumably map onto
; CManager(corepoolsize, maxpoolsize, maxjobsize, keepalivetime); the code that
; loads this file is not shown, so confirm the mapping and the keepalive unit.
[threadpool]
minthread = 10
maxthread = 25
maxjobsize = 100000
keepalive = 1

2: 线程池的代码:

#pragma once

#include "ace/Task.h"
#include "ace/Synch.h"

class CManager;

// A pool worker: an ACE task that runs one service thread, executes the jobs
// placed on its own message queue and hands itself back to the owning
// CManager after each job.
class CWorker : public ACE_Task<ACE_MT_SYNCH>
{
public:
	CWorker();
	CWorker(CManager* pmanager);
	~CWorker();

	// Task life cycle.
	virtual int open();
	virtual int svc();
	virtual int close();

	// Logs thread id, idle time and invocation count of this worker.
	void output();

	// Idle-time stamp and keep-alive timer management.
	void setidletimepoint();
	bool gettimervalid();
	int addtimer();
	int removetimer();

	// Counter of how many times this worker was handed out.
	void increaseinvokenumber();
	int getinvokenumber();

	// Keep-alive timer callback (scheduled through ACE_Reactor in addtimer()).
	int handle_timeout(const ACE_Time_Value &tv, const void *arg);

private:
	ACE_thread_t threads[1];   // id of the single service thread, filled by open()
	CManager* m_pmanager;      // owning manager (not owned by the worker)

public:
	int m_idletimepoint;       // time() value stored by setidletimepoint()
	int getidletimelength();
	int getthreadid();
	int m_timerid;             // id returned by schedule_timer(), -1 if none yet
	int m_invokenumber;        // see increaseinvokenumber()
	static int m_count;        // number of live CWorker objects
	bool timervalid;           // set once handle_timeout() starts retiring the worker
};




#include "Manager.h"
#include "Worker.h"
#include "Job.h"
#include "ace/Reactor.h"
#include "Logger.h"
#include "ace/OS_NS_time.h"
#include "ace/OS_NS_unistd.h"

// Number of CWorker objects currently alive: incremented by the constructors,
// decremented by the destructor.
int CWorker::m_count = 0;  

// Default constructor.
// Counts the instance and puts every data member into a defined state, so
// that getters such as gettimervalid()/getinvokenumber() never read garbage.
// A worker built this way has no manager; svc() dereferences m_pmanager, so
// such a worker must not be opened.
CWorker::CWorker(void)
{
	m_count++;
	m_pmanager = NULL;
	m_idletimepoint = 0;
	m_timerid = -1;
	m_invokenumber = 0;
	timervalid = false;
}

// Constructs a worker owned by `pmanager`.
// Logs the instance count as it was *before* this worker is counted, then
// initialises every data member (including the idle time stamp, which was
// previously left indeterminate until the first setidletimepoint()).
// NOTE(review): "%08x" is fed a pointer; if logdebugmsg() is printf-like this
// truncates or misbehaves on 64-bit targets — confirm and consider "%p".
CWorker::CWorker(CManager* pmanager)
{
	CLogger::createinstance()->logdebugmsg("(+%d)worker address=%08x\n", m_count, this);
	m_count++;
	m_pmanager = pmanager;
	m_idletimepoint = 0;
	m_timerid = -1;
	m_invokenumber = 0;
	timervalid = false;
}

// Destructor: removes this instance from the live-worker count and logs the
// count that remains together with the address being destroyed.
CWorker::~CWorker(void)
{
	--m_count;
	CLogger::createinstance()->logdebugmsg("(-%d)worker address=%08x\n", m_count, this);
}

int CWorker::open()
{
	 return activate(THR_NEW_LWP|THR_CANCEL_ENABLE|THR_JOINABLE, 1, 0, ACE_DEFAULT_THREAD_PRIORITY, -1, this, 0, 0, 0, threads);
}

int CWorker::svc()
{
	if (ACE_Thread::self() == threads[0])
	{
		ACE_Thread_Manager *mgr = this->thr_mgr();
		while(true)
		{
			ACE_Message_Block *mb;
			int result = this->getq(mb);
			if (result == -1)
			{
				break;
			}
			else
			{
				CJob *pjob = (CJob*)mb->base();
				pjob->run();
				mb->release();
				m_pmanager->put(this);
			}
		}
	}
	return 0;
}

// Close hook of the worker task; performs no work and reports success (0).
int CWorker::close()
{
	const int status = 0;
	return status;
}

// Stamps the current wall-clock time (seconds, as returned by time()) as the
// start of this worker's idle period.
void CWorker::setidletimepoint()
{
	const time_t now = time(NULL);
	m_idletimepoint = static_cast<int>(now);
}

// Returns the number of seconds elapsed since the last setidletimepoint().
int CWorker::getidletimelength()
{
	const time_t now = time(NULL);
	return static_cast<int>(now - m_idletimepoint);
}

int CWorker::getthreadid()
{
	return (int)threads[0];
}

// Writes one log line describing this worker: thread id, seconds spent idle
// and number of times it was handed out.
void CWorker::output()
{
	CLogger::createinstance()->logfaultmsg(
		"id=%05d idletime=%d invoke=%d\n",
		getthreadid(),
		getidletimelength(),
		getinvokenumber());
}

int CWorker::addtimer()
{
	ACE_Time_Value tv(m_pmanager->getkeepalivetime(), 0);
	m_timerid = ACE_Reactor::instance()->schedule_timer(this, NULL, tv);
	return m_timerid;
}

int CWorker::removetimer()
{
	return ACE_Reactor::instance()->cancel_timer(m_timerid);
}


bool CWorker::gettimervalid()
{
	return timervalid;
}

// Keep-alive timer callback (the timer is scheduled in addtimer()).
// If the pool currently holds more threads than its core size, this worker is
// retired: it is flagged, the thread count is decremented, its queue is
// deactivated so svc() leaves its loop, the thread is joined, the worker is
// removed from the manager's lists and the object destroys itself.
// Otherwise nothing happens. `tv` and `arg` are unused. Always returns 0.
// NOTE(review): timervalid is written here without holding the worker-list
// mutex that CWorkerlist::put()/get() rely on — confirm that a job handed to
// this worker concurrently cannot be lost when the queue is deactivated.
int CWorker::handle_timeout(const ACE_Time_Value &tv, const void *arg)
{
	if (m_pmanager->getthreadnumber() > m_pmanager->getcorepoolsize())
	{	
		// Mark the worker as retiring so CWorkerlist::put() will not re-idle it.
		timervalid = true;
		m_pmanager->reducethreadnumber();	
		// Deactivating the queue makes the blocking getq() in svc() return -1.
		this->msg_queue()->deactivate();
		// Join the service thread before the object goes away.
		wait();
		m_pmanager->recyclebin(this);
		// The object must not be touched after this line.
		delete this;
	}
	return 0;
}

// Adds one to the count of times this worker was handed out.
void  CWorker::increaseinvokenumber()
{
	++m_invokenumber;
}

int  CWorker::getinvokenumber()
{
	return m_invokenumber;
}
	

 

#pragma once

#include "list"
#include "ace/Synch.h"

using namespace std;

class CWorker;
class CManager;

// Bookkeeping for the pool's workers: one list of idle workers and one list
// of busy workers, both guarded by m_mutex. Stores raw CWorker pointers and
// does not delete them.
class CWorkerlist
{
	typedef list<CWorker*> IDLEWORKERLIST;
	typedef list<CWorker*> BUSYWORKERLIST;

public:
	CWorkerlist();
	~CWorkerlist();

	// Blocks until an idle worker exists, then moves it to the busy list.
	CWorker* get();
	// Returns a worker to the idle list (or registers a new one).
	void put(CWorker* pworker);
	// Removes a worker from whichever list holds it.
	int recyclebin(CWorker* pworker);
	// Reports the current sizes of both lists.
	void getsize(int* idle, int* busy);
	// Logs every worker in both lists.
	void output();

private:
	IDLEWORKERLIST m_idleworkerlist;
	BUSYWORKERLIST m_busyworkerlist;
	ACE_Thread_Mutex m_mutex;
	ACE_Condition<ACE_Thread_Mutex> *m_pcondition;  // bound to m_mutex, owned
};


#include "Workerlist.h"
#include "Worker.h"
#include "Manager.h"
#include "logger.h"


// Creates the condition variable bound to m_mutex (m_mutex is declared, and
// therefore constructed, before m_pcondition).
CWorkerlist::CWorkerlist(void)
	: m_pcondition(new ACE_Condition<ACE_Thread_Mutex>(m_mutex))
{
}


// Destroys the condition variable allocated by the constructor.
// The workers referenced by the two lists are not deleted here.
CWorkerlist::~CWorkerlist(void)
{
	delete this->m_pcondition;
}


// Hands out the worker at the front of the idle list, blocking on the
// condition variable while the list is empty. The chosen worker has its idle
// timer cancelled and its invocation counter bumped, and is moved to the
// busy list before being returned.
CWorker* CWorkerlist::get()
{
	m_mutex.acquire();

	while (m_idleworkerlist.empty())
	{
		m_pcondition->signal();
		m_pcondition->wait();
	}

	CWorker* const picked = m_idleworkerlist.front();

	// Cancel the idle (keep-alive) timer: the worker is about to get a job.
	picked->removetimer();
	picked->increaseinvokenumber();

	m_idleworkerlist.pop_front();
	m_busyworkerlist.push_back(picked);

	m_mutex.release();
	return picked;
}


// Removes `pworker` from whichever list currently holds it.
// Returns IN_IDLE_QUEUE or IN_BUSY_QUEUE depending on where the worker was
// found, or NOT_FOUND if it is in neither list. The worker itself is not
// deleted. Fix: a worker found in the busy list is now erased from the busy
// list; the original passed the busy-list iterator to m_idleworkerlist.erase(),
// which is undefined behavior and left the pointer in the busy list.
int CWorkerlist::recyclebin(CWorker* pworker)
{

	#define NOT_FOUND          0x00
	#define IN_IDLE_QUEUE      0x01
	#define IN_BUSY_QUEUE      0x02

	typedef list<CWorker*>::iterator ITERATOR;

	int result = NOT_FOUND;

	m_mutex.acquire();

	for (ITERATOR it = m_idleworkerlist.begin(); it != m_idleworkerlist.end(); ++it)
	{
		if (*it == pworker)
		{
			m_idleworkerlist.erase(it);
			result = IN_IDLE_QUEUE;
			break;
		}
	}

	// Only look at the busy list when the idle list did not contain the worker.
	if (result == NOT_FOUND)
	{
		for (ITERATOR it = m_busyworkerlist.begin(); it != m_busyworkerlist.end(); ++it)
		{
			if (*it == pworker)
			{
				m_busyworkerlist.erase(it);
				result = IN_BUSY_QUEUE;
				break;
			}
		}
	}

	m_mutex.release();
	return result;
}



// Registers `pworker` as idle.
// If the worker is in the busy list its timer is cancelled and, unless the
// worker is already being retired, it is taken out of that list. A worker
// that is not being retired then gets a fresh idle time stamp and keep-alive
// timer, is appended to the idle list, and one waiter in get() is signalled.
void CWorkerlist::put(CWorker* pworker)
{
	m_mutex.acquire();

	typedef list<CWorker*>::iterator ITERATOR;

	// Locate the first busy-list entry that refers to this worker, if any.
	ITERATOR pos = m_busyworkerlist.begin();
	while (pos != m_busyworkerlist.end() && !(*pos == pworker))
	{
		++pos;
	}

	if (pos != m_busyworkerlist.end())
	{
		pworker->removetimer();
		// Once the idle timer has fired the worker is on its way to being
		// deleted, so it deliberately stays in the busy list.
		if (!pworker->gettimervalid())
		{
			m_busyworkerlist.erase(pos);
		}
	}

	if (!pworker->gettimervalid())
	{
		// Start the idle timer.
		pworker->setidletimepoint();
		pworker->addtimer();

		m_idleworkerlist.push_back(pworker);
		m_pcondition->signal();
	}

	m_mutex.release();
}


// Stores the current number of idle and busy workers in *idle and *busy.
// Both pointers must be non-null; both values are read under one lock.
void CWorkerlist::getsize(int *idle, int *busy)
{
	m_mutex.acquire();
	const int idlecount = static_cast<int>(m_idleworkerlist.size());
	const int busycount = static_cast<int>(m_busyworkerlist.size());
	*idle = idlecount;
	*busy = busycount;
	m_mutex.release();
}


void CWorkerlist::output()
{
	m_mutex.acquire();

	typedef list<CWorker*>::iterator ITERATOR;
	ITERATOR LI;


	CLogger::createinstance()->logfaultmsg("idle thread information as follows\n");
	int idle = 0;
	for(LI = m_idleworkerlist.begin(); LI != m_idleworkerlist.end(); LI++)
	{
		idle++;
		(*LI)->output();
	}

	
	CLogger::createinstance()->logfaultmsg("busy thread information as follows\n");
	int busy = 0;
	for(LI = m_busyworkerlist.begin(); LI != m_busyworkerlist.end(); LI++)
	{
		busy++;
		(*LI)->output();
	}
	
	CLogger::createinstance()->logfaultmsg("idle=%d busy=%d\n", idle, busy);

	m_mutex.release();
}

 

#pragma once

#include "ace/Task.h"
#include "ace/Synch.h"
#include "Job.h"
#include "Workerlist.h"

class CWorker;

// Thread pool front end: an ACE task with two service threads (job dispatch
// and console commands) that queues submitted jobs and hands them to CWorker
// objects tracked in a CWorkerlist.
class CManager : public ACE_Task<ACE_MT_SYNCH>
{
public:
	CManager();
	CManager(int corepoolsize, int maxpoolsize, int maxjobsize, int keepalivetime);
	~CManager();

	// Task life cycle.
	virtual int open();
	virtual int svc();
	virtual int close();

private:
	ACE_thread_t threads[2];   // [0] job dispatcher, [1] console command thread
	CWorkerlist m_workerqueue; // idle/busy worker bookkeeping
	ACE_Thread_Mutex m_mutex;  // guards m_threadnumber updates
	int m_corepoolsize;
	int m_maxpoolsize;
	int m_maxjobsize;
	int m_keepalivetime;
	int m_threadnumber;        // number of workers created and not yet retired
	bool m_done;               // true while the pool accepts jobs; stop() clears it

public:
	// Configuration accessors.
	int getcorepoolsize();
	int getmaxpoolsize();
	int getmaxjobsize();
	void setcorepoolsize(int corepoolsize);
	void setmaxpoolsize(int maxpoolsize);
	void setkeepalivetime(int keepalivetime);
	void setmaxjobsize(int maxjobsize);
	void setthreadcount(int threadcount);

	// Diagnostics.
	int  getjobqueuesize();
	void outputjobqueue();
	void outputworkerqueue();

	// Worker-count bookkeeping.
	void addthreadnumber();
	void reducethreadnumber();
	int getthreadnumber();

	// Pool operations.
	int  submitnormaljob(const CJob* pjob, int size);
	int  put(CWorker* pworker);
	int  recyclebin(CWorker* pworker);
	int  getkeepalivetime();
	int  stop();
};


#include "Manager.h"
#include "Worker.h"
#include "Logger.h"


// Default constructor: core size 5, maximum size 25, at most 50000 queued
// jobs, keep-alive time 10. The pool starts in the accepting state
// (m_done == true) with no worker threads.
CManager::CManager(void)
	: m_threadnumber(0)
	, m_done(true)
{
	setcorepoolsize(5);
	setmaxpoolsize(25);
	setmaxjobsize(50000);
	setkeepalivetime(10);
}

// Constructs a pool with explicit limits.
//   corepoolsize  - worker count above which idle workers are retired
//   maxpoolsize   - upper bound on the number of workers
//   maxjobsize    - upper bound on the number of queued jobs
//   keepalivetime - idle time before a surplus worker is retired
//                   (passed as seconds to the worker's timer)
// The pool starts in the accepting state (m_done == true) with no workers.
// The default arguments formerly written on this out-of-class definition were
// removed: they turned this constructor into a second default constructor,
// which is ill-formed and ambiguous with CManager(void). The same defaults
// (5, 25, 50000, 10) remain available through CManager(void).
CManager::CManager(int corepoolsize, int maxpoolsize, int maxjobsize, int keepalivetime)
{
	setcorepoolsize(corepoolsize);
	setmaxpoolsize(maxpoolsize);
	setmaxjobsize(maxjobsize);
	setkeepalivetime(keepalivetime);
	m_done = true;
	m_threadnumber = 0;
}


// Destructor: empty. It neither stops the service threads nor deletes any
// worker.
// NOTE(review): confirm that threads and workers are shut down elsewhere
// before a CManager is destroyed.
CManager::~CManager(void)
{
}

int CManager::open()
{
	 return activate(THR_NEW_LWP|THR_CANCEL_ENABLE|THR_JOINABLE, 2, 0, ACE_DEFAULT_THREAD_PRIORITY, -1, this, 0, 0, 0, threads);
}

// Entry point of both manager threads; the thread id selects the role.
//  - threads[0]: dispatcher. Takes job messages from the manager's queue and
//    forwards each one to an idle worker obtained from the worker list.
//  - threads[1]: console. Reads numeric commands from stdin
//    (0 help, 1 thread info, 2 job info, 3 stop).
// Always returns 0.
// Fixes over the original: threads[2] (out of bounds) is no longer read when
// logging the console thread id; a failed getq() ends the dispatcher instead
// of dereferencing an uninitialised pointer; messages that are not MB_DATA or
// that a worker refuses are released instead of leaked; a failed or exhausted
// stdin no longer makes the console loop spin forever.
int CManager::svc()
{
	if (ACE_Thread::self() == threads[0])
	{
		CLogger::createinstance()->loginfomsg("starts the thread of processing job and threadid=%d\n", threads[0]);
		while(true)
		{
			ACE_Message_Block *mb_job = NULL;
			if (getq(mb_job) == -1)
			{
				// The queue was deactivated or failed; nothing more to dispatch.
				break;
			}
			if (mb_job->msg_type() != ACE_Message_Block::MB_DATA)
			{
				// Not a job: drop it so the block is not leaked.
				mb_job->release();
				continue;
			}

			CWorker* pworker = m_workerqueue.get();
			if (pworker->putq(mb_job) == -1)
			{
				// The worker's queue refused the block; ownership stays here.
				CLogger::createinstance()->logfaultmsg("discard the job because the worker refused it\n");
				mb_job->release();
			}
		}
	}
	else if(ACE_Thread::self() == threads[1])
	{
		CLogger::createinstance()->loginfomsg("starts the thread of processing command from keyboard and threadid=%d\n", threads[1]);
		while(true)
		{
			int command = 0;
			if (!(cin >> command))
			{
				if (cin.eof())
				{
					// No more input will ever arrive.
					break;
				}
				// Unparsable input: reset the stream and skip the rest of the line.
				cin.clear();
				cin.ignore(1024, '\n');
				continue;
			}

			if (command == 0)
			{
				printf("0: help\n");
				printf("1: thread info\n");
				printf("2: job infomation\n");
				printf("3: stop\n");
			}
			else if(command == 1)
			{
				printf("total=%d\n", getthreadnumber());
				printf("corepoolsize=%d\n", getcorepoolsize());
				printf("maxpoolsize=%d\n", getmaxpoolsize());
				printf("keepalivetime=%d\n", getkeepalivetime());
				m_workerqueue.output();
			}
			else if(command == 2)
			{
				printf("maximum=%d\n", getmaxjobsize());
				printf("currentsize=%d\n", getjobqueuesize());
			}
			else if(command == 3)
			{
				stop();
				ACE_OS::sleep(10);
				CLogger::createinstance()->loginfomsg("the thread of processing command from keyboard and threadid=%d\n", threads[1]);
				break;
			}
		}
	}
	return 0;
}

// Close hook of the manager task; performs no work and reports success (0).
int CManager::close()
{
	const int status = 0;
	return status;
}

int CManager::getcorepoolsize()
{
	return m_corepoolsize;
}

int CManager::getmaxpoolsize()
{
	return m_maxpoolsize;
}

int CManager::getkeepalivetime()
{
	return m_keepalivetime;
}

int CManager::getmaxjobsize()
{ 
	return m_maxjobsize;
}

void CManager::setcorepoolsize(int corepoolsize)
{
	m_corepoolsize = corepoolsize;
}

void CManager::setmaxpoolsize(int maxpoolsize)
{
	m_maxpoolsize = maxpoolsize;
}

void CManager::setkeepalivetime(int keepalivetime)
{
	m_keepalivetime = keepalivetime;
}

void CManager::setmaxjobsize(int maxjobsize)
{
	m_maxjobsize = maxjobsize;
}


// Returns the number of messages currently waiting in the manager's queue.
int CManager::getjobqueuesize()
{
	return static_cast<int>(msg_queue_->message_count());
}


int CManager::submitnormaljob(const CJob* pjob, int size)
{	
	int result = -1;
	if (!m_done) 
	{
		CLogger::createinstance()->logfaultmsg("discard the job because of the threadpool has stopped to work\n");
		return result;
	}
	if (getjobqueuesize() >= getmaxjobsize())
	{
		CLogger::createinstance()->logfaultmsg("discard the job because of the jobqueue is full\n");
		return result;
	}

	ACE_Message_Block *mbjob = new ACE_Message_Block(size, ACE_Message_Block::MB_DATA);
	mbjob->copy((char*)pjob, size);
	putq(mbjob);

	int idle = 0;
	int busy = 0;
	m_workerqueue.getsize(&idle, &busy);

	if (idle == 0 && getthreadnumber() < getmaxpoolsize())
	{
		CWorker* pworker = new CWorker(this);
		addthreadnumber();
		pworker->open();
		put(pworker);
	}
	result = 0;
	return result;
}

// Hands `pworker` to the worker list as idle. Always returns 0.
int CManager::put(CWorker* pworker)
{
	CWorkerlist &workers = m_workerqueue;
	workers.put(pworker);
	return 0;
}

// Removes `pworker` from the worker list and returns the list's result code.
int  CManager::recyclebin(CWorker* pworker)
{
	const int where = m_workerqueue.recyclebin(pworker);
	return where;
}

void CManager::outputjobqueue()
{
	ACE_Message_Block *mb;
	msg_queue_->peek_dequeue_head(mb);
	do
	{
		CJob* pjob = (CJob*)mb->base();
	}
	while(mb = mb->next());
}

void CManager::outputworkerqueue()
{
	m_workerqueue.output();
}

// Stops the pool from accepting jobs by clearing m_done; submitnormaljob()
// rejects everything afterwards. Running threads are not touched.
// Always returns 1.
int CManager::stop()
{
	m_done = false;
	return 1;
}

// Increments the worker count while holding m_mutex.
void CManager::addthreadnumber()
{
	m_mutex.acquire();
	++m_threadnumber;
	m_mutex.release();
}

// Decrements the worker count while holding m_mutex.
void CManager::reducethreadnumber()
{
	m_mutex.acquire();
	--m_threadnumber;
	m_mutex.release();
}

int CManager::getthreadnumber()
{
	return m_threadnumber;
}

 

抱歉!评论已关闭.