现在的位置: 首页 > 综合 > 正文

有限容量BlockingQueue:消费者生产者 有限容量BlockingQueue:消费者生产者

2017年11月03日 ⁄ 综合 ⁄ 共 3730字 ⁄ 字号 评论关闭
 

有限容量BlockingQueue:消费者生产者

分类: Linux多线程编程 213人阅读 评论(0) 收藏 举报

    有限容量的BlockingQueue实现工作队列,用于生产者消费者问题。

  1. #include<iostream>  
  2. #include<string>  
  3. #include<vector>  
  4. #include<deque>  
  5. #include<assert.h>  
  6. #include<pthread.h>  
  7. #include<unistd.h>  
  8. #include<boost/noncopyable.hpp>  
  9. #include<boost/shared_ptr.hpp>  
  10. #include<boost/weak_ptr.hpp>  
  11. #include<boost/function.hpp>  
  12. #include<boost/bind.hpp>  
  13. #include<boost/circular_buffer.hpp>  
  14. using namespace std;  
  15. using namespace boost;  
  16. class Mutex:public noncopyable{//互斥量的封装  
  17.     public:  
  18.         Mutex(){  
  19.             pthread_mutex_init(&mutex,NULL);  
  20.         }  
  21.         void lock(){  
  22.             pthread_mutex_lock(&mutex);  
  23.         }  
  24.         void unlock(){  
  25.             pthread_mutex_unlock(&mutex);  
  26.         }  
  27.         ~Mutex(){  
  28.             pthread_mutex_destroy(&mutex);  
  29.         }  
  30.         pthread_mutex_t* getMutex(){  
  31.             return &mutex;  
  32.         }  
  33.     private:  
  34.         mutable pthread_mutex_t mutex;  
  35. };  
  36. class MutexLockGuard:noncopyable{//RAII管理互斥量  
  37.     public:  
  38.         explicit MutexLockGuard(Mutex& mutex):mutex_(mutex){  
  39.             mutex_.lock();  
  40.         }  
  41.         ~MutexLockGuard(){  
  42.             mutex_.unlock();  
  43.         }  
  44.     private:  
  45.         Mutex& mutex_;//注意是引用,Mutex继承了noncopyable后不能拷贝构造  
  46. };  
  47. class Condition:public noncopyable{//条件变量的封装  
  48.     public:  
  49.         explicit Condition(Mutex& mutex):mutex_(mutex){  
  50.             pthread_cond_init(&cond,NULL);  
  51.         }  
  52.         ~Condition(){  
  53.             pthread_cond_destroy(&cond);  
  54.         }  
  55.         void wait(){  
  56.             pthread_cond_wait(&cond,mutex_.getMutex());  
  57.         }  
  58.         void notify(){  
  59.             pthread_cond_signal(&cond);  
  60.         }  
  61.         void notifyALL(){  
  62.             pthread_cond_broadcast(&cond);  
  63.         }  
  64.     private:  
  65.         Mutex& mutex_;//注意是引用  
  66.         pthread_cond_t cond;  
  67. };  
  68. template<typename T>  
  69. class BlockingQueue:noncopyable{  
  70.     public:  
  71.         explicit BlockingQueue(int x):mutex(),full(mutex),empty(mutex),Q(x){}  
  72.         void put(T a){  
  73.             MutexLockGuard guard(mutex);  
  74.             while(Q.full()){//若队列满,则等待空条件empty  
  75.                 empty.wait();//等待消费者消费  
  76.             }  
  77.             assert(!Q.full());  
  78.             Q.push_back(a);  
  79.             full.notifyALL();//通知消费者  
  80.         }  
  81.         T take(){  
  82.             MutexLockGuard guard(mutex);  
  83.             while(Q.empty()){//若队列空,则等待满条件full  
  84.                 full.wait();//等待生产者生产  
  85.             }  
  86.             assert(!Q.empty());  
  87.             T front(Q.front());  
  88.             front();  
  89.             Q.pop_front();  
  90.             empty.notify();//通知生产者  
  91.             return front;  
  92.         }  
  93.     private:  
  94.         Mutex mutex;  
  95.         Condition full;//满条件  
  96.         Condition empty;//空条件  
  97.         circular_buffer<T> Q;//boost的循环队列  
  98. };  
  99. class test{//任务内容  
  100.     public:  
  101.         explicit test(int x):data(x){}  
  102.         void show(){  
  103.             cout<<"show "<<data<<endl;  
  104.         }  
  105.     private:  
  106.         int data;  
  107. };  
  108. typedef function<void()> Functor;//任务T  
  109. BlockingQueue<Functor> taskQueue(10);  
  110. bool running=true;//终止线程标志  
  111. void* producer(void* arg){//生产者线程  
  112.     int i=0;  
  113.     while(running){  
  114.         //usleep(100);  
  115.         test one(i++);  
  116.         Functor task=bind(&test::show,one);//注意bind会拷贝参数。该处不能用&one.  
  117.         taskQueue.put(task);  
  118.     }  
  119. }  
  120. void* customer(void* arg){//消费者线程  
  121.     while(running){  
  122.         Functor task=taskQueue.take();  
  123.         task();  
  124.     }  
  125. }  
  126. int main(){  
  127.     pthread_t pid0;  
  128.     pthread_t pid[2];  
  129.     pthread_create(&pid0,NULL,producer,NULL);  
  130.     for(int i=0;i<2;i++){  
  131.         pthread_create(&pid[i],NULL,customer,NULL);  
  132.     }  
  133.     sleep(1);  
  134.     running=false;//终止线程  
  135.     pthread_join(pid0,NULL);  
  136.     return 0;  
  137. }  

抱歉!评论已关闭.