现在的位置: 首页 > 综合 > 正文

Linux c 简易线程池

2018年01月18日 ⁄ 综合 ⁄ 共 4823字 ⁄ 字号 评论关闭

参考别人的代码 自己手写了一个线程池,备忘,自己也是刚接触线程池,和我一样的同学,建议以main->create_thread_pool->add_task_to_list->destroy_thread_pool->thread_func的流程看下去。

/*
     线程池 2014年2月18日 09:46:38
*/
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include<pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>


//typedef BOOL int

/*
    task_list 任务链表
    task_func: 任务
    arg      :传递给任务的参数  
*/

typedef struct _task_list
    {
         void * arg;
         void * (*task_func)(void*);
         struct _task_list *next;
    }task_list;

/*
       线程池
       1.n_max_thread       线程池中的最大线程数
       2.task_list_head     任务链表
       3.b_stop_thread_pool 线程池开关,判断线程池是否关闭
       4.thread_id          线程id组
       5.cond               触发条件
       6.mutex              task任务队列锁
*/

typedef struct _thread_pool
    {
         int          n_max_thread;
         task_list*   task_list_head;
         int          b_stop_thread_pool;
         pthread_t*   thread_id;
         pthread_cond_t  cond;
         pthread_mutex_t mutex;
    }thread_pool_t;


int create_thread_pool(int _n_max_thread);
int add_task_to_list(void* (*_task_func)(void*), void* _arg );
int destroy_thread_pool();


#endif//__THREAD_POOL_H__


#include "thread_pool.h"

#define malloc_err(p) if((p) == NULL){ perror("p:"); return -1;}

thread_pool_t * g_thread_pool = NULL;//初始化全局线程池

int g_nTatal_thread = 0;

static int total_task(task_list* _head)//计算当前任务链表中剩余任务数
{
     
    int n_total = 0;
    if( _head == NULL ) return n_total;
    while( _head )
    {    
    	   n_total++;
        _head = _head -> next;
    }
    return n_total;

}

