登 录
//判断是否为空,设置empty标志 empty = BufferIsEmpty( products); //写入数据 products->buffer[products->posWriteTo] = item; DB(printf("done! producer: %d/n", item);) products->posWriteTo++; if (products->posWriteTo >= BUFFER_SIZE) products->posWriteTo = 0; //如果之前为空,则发通知给消费线程,新数据被写入,可以消费 if(empty) { DB(printf("buffer is empty,can not consume/n");) DB(printf("buffer is writed,consume it/n");) pthread_cond_signal(&products->notEmpty); } pthread_mutex_unlock(&products->locker); //解锁 DB(printf("produce release the locker/n");) } //消费产品 int Consume(struct Products* products) { int item, full; pthread_mutex_lock(&products->locker); DB(printf("Consume get the locker/n");) // 为空时持续等待,无数据可读, // 进入等待状态,不占用CPU,同时,释放占用的互斥锁,一旦被唤醒则再次锁定互斥量,即锁定product结构体的读写。 while (BufferIsEmpty(products)) { DB(printf("buffer is empty, can not consume/n");) DB(printf("Consume release the locker/n");) pthread_cond_wait(&products->notEmpty, &products->locker); DB(printf("notEmpty is true and it will read/n");) } //判断是否为满,设置full标志 full = BufferIsFull(products); //提取数据 item = products->buffer[products->posReadFrom]; DB(printf("done! consume: %d/n", item);) products->posReadFrom++; if (products->posReadFrom >= BUFFER_SIZE) //如果到末尾,从头读取 products->posReadFrom = 0; //如果之前是full的,则发信号通知生产线程 if(full) { DB(printf("buffer is full,can not produce/n");) DB(printf("buffer is consume, produce it/n");) pthread_cond_signal(&products->notFull); } pthread_mutex_unlock(&products->locker); DB(printf("consume release the locker/n");) return item; } #define END_FLAG (-1) struct Products products; void* ProducerThread(void* data) { int i; for (i = 0; i < 16; ++i) { printf("producer: %d/n", i); Produce(&products, i); } Produce(&products, END_FLAG); return NULL; } void* ConsumerThread(void* data) { int item; while (1) { item = Consume(&products); if (END_FLAG == item) break; printf("consumer: %d/n", item); } return (NULL); } int main(int argc, char* argv[]) { pthread_t producer; pthread_t consumer; int result; pthread_cond_init((pthread_cond_t * __restrict__)&products.notEmpty,NULL); pthread_cond_init((pthread_cond_t * __restrict__)&products.notFull,NULL); pthread_cond_init((pthread_cond_t * __restrict__)&products.locker,NULL); pthread_create(&producer, NULL, &ProducerThread, NULL); pthread_create(&consumer, NULL, &ConsumerThread, NULL); pthread_join(producer, (void *)&result); DB(printf("producer complete/n");) pthread_join(consumer, (void *)&result); DB(printf("consume complete/n");)
return 0; }
抱歉!评论已关闭.