现在的位置: 首页 > 综合 > 正文

C++线程池的实现

2018年02月12日 ⁄ 综合 ⁄ 共 11280字 ⁄ 字号 评论关闭

使用多线程编程可以显著提高程序的运行速度,由于现在的操作系统都是多核的,所以一个多线程的程序,由于系统内核是基于时间片轮询的,所以多线程程序再用系统内核的时间大大增多,所完成的任务就更快。

 

线程池头文件:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//---------------------------------------------------------------------------
#ifndef CworkQueueH
#define CworkQueueH
//---------------------------------------------------------------------------
#include <queue>
#include<vcl.h>
class
CWorkQueue;
/**
用法原理:通过派生类WorkItemBase的dowork方法来实现,线程处理任务
通过create任务创建线程,并且这些线程一直在for循环里等待事件监听
一旦任务栈里有数据了触发线程执行任务。
**/
/*------------------------------------------------------------------------
WorkItemBase
this is the basic WorkItem that the Work Queue Use its interface
This class should be inherited and these virtual abstract functions
implemented.
DoWork()
virtual abstract function is the function that is called when the
work item turn has came to be poped out of the queue and be processed.
Abort ()
This function is called, when the Destroy function is called, for each of the WorkItems
That are left in the queue.
------------------------------------------------------------------------*/
class
WorkItemBase
{
virtual
void   DoWork(void* pThreadContext)    = 0;
virtual
void   Abort () = 0;
friend
CWorkQueue;
};
typedef
std::queue<WorkItemBase*>           WorkItemQueue,*PWorkItemQueue;
/*------------------------------------------------------------------------
CWorkQueue
This is the WorkOueue class also known as thread pool,
the basic idea of this class is creating thread that are waiting on a queue
of work item when the queue is inserted with items the threads wake up and
perform the requered work and go to sleep again.
------------------------------------------------------------------------*/
class 
CWorkQueue
{
public:
virtual
~CWorkQueue(){};
bool
Create(const
unsigned int      
nNumberOfThreads,
void*                    *pThreadDataArray             = NULL);
bool
InsertWorkItem(WorkItemBase* pWorkItem);
void
Destroy(int
iWairSecond);
int
GetThreadTotalNum();
private:
static
unsigned long
__stdcall ThreadFunc( void* pParam );
WorkItemBase* RemoveWorkItem();
int
GetWorekQueueSize();
enum{
ABORT_EVENT_INDEX = 0,
SEMAPHORE_INDEX,
NUMBER_OF_SYNC_OBJ,
};
//申请到的线程
PHANDLE                 
m_phThreads;
unsigned int            
m_nNumberOfThreads;
void*                    m_pThreadDataArray;
HANDLE                  
m_phSincObjectsArray[NUMBER_OF_SYNC_OBJ];
CRITICAL_SECTION         m_CriticalSection;
PWorkItemQueue           m_pWorkItemQueue;
};
#endif

