现在的位置: 首页 > 综合 > 正文

linux下线程池

2013年09月23日 ⁄ 综合 ⁄ 共 3596字 ⁄ 字号 评论关闭

 

thrmgr.h

#ifndef __THRMGR_H__
#define __THRMGR_H__

#include 
<sys/time.h>
#include 
<pthread.h>

typedef 
struct work_item_tag 
{
    
struct work_item_tag *next;
    
void *data;
    
struct timeval time_queued;
}
 work_item_t;

typedef 
struct work_queue_tag 
{
    work_item_t 
*head;
    work_item_t 
*tail;
    
int item_count;
}
 work_queue_t;

typedef 
enum
{
    POOL_INVALID,
    POOL_VALID,
    POOL_EXIT,
}
 pool_state_t;

typedef 
struct threadpool_tag
{
    pthread_mutex_t pool_mutex;
    pthread_cond_t pool_cond;
    pthread_attr_t pool_attr;

    pool_state_t state;
    
int thr_max;
    
int thr_alive;
    
int thr_idle;
    
int idle_timeout;

    
void (*handler)(void *);

    work_queue_t 
*queue;
}
 threadpool_t;

threadpool_t 
*thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
void thrmgr_destroy(threadpool_t *threadpool);
int  thrmgr_dispatch(threadpool_t *threadpool, void *user_data);

#endif


thrmgr.c

#include 
"thrmgr.h"

#include 
<stdlib.h>
#include 
<memory.h>
#include 
<errno.h>

#define FALSE (0)
#define TRUE (1)

work_queue_t 
*work_queue_new()
{
    work_queue_t 
*work_q;

    work_q 
= (work_queue_t *) malloc(sizeof(work_queue_t));

    work_q
->head = work_q->tail = NULL;
    work_q
->item_count = 0;
    
return work_q;
}


void work_queue_add(work_queue_t *work_q, void *data)
{
    work_item_t 
*work_item;

    
if (!work_q) 
    
{
        
return;
    }

    work_item 
= (work_item_t *) malloc(sizeof(work_item_t));
    work_item
->next = NULL;
    work_item
->data = data;
    gettimeofday(
&(work_item->time_queued), NULL);

    
if (work_q->head == NULL) 
    
{
        work_q
->head = work_q->tail = work_item;
        work_q
->item_count = 1;
    }

    
else 
    
{
        work_q
->tail->next = work_item;
        work_q
->tail = work_item;
        work_q
->item_count++;
    }

    
return;
}


void *work_queue_pop(work_queue_t *work_q)
{
    work_item_t 
*work_item;
    
void *data;

    
if (!work_q || !work_q->head) 
    
{
        
return NULL;
    }


    work_item 
= work_q->head;
    data 
= work_item->data;
    work_q
->head = work_item->next;
    
if (work_q->head == NULL)
    
{
        work_q
->tail = NULL;
    }


    free(work_item);
    
return data;
}


void thrmgr_destroy(threadpool_t *threadpool)
{
    
if (!threadpool || (threadpool->state != POOL_VALID))
    
{
        
return;
    }


    
if (pthread_mutex_lock(&threadpool->pool_mutex) != 0
    
{
        
//logg("!Mutex lock failed ");
        exit(-1);
    }

    threadpool
->state = POOL_EXIT;

    
/* wait for threads to exit */
    
if (threadpool->thr_alive > 0)
    
{
        
if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0)
        
{
            pthread_mutex_unlock(
&threadpool->pool_mutex);
            
return;
        }

    }


    
while (threadpool->thr_alive > 0)
    
{
        
if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0
        
{
            pthread_mutex_unlock(
&threadpool->pool_mutex);
            
return;
        }

    }


    
if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0
    
{
        
//logg("!Mutex unlock failed ");
        exit(-1);
    }


    pthread_mutex_destroy(
&(threadpool->pool_mutex));
    pthread_cond_destroy(
&(threadpool->pool_cond));
    pthread_attr_destroy(
&(threadpool->pool_attr));
    free(threadpool);
    
return;
}


threadpool_t 
*thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
{
    threadpool_t 
*threadpool;

    
if (max_threads <= 0
    
{
        
return NULL;
    }


    threadpool 
= (threadpool_t *) malloc(sizeof(threadpool_t));

    threadpool
->queue = work_queue_new();
    
if (!threadpool->queue) 
    
{
        free(threadpool);
        
return NULL;
    }
 

    threadpool
->thr_max   

抱歉!评论已关闭.