现在的位置: 首页 > 综合 > 正文

Programming With POSIX Threads 读书笔记(三)

2019年08月22日 ⁄ 综合 ⁄ 共 6103字 ⁄ 字号 评论关闭
使用线程的几种方式

流水线:

        每个线程反复的在数据系列集上执行同一操作,并把操作结果传递给下一步骤的其他线程,这就是流水线的方式。

        下面的程序演示简单的流水线程序片段。流水线中的每个线程将它的输入加一,并传递给下一线程。主线程从控制台得到一个输入,如果是数字则会被传给流水线的开始,如果是“=”则会从流水线的结束处读取下一个结果并打印它。

        其中流水线的每一步是由一个stage_t的类型变量代表的。stage_t包括一个同步访问的互斥量mutex。条件变量avail用来通知某步要处理的数据已经准备好了。当每一步的新数据已经处理好的时候线程都会发信号给自己的ready条件变量。元素data是前一步骤传递下来的数据,thread表示执行该步骤的线程,next是指向下一个stage_t的指针。

        pipe_t结构描述的是一条流水线,它包含指向流水线的第一步和最后一步的指针。最后一步的tail是一个特殊的stage_t,因为它没有相对应的线程,它只是保存最终结果的地方。

#include <pthread.h>
#include "errors.h"

typedef struct stage_tag {
	pthread_mutex_t mutex;
	pthread_cond_t avail;
	pthread_cond_t ready;
	int data_ready;
	long data;
	pthread_t thread;
	struct stage_tag *next;
}stage_t;

typedef struct pipe_tag {
	pthread_mutex_t mutex;
	stage_t *head;
	stage_t *tail;
	int stages;
	int active;
}pipe_t;

int pipe_send(stage_t *stage, long data) {
	int status;
	status = pthread_mutex_lock(&stage->mutex);
	if (status != 0)
		return status;

	while (stage->data_ready) {
		status = pthread_cond_wait(&stage->ready, &stage->mutex);
		if (status != 0) {
			pthread_mutex_unlock(&stage->mutex);
			return status;
		}
	}

	stage->data = data;
	stage->data_ready = 1;
	status = pthread_cond_signal(&stage->avail);
	if (status != 0) {
		pthread_mutex_unlock(&stage->mutex);
		return status;
	}
	status = pthread_mutex_unlock(&stage->mutex);
	return status;
}

void *pipe_stage(void *arg) {
	stage_t *stage = (stage_t*)arg;
	stage_t *next_stage = stage->next;
	int status;

	status = pthread_mutex_lock(&stage->mutex);
	if (status != 0)
		err_abort(status, "Lock pipe stage");
	while (1) {
		while (stage->data_ready != 1) {
			status = pthread_cond_wait(&stage->avail, &stage->mutex);
			if (status != 0)
				err_abort(status, "Wait for previous stage");
		}
		pipe_send(next_stage, stage->data + 1);
		stage->data_ready = 0;
		status = pthread_cond_signal(&stage->ready);
		if (status != 0)
			err_abort(status, "Wake next stage");
	}
	return NULL;
}

int pipe_create(pipe_t *pipe, int stages) {
	int pipe_index;
	stage_t **link = &pipe->head, *new_stage, *stage;
	int status;

	status = pthread_mutex_init(&pipe->mutex, NULL);
	if (status != 0)
		err_abort(status, "Init pipe mutex");
	pipe->stages = stages;
	pipe->active = 0;

	for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
		new_stage = (stage_t*)malloc(sizeof(stage_t));
		if (new_stage == NULL)
			errno_abort("Allocate stage");
		status = pthread_mutex_init(&new_stage->mutex, NULL);
		if (status != 0)
			err_abort(status, "Init stage mutex");
		status = pthread_cond_init(&new_stage->avail, NULL);
		if (status != 0)
			err_abort(status, "Init avail condition");
		status = pthread_cond_init(&new_stage->ready, NULL);
		if (status != 0)
			err_abort(status, "Init ready condition");
		new_stage->data_ready = 0;
		*link = new_stage;
		link = &new_stage->next;
	}

	*link = (stage_t*)NULL;
	pipe->tail = new_stage;

	for (stage = pipe->head; stage->next != NULL; stage = stage->next) {
		status = pthread_create(&stage->thread, NULL, pipe_stage, (void*)stage);
		if (status != 0)
			err_abort(status, "Create pipe stage");
	}
	return 0;
}

int pipe_start(pipe_t *pipe, long value) {
	int status;

	status = pthread_mutex_lock(&pipe->mutex);
	if (status != 0)
		err_abort(status, "Lock pipe mutex");
	pipe->active++;
	status = pthread_mutex_unlock(&pipe->mutex);
	if (status != 0)
		err_abort(status, "Unlock pipe mutex");
	pipe_send(pipe->head, value);
	return 0;
}

int pipe_result(pipe_t *pipe, long *result) {
	stage_t *tail = pipe->tail;
	int empty = 0;
	int status;

	status = pthread_mutex_lock(&pipe->mutex);
	if (status != 0)
		err_abort(status, "Lock pipe mutex");
	if (pipe->active <= 0)
		empty = 1;
	else
		pipe->active--;

	status = pthread_mutex_unlock(&pipe->mutex);
	if (status != 0)
		err_abort(status, "Unlock pipe mutex");
	if (empty)
		return 0;

	pthread_mutex_lock(&tail->mutex);
	while (!tail->data_ready)
		pthread_cond_wait(&tail->avail, &tail->mutex);
	*result = tail->data;
	tail->data_ready = 0;
	pthread_cond_signal(&tail->ready);
	pthread_mutex_unlock(&tail->mutex);
	return 1;
}