CPP实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
//---------------------------------------------------------------------------
#pragma hdrstop
#include "CworkQueue.h"
//---------------------------------------------------------------------------
#include <assert.h>
typedef
struct _THREAD_CONTEXT
{
CWorkQueue* pWorkQueue;
void*       pThreadData;
} THREAD_CONTEXT,*PTHREAD_CONTEXT;
/*------------------------------------------------------------------------
建立多线程   nNumberOfThreads多线程数目  ThreadData线程函数执行的参数
------------------------------------------------------------------------*/
bool
CWorkQueue::Create(const
unsigned int 
nNumberOfThreads,
void*         *ThreadData     
/*=NULL*/)
{
//创建任务队列,存放后续将要执行的任务
m_pWorkItemQueue =
new WorkItemQueue();
if(NULL == m_pWorkItemQueue )
{
return
false;
}
//m_phSincObjectsArray保存了线程池的信号量和事件
//m_phSincObjectsArray[ABORT_EVENT_INDEX]保存的是事件,当用户设置退出事件时使用
//m_phSincObjectsArray[SEMAPHORE_INDEX]保存信号量,当用户设置执行任务时使用
//创建信号量(多线程同步使用)
/*在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。
当一个线程调用Wait操作时,它要么得到资源然后将信号量减一,要么一直等下去(指放入阻塞队列),
直到信号量大于等于一时。Release(释放)实际上是在信号量上执行加操作*/
m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL);
if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL)
{
delete
m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
return
false;
}
//创建事件为手动置位,一次只能进入一个,False为初始不是运行状态(多线程同步使用)
m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL);
if(m_phSincObjectsArray[ABORT_EVENT_INDEX]  == NULL)
{
delete
m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
return
false;
}
//创建并初始化临界区(多线程互斥访问使用)
InitializeCriticalSection(&m_CriticalSection);
//创建线程数组
m_phThreads =
new HANDLE[nNumberOfThreads];
if(m_phThreads == NULL)
{
delete
m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
return
false;
}
unsigned int
i;
m_nNumberOfThreads = nNumberOfThreads;
DWORD
dwThreadId;
PTHREAD_CONTEXT pThreadsContext;
//创建所有的线程
for(i = 0 ; i < nNumberOfThreads ; i++ )
{
//初始化线程函数运行时传入的参数
pThreadsContext =
new THREAD_CONTEXT;
pThreadsContext->pWorkQueue  =
this;
pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i];
//创建线程
m_phThreads[i] = CreateThread(NULL,
0,
CWorkQueue::ThreadFunc,
pThreadsContext,
0,
&dwThreadId);
if(m_phThreads[i] == NULL)
{
delete
pThreadsContext;
m_nNumberOfThreads = i;
Destroy(5);
return
false;
}
}
return
true;
}
/*------------------------------------------------------------------------
向任务队列添加任务
任务执行类通过继承基类WorkItemBase之后使用多态函数DoWork来完成真实任务
------------------------------------------------------------------------*/
bool
CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
{
assert(pWorkItem != NULL);
//多线程互斥访问,进入临界区
EnterCriticalSection(&m_CriticalSection);
//将任务插入队列
m_pWorkItemQueue->push(pWorkItem);
//离开临界区
LeaveCriticalSection(&m_CriticalSection);
//释放信号量,使信号量加1,促使后面的Wailt操作执行
if
(!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL))
{
assert(false);
return
false;
}
return
true;
}
/*------------------------------------------------------------------------
从工作队列中移除对象,并返回移除的对象
------------------------------------------------------------------------*/
WorkItemBase*  CWorkQueue::RemoveWorkItem()
{
WorkItemBase* pWorkItem;
//多线程间访问互斥,进入临界区
EnterCriticalSection(&m_CriticalSection);
//移除对象
pWorkItem = m_pWorkItemQueue->front();
m_pWorkItemQueue->pop();
//离开临界区,其他等待线程可以进入临界区
LeaveCriticalSection(&m_CriticalSection);
assert(pWorkItem != NULL);
return
pWorkItem;
}
/*------------------------------------------------------------------------
线程执行的函数,实际执行的是任务队列中的DoWork()
------------------------------------------------------------------------*/
unsigned
long __stdcall CWorkQueue::ThreadFunc(
void*  pParam )
{
//创建线程时传入的参数
PTHREAD_CONTEXT       pThreadContext =  (PTHREAD_CONTEXT)pParam;
WorkItemBase*         pWorkItem      = NULL;
CWorkQueue*           pWorkQueue     = pThreadContext->pWorkQueue;
void*                 pThreadData    = pThreadContext->pThreadData;
DWORD
dwWaitResult;
for(;;)
{
//WaitForMultipleObjects等待pWorkQueue->m_phSincObjectsArray信号量数组两件事
//一个是执行任务的释放信号量,一个是异常的释放信号量
//当WaitForMultipleObjects等到多个内核对象的时候,如果它的bWaitAll 参数设置为false。
//其返回值减去WAIT_OBJECT_0 就是参数lpHandles数组的序号。如果同时有多个内核对象被触发,
//这个函数返回的只是其中序号最小的那个。如果为TRUE 则等待所有信号量有效在往下执行。
//(FALSE 当有其中一个信号量有效时就向下执行)
//本文WaitForMultipleObjects等待执行任务的信号量和退出销毁任务信息的事件
//当有新任务添加到任务队列,设置执行任务信号量,触发任务执行
//当设置退出事件时,销毁任务信息,所有线程因为没有设置事件复位信息,因此会全部销毁
dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);
//WaitForMultipleObjects返回数组pWorkQueue->m_phSincObjectsArray编号
switch(dwWaitResult - WAIT_OBJECT_0)
{
//返回异常编号
case
ABORT_EVENT_INDEX:
delete
pThreadContext;
return
0;
//返回执行任务编号
case
SEMAPHORE_INDEX:
//从任务队列取一个任务执行
pWorkItem = pWorkQueue->RemoveWorkItem();
if(pWorkItem == NULL)
{
assert(false);
break;
}
//执行真正的任务
pWorkItem->DoWork(pThreadData);
break;
default:
assert(false);
delete
pThreadContext;
return
0;
}
}
//删除线程参数
delete
pThreadContext;
return
1;
}
/**
获取线程总数
**/
int
CWorkQueue::GetThreadTotalNum()
{
return
m_nNumberOfThreads;
}
/**
获取任务池的大小
**/
int
CWorkQueue::GetWorekQueueSize()
{
//多线程间访问互斥,进入临界区
EnterCriticalSection(&m_CriticalSection);
int
iWorkQueueSize = m_pWorkItemQueue->size();
//离开临界区
LeaveCriticalSection(&m_CriticalSection);
return
iWorkQueueSize;
}
/*------------------------------------------------------------------------
Destroy
销毁线程池
------------------------------------------------------------------------*/
void
CWorkQueue::Destroy(int
iWairSecond)
{
//为防止子线程任务没有执行完,主线程就销毁线程池,就加入一个等待函数
while(0 != GetWorekQueueSize())
{
//主线程等待线程池完成所有任务
Sleep(iWairSecond*1000);
}
//设置退出事件,所有线程因为没有设置事件复位信息,因此会全部销毁
if(!SetEvent(m_phSincObjectsArray[ABORT_EVENT_INDEX]))
{
assert(false);
return;
}
//wait for all the threads to end
WaitForMultipleObjects(m_nNumberOfThreads,m_phThreads,true,INFINITE);
//clean queue
while(!m_pWorkItemQueue->empty())
{
m_pWorkItemQueue->front()->Abort();
m_pWorkItemQueue->pop();
}
delete
m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
//close all threads handles
for(int
i = 0 ; i < m_nNumberOfThreads ; i++)
CloseHandle(m_phThreads[i]);
delete[] m_phThreads;
}
#pragma package(smart_init)

 

 

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//---------------------------------------------------------------------------
#ifndef CworkItemH
#define CworkItemH
#include "CworkQueue.h"
#include "OprateEXCEL.h"
//---------------------------------------------------------------------------
class
CworkItem :public
WorkItemBase
{
public:
void  
DoWork(void* pThreadContext);
void  
Abort ();
void 
SetWriteContent(const
vTExcelSheetDate &tvTExcelSheetDate);
void 
SetExcelPath(const
String &ExcelPath);
private:
//要写入Excel的内容
vTExcelSheetDate m_vTExcelSheetDate;
//Excel文件的路径
String m_ExcelPath;
};
#endif
//---------------------------------------------------------------------------
#pragma hdrstop
#include "CworkItem.h"
//---------------------------------------------------------------------------
/**************************************
****************************************
函数功能:    保存Excel写入内容
****************************************
作者:时间:2013.6.30
****************************************
****************************************/
void 
CworkItem::SetWriteContent(const
vTExcelSheetDate &tvTExcelSheetDate)
{
m_vTExcelSheetDate.clear();
vTExcelSheetDate().swap(m_vTExcelSheetDate);
m_vTExcelSheetDate =  tvTExcelSheetDate;
}
/**************************************
****************************************
函数功能:    保存Excel保存路径
****************************************
作者:时间:2013.6.30
****************************************
****************************************/
void 
CworkItem::SetExcelPath(const
String &ExcelPath)
{
m_ExcelPath =  ExcelPath;
}
/**************************************
****************************************
函数功能:    实现基类的工作方法
****************************************
作者:    时间:2013.6.30
****************************************
****************************************/
void 
CworkItem::DoWork(void* pThreadContext)
{
OperateExcel taOperateExcel;
String sError;
taOperateExcel.WriteDateToExcel(m_ExcelPath,m_vTExcelSheetDate,sError);
//工作完成后,自我了断
delete
this;
}
void
CworkItem::Abort ()
{
delete
this;
}

 

 

 

1
2
3
4
5
6
7
8
9
10
CworkItem *pCworkItem =
new CworkItem();
pCworkItem->SetWriteContent(tvTExcelSheetDate);
pCworkItem->SetExcelPath(sFilePath);
m_CWorkQueue.InsertWorkItem(pCworkItem);
pCworkItem =
new CworkItem();
pCworkItem->SetWriteContent(tvTExcelSheetDate);
sFilePath =
"F:\\Project\\多线程写excel\\song1.xls";
pCworkItem->SetExcelPath(sFilePath);
m_CWorkQueue.InsertWorkItem(pCworkItem);
m_CWorkQueue.Destroy(5);

抱歉!评论已关闭.