有限容量的BlockingQueue实现工作队列,用于生产者消费者问题。
- #include<iostream>
- #include<string>
- #include<vector>
- #include<deque>
- #include<assert.h>
- #include<pthread.h>
- #include<unistd.h>
- #include<boost/noncopyable.hpp>
- #include<boost/shared_ptr.hpp>
- #include<boost/weak_ptr.hpp>
- #include<boost/function.hpp>
- #include<boost/bind.hpp>
- #include<boost/circular_buffer.hpp>
- using namespace std;
- using namespace boost;
- class Mutex:public noncopyable{//互斥量的封装
- public:
- Mutex(){
- pthread_mutex_init(&mutex,NULL);
- }
- void lock(){
- pthread_mutex_lock(&mutex);
- }
- void unlock(){
- pthread_mutex_unlock(&mutex);
- }
- ~Mutex(){
- pthread_mutex_destroy(&mutex);
- }
- pthread_mutex_t* getMutex(){
- return &mutex;
- }
- private:
- mutable pthread_mutex_t mutex;
- };
- class MutexLockGuard:noncopyable{//RAII管理互斥量
- public:
- explicit MutexLockGuard(Mutex& mutex):mutex_(mutex){
- mutex_.lock();
- }
- ~MutexLockGuard(){
- mutex_.unlock();
- }
- private:
- Mutex& mutex_;//注意是引用,Mutex继承了noncopyable后不能拷贝构造
- };
- class Condition:public noncopyable{//条件变量的封装
- public:
- explicit Condition(Mutex& mutex):mutex_(mutex){
- pthread_cond_init(&cond,NULL);
- }
- ~Condition(){
- pthread_cond_destroy(&cond);
- }
- void wait(){
- pthread_cond_wait(&cond,mutex_.getMutex());
- }
- void notify(){
- pthread_cond_signal(&cond);
- }
- void notifyALL(){
- pthread_cond_broadcast(&cond);
- }
- private:
- Mutex& mutex_;//注意是引用
- pthread_cond_t cond;
- };
- template<typename T>
- class BlockingQueue:noncopyable{
- public:
- explicit BlockingQueue(int x):mutex(),full(mutex),empty(mutex),Q(x){}
- void put(T a){
- MutexLockGuard guard(mutex);
- while(Q.full()){//若队列满,则等待空条件empty
- empty.wait();//等待消费者消费
- }
- assert(!Q.full());
- Q.push_back(a);
- full.notifyALL();//通知消费者
- }
- T take(){
- MutexLockGuard guard(mutex);
- while(Q.empty()){//若队列空,则等待满条件full
- full.wait();//等待生产者生产
- }
- assert(!Q.empty());
- T front(Q.front());
- front();
- Q.pop_front();
- empty.notify();//通知生产者
- return front;
- }
- private:
- Mutex mutex;
- Condition full;//满条件
- Condition empty;//空条件
- circular_buffer<T> Q;//boost的循环队列
- };
// Demo payload: wraps one integer and can print it.
class test {
public:
    // x: the value later printed by show().
    explicit test(int x) : data(x) {}

    // Writes "show <data>" and a line end to std::cout. std::endl also flushes
    // the stream. Const-qualified because printing does not modify the object,
    // which lets show() be called on const instances.
    void show() const {
        std::cout << "show " << data << std::endl;
    }

private:
    int data;  // value supplied at construction
};
- typedef function<void()> Functor;//任务T
- BlockingQueue<Functor> taskQueue(10);
- bool running=true;//终止线程标志
- void* producer(void* arg){//生产者线程
- int i=0;
- while(running){
- //usleep(100);
- test one(i++);
- Functor task=bind(&test::show,one);//注意bind会拷贝参数。该处不能用&one.
- taskQueue.put(task);
- }
- }
- void* customer(void* arg){//消费者线程
- while(running){
- Functor task=taskQueue.take();
- task();
- }
- }
- int main(){
- pthread_t pid0;
- pthread_t pid[2];
- pthread_create(&pid0,NULL,producer,NULL);
- for(int i=0;i<2;i++){
- pthread_create(&pid[i],NULL,customer,NULL);
- }
- sleep(1);
- running=false;//终止线程
- pthread_join(pid0,NULL);
- return 0;
- }