int main(int argc, char **argv) {
	pipe_t my_pipe;
	long value, result;
	char line[128];

	pipe_create(&my_pipe, 10);
	printf("Enter interger values, or \"=\" for next result\n");

	while (1) {
		printf("Data> ");
		if (fgets(line, sizeof(line), stdin) == NULL)
			exit(0);
		if (strlen(line) <= 1) continue;
		if (strlen(line) <= 2 && line[0] == '=') {
			if (pipe_result(&my_pipe, &result))
				printf("Result is %ld\n", result);
			else
				printf("Pipe is empty\n");
		} else {
			if (sscanf(line, "%ld", &value) < 1)
				fprintf(stderr, "Enter an interger value\n");
			else
				pipe_start(&my_pipe, value);
		}
	}
}

        我们从程序的main函数开始看起,这是一个驱动流水线的驱动程序。首先它调用了pipe_create函数,创建一条流水线。然后就不断的接受用户的输入,根据输入的不同,他可能调用pipe_result输入传给流水线的开始。也可能从流水线的末端取出一条记录并输出。

        在pipe_create函数中,首先感慨一句,比起C++的传引用,还是二级指针来得方便一点。程序一开始首先是对pipe->mutex的动态初始化。然后注意一下第一个循环,从0开始,判断条件又带等号,所以这个流水线实际上是有stages+1个stage_t节点的。在这个循环中最后的两行指针操作需要仔细思考一下,感觉C语言的指针不够扎实的话,这两行代码得仔细琢磨一下。第一句是一级指针的赋值,操作的是stage_t->next(除了第一次操作的是head)。而第二次操作的是一个二级指针,将link操作的对象改成下一个节点的next指针。在第二个循环中,注意到结束条件是stage->next!=null,也就是说最后一个节点是不会创建线程的。

        这个时候,pipe_t算是初始化创建完成了。再看一下每一个创建好的线程在干什么。每一个创建好的线程如果运行的话,首先对stage->mutex上锁,然后就进入条件等待(stage->avail),等待数据传入完成。那么在流水线创建完成之后,所有的子线程都应该处于条件等待状态。等待数据传入。在数据传入设置完成之后,他将会把新的数据传递给下一个线程,然后修改stage->ready的状态。最后发出stage->ready的信号,唤醒阻塞的pipe_send函数,继续完成传入数据的操作。然后马上又进入条件等待的状态了。所以在这个函数中我们并没有看见它显式地调用unlock。因为它在绝大多数的时间里都会处于条件等待状态,所以也就没有必要在这里解锁了。

        这个时候如果主线程得到一个用户输入的话,pipe_start函数就会将数据传入流水线的第一节,它首先更新一下流水线中的数据数量(pipe->active),然后调用pipe_send。

        pipe_send函数应该算是这个程序中最主要的函数之一,首先对stage->mutex上锁,然后判断stage中的数据是不是已经处理完成了,如果stage中的数据还没有被子线程处理,那么它将进入条件等待(stage->ready),等待线程处理数据完成。一旦线程数据处理完成了,那么它将设置传入的新数据,然后发出stage->avail的信号,唤醒子线程处理数据。然后解锁潇洒离开。

总结:

最后上一个图看看整个过程:

        第一列的pipe_send,我假设pipe_stage唤醒慢了,pipe_send第二次直接进入cond_wait(ready, mutex)。第四列的pipe_stage我假设他也唤醒慢了,但是是在pipe_send上锁之前。这样可以看到上锁被阻塞的瞬间。

pipe_send pipe_stage pipe_send pipe_stage
pipe_send
  mutex_lock(mutex)   mutex_lock(mutex)  
  cond_wait(avail, mutex)   cond_wait(avail, mutex)  
mutex_lock(mutex)        
setdata        
cond_signal(avail)        
mutex_unlock(mutex)        
mutex_lock(mutex)        
cond_wait(ready, mutex)        
  pipe_send      
  cond_signal(ready) mutex_lock(mutex)    
  cond_wait(avail, mutex) setdata    
setdata   cond_signal(avail)    
cond_signal(avail)   mutex_unlock(mutex)    
mutex_unlock(mutex)        
  pipe_send   pipe_send mutex_lock(mutex)
  cond_signal(ready) mutex_lock(mutex) cond_signal(ready) setdata
  cond_wait(avail, mutex) ——上锁阻塞—— cond_wait(avail, mutex) cond_signal(avail)
setdata   setdata   mutex_unlock(mutex)

Programming With POSIX Threads 读书笔记(一)http://blog.csdn.net/hyzhou33550336/article/details/16890691

Programming With POSIX Threads 读书笔记(二)http://blog.csdn.net/hyzhou33550336/article/details/16890959

Programming With POSIX Threads 读书笔记(三)http://blog.csdn.net/hyzhou33550336/article/details/16899433

抱歉!评论已关闭.