每个线程反复的在数据系列集上执行同一操作,并把操作结果传递给下一步骤的其他线程,这就是流水线的方式。
下面的程序演示简单的流水线程序片段。流水线中的每个线程将它的输入加一,并传递给下一线程。主线程从控制台得到一个输入,如果是数字则会被传给流水线的开始,如果是“=”则会从流水线的结束处读取下一个结果并打印它。
其中流水线的每一步是由一个stage_t的类型变量代表的。stage_t包括一个同步访问的互斥量mutex。条件变量avail用来通知某步要处理的数据已经准备好了。当每一步的新数据已经处理好的时候线程都会发信号给自己的ready条件变量。元素data是前一步骤传递下来的数据,thread表示执行该步骤的线程,next是指向下一个stage_t的指针。
pipe_t结构描述的是一条流水线,它包含指向流水线的第一步和最后一步的指针。最后一步的tail是一个特殊的stage_t,因为它没有相对应的线程,它只是保存最终结果的地方。
#include <pthread.h> #include "errors.h" typedef struct stage_tag { pthread_mutex_t mutex; pthread_cond_t avail; pthread_cond_t ready; int data_ready; long data; pthread_t thread; struct stage_tag *next; }stage_t; typedef struct pipe_tag { pthread_mutex_t mutex; stage_t *head; stage_t *tail; int stages; int active; }pipe_t; int pipe_send(stage_t *stage, long data) { int status; status = pthread_mutex_lock(&stage->mutex); if (status != 0) return status; while (stage->data_ready) { status = pthread_cond_wait(&stage->ready, &stage->mutex); if (status != 0) { pthread_mutex_unlock(&stage->mutex); return status; } } stage->data = data; stage->data_ready = 1; status = pthread_cond_signal(&stage->avail); if (status != 0) { pthread_mutex_unlock(&stage->mutex); return status; } status = pthread_mutex_unlock(&stage->mutex); return status; } void *pipe_stage(void *arg) { stage_t *stage = (stage_t*)arg; stage_t *next_stage = stage->next; int status; status = pthread_mutex_lock(&stage->mutex); if (status != 0) err_abort(status, "Lock pipe stage"); while (1) { while (stage->data_ready != 1) { status = pthread_cond_wait(&stage->avail, &stage->mutex); if (status != 0) err_abort(status, "Wait for previous stage"); } pipe_send(next_stage, stage->data + 1); stage->data_ready = 0; status = pthread_cond_signal(&stage->ready); if (status != 0) err_abort(status, "Wake next stage"); } return NULL; } int pipe_create(pipe_t *pipe, int stages) { int pipe_index; stage_t **link = &pipe->head, *new_stage, *stage; int status; status = pthread_mutex_init(&pipe->mutex, NULL); if (status != 0) err_abort(status, "Init pipe mutex"); pipe->stages = stages; pipe->active = 0; for (pipe_index = 0; pipe_index <= stages; pipe_index++) { new_stage = (stage_t*)malloc(sizeof(stage_t)); if (new_stage == NULL) errno_abort("Allocate stage"); status = pthread_mutex_init(&new_stage->mutex, NULL); if (status != 0) err_abort(status, "Init stage mutex"); status = pthread_cond_init(&new_stage->avail, NULL); if (status != 0) err_abort(status, "Init avail condition"); status = pthread_cond_init(&new_stage->ready, NULL); if (status != 0) err_abort(status, "Init ready condition"); new_stage->data_ready = 0; *link = new_stage; link = &new_stage->next; } *link = (stage_t*)NULL; pipe->tail = new_stage; for (stage = pipe->head; stage->next != NULL; stage = stage->next) { status = pthread_create(&stage->thread, NULL, pipe_stage, (void*)stage); if (status != 0) err_abort(status, "Create pipe stage"); } return 0; } int pipe_start(pipe_t *pipe, long value) { int status; status = pthread_mutex_lock(&pipe->mutex); if (status != 0) err_abort(status, "Lock pipe mutex"); pipe->active++; status = pthread_mutex_unlock(&pipe->mutex); if (status != 0) err_abort(status, "Unlock pipe mutex"); pipe_send(pipe->head, value); return 0; } int pipe_result(pipe_t *pipe, long *result) { stage_t *tail = pipe->tail; int empty = 0; int status; status = pthread_mutex_lock(&pipe->mutex); if (status != 0) err_abort(status, "Lock pipe mutex"); if (pipe->active <= 0) empty = 1; else pipe->active--; status = pthread_mutex_unlock(&pipe->mutex); if (status != 0) err_abort(status, "Unlock pipe mutex"); if (empty) return 0; pthread_mutex_lock(&tail->mutex); while (!tail->data_ready) pthread_cond_wait(&tail->avail, &tail->mutex); *result = tail->data; tail->data_ready = 0; pthread_cond_signal(&tail->ready); pthread_mutex_unlock(&tail->mutex); return 1; } int main(int argc, char **argv) { pipe_t my_pipe; long value, result; char line[128]; pipe_create(&my_pipe, 10); printf("Enter interger values, or \"=\" for next result\n"); while (1) { printf("Data> "); if (fgets(line, sizeof(line), stdin) == NULL) exit(0); if (strlen(line) <= 1) continue; if (strlen(line) <= 2 && line[0] == '=') { if (pipe_result(&my_pipe, &result)) printf("Result is %ld\n", result); else printf("Pipe is empty\n"); } else { if (sscanf(line, "%ld", &value) < 1) fprintf(stderr, "Enter an interger value\n"); else pipe_start(&my_pipe, value); } } }
我们从程序的main函数开始看起,这是一个驱动流水线的驱动程序。首先它调用了pipe_create函数,创建一条流水线。然后就不断的接受用户的输入,根据输入的不同,他可能调用pipe_result输入传给流水线的开始。也可能从流水线的末端取出一条记录并输出。
在pipe_create函数中,首先感慨一句,比起C++的传引用,还是二级指针来得方便一点。程序一开始首先是对pipe->mutex的动态初始化。然后注意一下第一个循环,从0开始,判断条件又带等号,所以这个流水线实际上是有stages+1个stage_t节点的。在这个循环中最后的两行指针操作需要仔细思考一下,感觉C语言的指针不够扎实的话,这两行代码得仔细琢磨一下。第一句是一级指针的赋值,操作的是stage_t->next(除了第一次操作的是head)。而第二次操作的是一个二级指针,将link操作的对象改成下一个节点的next指针。在第二个循环中,注意到结束条件是stage->next!=null,也就是说最后一个节点是不会创建线程的。
这个时候,pipe_t算是初始化创建完成了。再看一下每一个创建好的线程在干什么。每一个创建好的线程如果运行的话,首先对stage->mutex上锁,然后就进入条件等待(stage->avail),等待数据传入完成。那么在流水线创建完成之后,所有的子线程都应该处于条件等待状态。等待数据传入。在数据传入设置完成之后,他将会把新的数据传递给下一个线程,然后修改stage->ready的状态。最后发出stage->ready的信号,唤醒阻塞的pipe_send函数,继续完成传入数据的操作。然后马上又进入条件等待的状态了。所以在这个函数中我们并没有看见它显式地调用unlock。因为它在绝大多数的时间里都会处于条件等待状态,所以也就没有必要在这里解锁了。
这个时候如果主线程得到一个用户输入的话,pipe_start函数就会将数据传入流水线的第一节,它首先更新一下流水线中的数据数量(pipe->active),然后调用pipe_send。
pipe_send函数应该算是这个程序中最主要的函数之一,首先对stage->mutex上锁,然后判断stage中的数据是不是已经处理完成了,如果stage中的数据还没有被子线程处理,那么它将进入条件等待(stage->ready),等待线程处理数据完成。一旦线程数据处理完成了,那么它将设置传入的新数据,然后发出stage->avail的信号,唤醒子线程处理数据。然后解锁潇洒离开。
总结:
最后上一个图看看整个过程:
第一列的pipe_send,我假设pipe_stage唤醒慢了,pipe_send第二次直接进入cond_wait(ready, mutex)。第四列的pipe_stage我假设他也唤醒慢了,但是是在pipe_send上锁之前。这样可以看到上锁被阻塞的瞬间。
pipe_send | pipe_stage | pipe_send | pipe_stage |
pipe_send |
mutex_lock(mutex) | mutex_lock(mutex) | |||
cond_wait(avail, mutex) | cond_wait(avail, mutex) | |||
mutex_lock(mutex) | ||||
setdata | ||||
cond_signal(avail) | ||||
mutex_unlock(mutex) | ||||
mutex_lock(mutex) | ||||
cond_wait(ready, mutex) | ||||
pipe_send | ||||
cond_signal(ready) | mutex_lock(mutex) | |||
cond_wait(avail, mutex) | setdata | |||
setdata | cond_signal(avail) | |||
cond_signal(avail) | mutex_unlock(mutex) | |||
mutex_unlock(mutex) | ||||
pipe_send | pipe_send | mutex_lock(mutex) | ||
cond_signal(ready) | mutex_lock(mutex) | cond_signal(ready) | setdata | |
cond_wait(avail, mutex) | ——上锁阻塞—— | cond_wait(avail, mutex) | cond_signal(avail) | |
setdata | setdata | mutex_unlock(mutex) |
Programming With POSIX Threads 读书笔记(一)http://blog.csdn.net/hyzhou33550336/article/details/16890691
Programming With POSIX Threads 读书笔记(二)http://blog.csdn.net/hyzhou33550336/article/details/16890959
Programming With POSIX Threads 读书笔记(三)http://blog.csdn.net/hyzhou33550336/article/details/16899433