./smp3 client1.txt client2.txt client3.txt
则产生3个client thread,每个对应一个文件。client thread每次读取文件中的一行,然后将该行信息和它的线程编号作为一个queue element
加入到queue中。
注意所有client 线程并行工作。
server thread每次从queue中取出一个queue element并加入到notice board queue中去,同时计算出当前notice board中一共有多少字符,作为返回
给对应client thread的信息。
client thread等待server thread处理完它发出的上一行信息,并得到server thread反馈的当前notice board一共有多少字符的信息,打印
Server returned X to client Y after request Z. 到 Y.out 文件,
X 是notice board当前字符数目,Y对应该线程处理的文件名,Z对应该线程处理的当前行数,如1,2,3...
然后继续读下一行....重复工作。
main thread等待所有的client thread处理完对应文本的所有行,然后通知server thread可以结束,server thread接到信息后结束。
需要注意的是很多资源(如queue element中message指针所指的内容)在client thread中生成并确定内容,在server thread中free,所以要用堆内存动态分配。
char *message = (char *)malloc(256 * sizeof(char));
while (fgets(message, 256, pFile_read)) {
......
message = (char *)malloc(256 * sizeof(char));
}
free(message);
这里注意当前行的message会在前面的处理过程中被server thread free掉,所以每次循环末尾一定要重新分配内存。
added = 0 queue = 1 read[i] = 0
client thread
while ( can read one line)
wait(queue)
add_element_to_queue()
signal(added) // tell server has added
signal(queue)
wait(read[i]) //wait server to read the message,i is client num
print(result)
server thread
while(1)
wait(added)
wait(queue)
if (queue is empty)
print info, finish running
else
read message from queue top, get client num iClientID, free resources of the queue element
signal(queue)
push message to notice board queue
calculate current total character num, client_result[iClientID] = char_count(); //通过全局变量返回信息给线程iClientID
signal(read[iClientID])
main thread
init semaphores
init global resources
launch server thread, client threads
wait client threads finish
signal(added)
wait server thread finish
free resources main thread allocate
2 #ifndef __SMP3_H
3 #define __SMP3_H
4
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
9
10 sem_t queue_semaphore;
11
12
/* One pending request in the singly linked client-to-server queue. */
typedef struct __queue_element
{
    char *message;                  /* line text; client_thread stores a malloc'ed buffer here */
    int client_id;                  /* id of the client thread that queued the request */
    struct __queue_element *next;   /* following element, NULL at the tail */
} queue_element;
19
20 queue_element *queue;
21
22
23 #endif /* __SMP3_H */
//smp3.c
/*
Small Machine Problem #3
Spring 2009
*/
5
6 /*Start: include appropriate header files*/
7 // Include appropriate header files here
8 /*End: include appropriate header files*/
9
10 #include "smp3.h"
11 #include "smp3.server.h"
12 #include "smp3.client.h"
13
14 void init_semaphores(int num_clients)
15 {
16 if(sem_init(&queue_semaphore, 0, 1) == -1 ||
17 sem_init(&server_notify_semaphore, 0, 0) == -1) {
18 printf("Failed to init semaphore!\n");
19 exit(1);
20 }
21 int i;
22 for (i = 0; i < num_clients; i++) {
23 if (sem_init(&client_notify_semaphore[i], 0, 0) == -1) {
24 printf("Failed to init semaphore\n");
25 exit(1);
26 }
27 }
28 }
29
30 void free_resources(int num_clients)
31 {
32 int i;
33 sem_destroy(&queue_semaphore);
34 sem_destroy(&server_notify_semaphore);
35 for (i = 0; i < num_clients; i++)
36 sem_destroy(&client_notify_semaphore[i]);
37
38 free(client_notify_semaphore);
39 free(client_result);
40 }
41
42 int main(int argc, char **argv)
43 {
44 /* Start your code */
45 if (argc == 1) {
46 printf("You should at least give one input file\n");
47 exit(1);
48 }
49
50 int i;
51 int num_clients = argc - 1;
52 //init resouce
53 pthread_t tid_client[num_clients];
54 pthread_t tid_server;
55 client_notify_semaphore = (sem_t *) malloc(num_clients * sizeof(sem_t));
56 client_result = (int *) malloc(num_clients * sizeof(int));
57
58 init_semaphores(num_clients);
59
60 //launch server thread
61 pthread_create(&tid_server, NULL, server_thread, NULL);
62 //launch num_clients client threads
63 client_info client_info[num_clients];
64 for (i = 0; i < num_clients; i++) {
65 client_info[i].client_id = i;
66 client_info[i].file_name = argv[i + 1];
67 pthread_create(&tid_client[i], NULL, client_thread, (void *)&client_info[i]);
68 }
69
70 //wait all clients thread to finish
71 for (i = 0; i < num_clients; i++)
72 pthread_join(tid_client[i], NULL);
73
74 //notify server it should print info and exit,now the queue is empty
75 if (sem_post(&server_notify_semaphore) == -1) {
76 fprintf(stderr, "Main thread failed to post sever_notify_semaphore");
77 exit(1);
78 }
79
80 //wait server finish
81 pthread_join(tid_server, NULL);
82
83 free_resources(num_clients);
84 /* End your code */
85
86 return 0;
87 }
88
2 #ifndef __SMP3CLIENT_H
3 #define __SMP3_CLIENT_H
4
5 #include <semaphore.h>
6 #include "smp3.h"
7
8 sem_t *client_notify_semaphore;
9 int *client_result;
10
11 typedef struct __client_info
12 {
13 int client_id;
14 char *file_name;
15 } client_info;
16
17
18 void add_element_to_end_of_queue(queue_element *);
19 void *client_thread(void *);
20
21
22 #endif /* __SMP3_CLIENT_H */
2 /*Start: include appropriate header files*/
3 // Include appropriate header files here
4 /*End: include appropriate header files*/
5
6 #include "smp3.client.h"
7 #include "smp3.server.h"
8 #include "smp3.h"
9
10 void add_element_to_end_of_queue(queue_element *ptr_queue_element)
11 {
12 /* Sanity to ensure the queue ends */
13 ptr_queue_element->next = NULL;
14
15 /* Insert into queue */
16 if (queue == NULL)
17 queue = ptr_queue_element;
18 else
19 {
20 queue_element *thru = queue;
21
22 while (thru->next != NULL)
23 thru = thru->next;
24
25 thru->next = ptr_queue_element;
26 }
27 }
28
29 void *client_thread(void *vptr_client_info)
30 {
31 int my_client_id = ((client_info *)vptr_client_info)->client_id;
32 char *my_file_name = ((client_info *)vptr_client_info)->file_name;
33 /* Start your code */
34
35 //You can safely assume that each line will contain less than 256 characters.
36 FILE *pFile_read, *pFile_write;
37 char *message = (char *)malloc(256 * sizeof(char));
38
39 pFile_read = fopen (my_file_name , "r");
40 if (pFile_read == NULL) {
41 perror ("Error opening file");
42 return;
43 }
44
45 //the outfile name is inpufile.out
46 char out_file_name[strlen(my_file_name) + 4];
47 strcpy(out_file_name, my_file_name);
48 strcat(out_file_name,".out");
49
50 pFile_write = fopen(out_file_name, "w");
51 if (pFile_write == NULL) {
52 perror("Error writting file");
53 return;
54 }
55
56 queue_element *p_queue_element;
57 int request = 1;
58 while (fgets(message, 256, pFile_read)) { //fgets will incluce '\n'
59
60 message[strlen(message) - 1] = '\0'; //remove the last '\n'
61 //printf("%s\n", message);
62 p_queue_element = (queue_element *) malloc(sizeof(queue_element));
63 p_queue_element->client_id = my_client_id;
64 p_queue_element->message = message;
65
66 /* acquire a lock on the queue. */
67 if (sem_wait(&queue_semaphore) == -1) {
68 fprintf(stderr, "The client thread failed to acquire a lock on the queue. (Error Value = %d)\n", errno);
69 return;
70 }
71
72 add_element_to_end_of_queue(p_queue_element);
73
74 //notify sever I have add element to queue
75 if (sem_post(&server_notify_semaphore) == -1) {
76 fprintf(stderr, "The client thread failed to post server_notify_semaphore. (Error Value = %d)\n", errno);
77 return;
78 }
79
80 //release the queue lock
81 if (sem_post(&queue_semaphore) == -1) {
82 fprintf(stderr, "The client thread failed to post queue_semaphore. (Error Value = %d)\n", errno);
83 return;
84 }
85
86 //waiting sever finish dealing with this message this particular clinet thread sent
87 if (sem_wait(&client_notify_semaphore[my_client_id]) == -1) {
88 fprintf(stderr, "The client thread failed to wait client_notify_semaphore . (Error Value = %d)\n", errno);
89 return;
90 }
91
92 //print the severt returned result and then loop to deal the next line
93 fprintf(pFile_write, "Server returned %d to client %s after request %d\n",
94 client_result[my_client_id], my_file_name, request++);
95 message = (char *)malloc(256 * sizeof(char)); //do not forget this!!Since it has been freed by server here must realloc
96
97 }
98
99 free(message); // the last malloc need to be freed
100 /* End your code */
101
102 return NULL;
103 }
104
105
2 #ifndef __SMP3_SERVER_H
3 #define __SMP3_SERVER_H
4
5 #include <semaphore.h>
6
7 sem_t server_notify_semaphore;
8
/* One entry of the singly linked notice list that starts at start_notice. */
typedef struct __notice
{
    char *message;          /* notice text (presumably the line a client sent; confirm in server code) */
    int client_id;          /* presumably the id of the originating client; confirm in server code */
    struct __notice *next;  /* following entry, NULL at the tail */
} notice;
15
16 notice *start_notice;
17
18 void add_element_to_end_of_notice(notice *);
19 void free_notice_memory();
20 void print_notice();
21 int my_strlen(const char *);
22 int char_count();
23 void *server_thread(void *);
24
25
26 #endif /* __SMP3_SERVER_H */
2 /*
3 Small Machine Problem #3
4 Spring 2009
5 */
6
7 /*Start: include appropriate header files*/
8 // Include appropriate header files here
9 /*End: include appropriate header files*/
10
11 #include "smp3.server.h"
12 #include "smp3.client.h"
13 #include "smp3.h"
14
15 void add_element_to_end_of_notice(notice *ptr_notice)
16 {
17 /* Sanity to ensure the notice queue ends */
18 ptr_notice->next = NULL;
19
20 /* Insert into queue */
21 if (start_notice == NULL)
22 start_notice = ptr_notice;
23 else
24 {
25 notice *thru = start_notice;
26
27 while (thru->next != NULL)
28 thru = thru->next;
29
30 thru->next = ptr_notice;
31 }
32 }
33
34 void free_notice_memory()
35 {
36 notice *