/*线程函数*/
static void* thread_func(void* arg)
{
    task_list* head_bak;
    while(1)//线程函数不退出,执行完一个任务之后继续阻塞
    {   
    	  
        pthread_mutex_lock(&g_thread_pool->mutex);
        while( !g_thread_pool->b_stop_thread_pool && !g_thread_pool ->task_list_head )//当线程池未关闭且任务链表中没有任务
            {
                pthread_cond_wait(&g_thread_pool->cond, &g_thread_pool->mutex);                             
            }
            
            if(g_thread_pool -> b_stop_thread_pool)//线程池开关关闭,线程被broastcast唤醒 
            	   {
            	         pthread_mutex_unlock(&g_thread_pool->mutex);//解锁
            	         pthread_exit(0);//线程退出
            	    }
             head_bak = g_thread_pool -> task_list_head;//取出第一个任务
             g_thread_pool -> task_list_head = g_thread_pool -> task_list_head -> next;//任务节点后移
             
           printf("线程摘走一个任务后:剩余任务数:%d, %s \n",total_task(g_thread_pool -> task_list_head),__func__);  
             pthread_mutex_unlock(&g_thread_pool->mutex);//打开任务链表锁
             head_bak -> task_func( head_bak -> arg );//执行取出的任务,并导入参数
             
             free(head_bak);//释放任务节点空间
             head_bak = NULL;
             
             // 传入的任务执行时间过短,加一个sleep可唤醒线程池中的其他线程
             //否则只有一个线程在执行任务链表中的任务
             sleep(1);
    }
}
/*
      create_thread_pool 创建并初始化线程池
      _nMax_thread       初始化线程池中线程的数量
*/
int create_thread_pool(int _n_max_thread)
{
	    g_thread_pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));//开辟线程池空间
	   // malloc( g_thread_pool);
	    g_thread_pool -> n_max_thread       = _n_max_thread;//初始化线程池中线程的数量
	    g_thread_pool -> b_stop_thread_pool = 0;//初始化线程池开关
	    g_thread_pool -> task_list_head     = NULL;//初始化任务列表
	    
	    
	    g_thread_pool -> thread_id = (pthread_t*)malloc(sizeof(pthread_t) * g_thread_pool->n_max_thread);//开辟空间存放线程ID
	    
	    int n_index;
	    for( n_index = 0; n_index < g_thread_pool ->n_max_thread; n_index++ )
	    {
	          pthread_create(&g_thread_pool ->thread_id[n_index], NULL, thread_func, NULL );//可加出错处理
	    }
	    
	    if( pthread_mutex_init(&g_thread_pool -> mutex, NULL) == -1 )
	    	return -1;
	    
	    if( pthread_cond_init(&g_thread_pool -> cond, NULL) == -1 )
	    	return -1;
	       
	    return 0;	
}
int add_task_to_list(void* (*_task_func)(void*), void* _arg )
{
     
     if( _task_func == NULL ) return -1;
     task_list * task_tmp = (task_list*)malloc(sizeof(task_list));//开辟任务节点空间
     //malloc_err(task_list);
     task_tmp -> task_func = _task_func;
     task_tmp -> arg = _arg;
     task_tmp -> next = NULL;//填充节点
     
     pthread_mutex_lock(&g_thread_pool->mutex);//操作任务链表之前先加锁
     
     if( g_thread_pool -> task_list_head == NULL )//判断头节点为空,当前任务链表没有任务
     	 {
     	      g_thread_pool -> task_list_head = task_tmp;
     	 }
     else
       {
         task_list * head_bak = g_thread_pool -> task_list_head;
         while( head_bak ->next != NULL )//找到任务链表中的最后一个节点,插入新任务
             {
                  head_bak = head_bak -> next;
             } 
             head_bak -> next = task_tmp;	
       }
       
       printf("当前任务数:%d, %s \n",total_task(g_thread_pool -> task_list_head),__func__);
       
       //任务链表中有新任务,唤醒阻塞等待的线程
       pthread_cond_signal(&g_thread_pool->cond);
       
       pthread_mutex_unlock(&g_thread_pool->mutex);
       
       return 0;
     
}
int destroy_thread_pool()
{
    if( g_thread_pool -> b_stop_thread_pool /* == 1*/)//如果线程池已关闭
        {
            return 0;
        }
    g_thread_pool -> b_stop_thread_pool = 1;//首先关闭开关
    pthread_mutex_lock(&g_thread_pool->mutex);
    pthread_cond_broadcast(&g_thread_pool->cond);//唤醒所有阻塞的进程
    pthread_mutex_unlock(&g_thread_pool->mutex);
    
    //防止僵尸进程,等待线程池中所有线程结束
    int n_index;
    for( n_index = 0; n_index < g_thread_pool -> n_max_thread; ++n_index )
        {
            pthread_join(g_thread_pool -> thread_id[n_index], NULL);
        }
     
     free(g_thread_pool -> thread_id ); //释放线程ID空间
     g_thread_pool -> thread_id = NULL;

     task_list *head_bak =  g_thread_pool -> task_list_head;
     while( head_bak != NULL )//释放任务链表空间
         {
             g_thread_pool -> task_list_head = g_thread_pool -> task_list_head -> next;
             free(head_bak);
             head_bak =  g_thread_pool -> task_list_head;
         }
     
    
     pthread_mutex_destroy(&g_thread_pool->mutex);
     pthread_cond_destroy(&g_thread_pool->cond);
     free(g_thread_pool);//释放掉线程池空间
     g_thread_pool = NULL;
      
    return 0;
}


测试代码

#include<stdio.h>
#include<stdlib.h>
#include"thread_pool.h"

void* task(void* arg)
{
     printf("当前线程[ID]: %u, [arg]:%d \n",(unsigned int)pthread_self(), arg);
}

int  main()
{
     create_thread_pool(5);
    
    int i;
    for(i=1; i<=20; ++i)
    {
        add_task_to_list(task, (void*)i);
    }
    
    
    sleep(8);
       
    destroy_thread_pool();

    return 0;
}


不太会写makefile,勉强能用

all:test
test:test.o thread_pool.o
	gcc  test.o thread_pool.o -o test -lpthread
test.o:
	gcc -c test.c thread_pool.h
thread_pool.o:
	gcc -c thread_pool.c thread_pool.h 

.PHONY:clean
clean:
	rm -f test *.o
	 rm -f *.gch  

抱歉!评论已关闭.