现在的位置: 首页 > 综合 > 正文

用C++实现的带队列,并且当队列为空及任务数大于最大队列长度时自动wait的线程池 ( by quqi99 )

2014年01月15日 ⁄ 综合 ⁄ 共 4736字 ⁄ 字号 评论关闭

                                                                用C++实现的带队列,并且当队列为空及任务数大于最大队列长度时自动wait的线程池 ( by quqi99 )

作者:张华  发表于:2012-06-14
版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明

( http://blog.csdn.net/quqi99 )

             用JAVA, shell, python都写过很多程序,但不算学校的用C++写程序这个算第一个. 看到java中的线程通过wait, notify可以很方便的实现线程的同步, 于是, 想用c++实现一个, 于是有了下列这个线程池,它的特点是:

            1) 带队列的线程池

            2) 当队列为空, 通过pthread_mutex_lock实现java中的wait, 用pthread_cond_signal实现java中的notify功能

            3) 当任务数大于所设定的队列长度时,为防内存溢出, 同样通过pthread_mutex_lock实现java中的wait, 用pthread_cond_signal实现java中的notify功能实现自动等待

          

代码如下:

1) ThreadPool.h

/*
 * ThreadPool.h
 *
 *  Created on: 14 Jun 2012
 *      Author: Zhang Hua
 */

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#include <queue>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
using namespace std;

class Task {
protected:
    int pos;
    string input;

public:
    Task() {
    }
    virtual ~Task() {
    }

    Task(int pos, string input) {
        this->pos = pos;
        this->input = input;
    }

    int getPos() {
        return this->pos;
    }

    string getInput() {
        return this->input;
    }

    virtual void run()=0;
};

class ThreadPool {
private:
    int m_threadNum;
    int m_maxQueueSize;
    static queue<Task*>* taskQueue;  //因为Task里有纯虚函数,所以为Task*
    static pthread_cond_t stop_if_empty, stop_if_full;
    static pthread_mutex_t m_mutex;

    typedef struct threadarg {
        int para1;
    } ARG;

    void clear();

public:
    ThreadPool(int, int);
    ~ThreadPool();
    static void* ThreadFunc(void *arg);

    int addTask(Task*);

    int waitUnitTaskFinish();
};

#endif /* THREADPOOL_H_ */

2) ThreadPool.cpp

/*
 * ThreadPool.cpp
 *
 *  Created on: 14 Jun 2012
 *      Author: zhang hua
 */

#include "ThreadPool.h"

queue<Task*>* ThreadPool::taskQueue = new queue<Task*>();
pthread_mutex_t ThreadPool::m_mutex = PTHREAD_MUTEX_INITIALIZER; //互斥量,用于线程同步
pthread_cond_t ThreadPool::stop_if_empty = PTHREAD_COND_INITIALIZER; //条件量,队列为空的条件量
pthread_cond_t ThreadPool::stop_if_full = PTHREAD_COND_INITIALIZER; //条件量,队列大于最大队列长度时的条件量

ThreadPool::ThreadPool(int threadNum, int maxQueueSize) {
    this->m_threadNum = threadNum;
    this->m_maxQueueSize = maxQueueSize;
    pthread_t tid = 0;
    ARG* threadarg = (ARG*) malloc(sizeof(ARG));
    threadarg->para1 = maxQueueSize;
    for (int i = 0; i < m_threadNum; i++) {
        pthread_create(&tid, NULL, ThreadFunc, (void *) threadarg);
    }
}

void *ThreadPool::ThreadFunc(void *arg) {
    while (true) {
        ARG* threadarg = (ARG *) arg;
        int maxQueueSize = threadarg->para1;

        int pid = pthread_self();
        cout << "[" << pid << "] thread is running ... " << endl;
        pthread_mutex_lock(&m_mutex);
        while (taskQueue->empty()) {
            cout << "[" << pid << "]"
                    << "队列为空,将等待,直到stop_if_empty条件量的再次触发线程才往下执行 ..." << endl;
            pthread_cond_wait(&stop_if_empty, &m_mutex);
        }
        if (!taskQueue->empty() && (int) taskQueue->size() < maxQueueSize) {
            cout << "[" << pid << "]" << "触发stop_if_full条件量..." << endl;
            pthread_cond_signal(&stop_if_full);
        }

        Task* task = taskQueue->front();
        taskQueue->pop();
        pthread_mutex_unlock(&m_mutex);

        task->run();
    }
    return 0;
}

int ThreadPool::addTask(Task* task) {
    int pid = pthread_self();
    pthread_mutex_lock(&m_mutex);
    while ((int) taskQueue->size() >= this->m_maxQueueSize) {
        cout << "[" << pid << "]"
                << "队列已满,将等待,直到stop_if_full条件量的再次触发线程才往下执行 ..." << endl;
        pthread_cond_wait(&stop_if_full, &m_mutex);
    }
    taskQueue->push(task);
    cout << "[" << pid << "]" << "触发stop_if_empty条件量..." << endl;
    pthread_cond_signal(&stop_if_empty);
    cout << "[" << pid << "]" << "add to queue ..." << task->getPos() << endl;
    pthread_mutex_unlock(&m_mutex);
    return 0;
}

int ThreadPool::waitUnitTaskFinish() {
    while (!taskQueue->empty()) {
        sleep(5);
    }
    clear();
    return 0;
}

void ThreadPool::clear() {
    while (!taskQueue->empty()) {
        taskQueue->pop();
    }
    delete taskQueue;
    pthread_cond_destroy(&stop_if_empty);
    pthread_cond_destroy(&stop_if_full);
}

ThreadPool::~ThreadPool() {
    clear();
}

3) PerfermanceTest.cpp 测试程序

//============================================================================
// Name        : PerfermanceTest.cpp
// Author      : zhang hua
// Version     :
// Copyright   :
// Description : 一个带队列的线程池,如果任务负载大于线程池的最大队列长度将自动等待
//============================================================================

#include"ThreadPool.h"
#include<iostream>
#include<sys/time.h>
using namespace std;

class WorkTask: public Task {

public:
    WorkTask(int pos, string input) {
        this->pos = pos;
        this->input = input;
    }
    void run() {
        int pid = pthread_self();
        cout << "[" << pid << "]" << "直正干活的线程任务... " << this->pos << endl;
    }
};

int main() {
    int pid = pthread_self();
    string input = "this is test ";
    int threadName = 10;
    int maxQueueSize = 100;
    ThreadPool threadpool(threadName, maxQueueSize);
    for (int i = 0; i < 10000; i++) {
        cout  << "[" << pid << "]"  << "addTask " << i << endl;
        WorkTask* workTask = new WorkTask(i, input);
        threadpool.addTask(workTask);
    }
    threadpool.waitUnitTaskFinish();
}

抱歉!评论已关闭.