现在的位置: 首页 > 综合 > 正文

服务器设计系列:定时器

2013年10月01日 ⁄ 综合 ⁄ 共 11329字 ⁄ 字号 评论关闭

    一、 基础知识
    1、时间类型。
Linux下常用的时间类型有4个:time_t,struct timeval,struct timespec,struct tm。
    (1)time_t是一个长整型,一般用来表示用1970年以来的秒数。

    (2)struct timeval有两个成员,一个是秒,一个是微妙。

struct timeval {
    long tv_sec;        /* seconds */
    long tv_usec;  /* microseconds */
};

    (3)struct timespec有两个成员,一个是秒,一个是纳秒。

struct timespec{
    time_t  tv_sec;         /* seconds */
    long    tv_nsec;        /* nanoseconds */
};

    (4)struct tm是直观意义上的时间表示方法:

struct tm {
    int     tm_sec;         /* seconds */
    int     tm_min;         /* minutes */
    int     tm_hour;        /* hours */
    int     tm_mday;        /* day of the month */
    int     tm_mon;         /* month */
    int     tm_year;        /* year */
    int     tm_wday;        /* day of the week */
    int     tm_yday;        /* day in the year */
    int     tm_isdst;       /* daylight saving time */
};

    2、 时间操作
    (1) 时间格式间的转换函数

    主要是 time_t、struct tm、时间的字符串格式之间的转换。看下面的函数参数类型以及返回值类型:

char *asctime(const struct tm *tm);
char *ctime(const time_t *timep);
struct tm *gmtime(const time_t *timep);
struct tm *localtime(const time_t *timep);
time_t mktime(struct tm *tm);

    gmtime和localtime的参数以及返回值类型相同,区别是前者返回的格林威治标准时间,后者是当地时间。
    (2) 获取时间函数
    两个函数,获取的时间类型看原型就知道了:

time_t time(time_t *t);
int gettimeofday(struct timeval *tv, struct timezone *tz);

    前者获取time_t类型,后者获取struct timeval类型,因为类型的缘故,前者只能精确到秒,后者可以精确到微秒。
    二、 延迟函数
   主要的延迟函数有:sleep(), usleep(), nanosleep(), select(), pselect()。

unsigned int sleep(unsigned int seconds);
void usleep(unsigned long usec);
int nanosleep(const struct timespec *req, struct timespec *rem);
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);
int pselect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timespec *timeout, const sigset_t *sigmask);

    alarm函数是信号方式的延迟,这种方式不直观,这里不说了。
    仅通过函数原型中时间参数类型,可以猜测sleep可以精确到秒级,usleep/select可以精确到微妙级,nanosleep和pselect可以精确到纳秒级。
    而实际实现中,linux上的nanosleep和alarm相同,都是基于内核时钟机制实现,受linux内核时钟实现的影响,并不能达到纳秒级的精度,man nanosleep也可以看到这个说明,man里给出的精度是:Linux/i386上是10 ms ,Linux/Alpha上是1ms。
    这里有一篇文章http://blog.csdn.net/zhoujunyi/archive/2007/03/30/1546330.aspx,测试了不同延迟函数之间的精确度。文章给出的结论是linux上精度最高的是select,10ms级别。我在本机器测试select和pselect相当,都达到了1ms级的精度,精度高于文章中给出的10ms,sleep在秒级以上和usleep/nanosleep相当。下面贴下我机器上1ms时候的测试结果,其他不贴了:

sleep           1000          0      -1000 
usleep           1000       2974       1974 
nanosleep        1000       2990       1990 
select           1000        991         -9 
pselect           1000        990        -10 
gettimeofday           1000       1000          0

    而使用gettimeofday循环不停检测时间,可精确微秒级,不过不适宜用来做定时器模块。因此后面的定时期模块将选择select为延迟函数。
    三、 定时器模块需求以及实现概述
    1、需求。
从实现结果的角度说来,需求就是最终的使用方式。呵呵,不详细描述需求了,先直接给出我实现的CTimer类的三个主要接口:

class CTimer{
public:
    CTimer(unsigned int vinterval,void (*vfunc)(CTimer *,void *),void *vdata,TimerType vtype);
    void start();
    void stop();
    void reset(unsigned int vinterval);
};

    使用定时器模块的步骤如下:
    (1) 实例化一个CTimer,参数的含义依次是:vinterval间隔时间(单位ms),vfunc是时间到回调的函数,vdata回调函数使用的参数,vtype定时器的类型,分一次型和循环型两种。
    (2) 调用start方法。
    (3) 必要的时候调用stop和reset。
    2、实现。简单描述下定时器模块的实现,有一个manager单例类保存所有CTimer对象,开启一线程运行延迟函数,每次延迟间隔到,扫描保存CTimer的容器,对每个CTimer对象执行减少时间操作,减少到0则执行回调函数。对一次性CTimer,超时则从容器中删除,循环型的将间隔时间重置,不从容器中移除。
    CTimer的start执行将对象插入到manager容器中操作;stop执行将对象从manager容器中删除的操作;reset执行先删除,重置间隔,然后再放到容器中,reset不改变CTimer的定时器类型属性。
    四、 定时器模块的数据结构选择
    Manager类的容器要频繁进行的操作涉及插入、删除、查询等。
    误区:

    (1)简单看,好象该容器要是有序的,方便插入删除等,貌似红黑树比较合适。其实不然,插入删除操作的频率很低,最频繁的还是每次时延到,对容器的扫描并做时间减少操作,红黑树在做顺序扫描相对链表并没什么优势。
    (2) 插入的时候依照顺序链表的方式插入到合适的位置保持排序,以保证超时的对象都在链表的头端。其实这也是没必要的,每次时延到,对每一个对象都要做时间减少操作,因此不管是有序还是无序,都是一次扫描就执行完下面操作:减少时间、判断是否超时,是则执行回调,继续判断是什么类型,一次型的则执行完就移除,循环型则执行完直接重置间隔就可。
    因此,只需要能快速插入头、删除结点、遍历就好。我的实现直接使用BSD内核中的数据结构LIST,插入头、删除时间复杂度都是1,遍历就不说了。linux下/usr/include/sys下有头文件queue.h里也有LIST结构以及操作的定义。貌似linux下的少了遍历宏:

#define LIST_FOREACH(var, head, field)     \
 for((var) = LIST_FIRST(head);     \
     (var)!= LIST_END(head);     \
     (var) = LIST_NEXT(var, field))

    五、 详细实现
    这里贴出主要的代码,请重点关注CTimerManager:: process方法,不再详细说了。整体代码很简单,就两个类。
   (1)  CTimer类实现

    timer.h文件如下:

#ifndef _CTIMER_H_
#define _CTIMER_H_
#include "queue.h"      //OpenBSD中的queue.h
#include <sys/time.h>

//定时器
class CTimer
{
friend class CTimerManager;
public:
    typedef enum
    {
        TIMER_IDLE=0,    //start前以及手动调用stop后的状态
        TIMER_ALIVE,     //在manager的list里时候的状态
        TIMER_TIMEOUT    //超时后被移除的状态,循环型的没有
    }TimerState;
    typedef enum
    {
        TIMER_ONCE=0,    //一次型
        TIMER_CIRCLE     //循环型
    }TimerType;
    CTimer(unsigned int vinterval=0, void (*vfunc)(CTimer *,void *)=0, 
			void *vdata=0, TimerType vtype=TIMER_ONCE);
    void start();
    void stop();
    void reset(unsigned int vinterval);
    ~CTimer();
private:
    unsigned int id_;          //测试用
    unsigned int m_interval;   //间隔,不变
    unsigned int m_counter;    //开始设置为interval,随延迟时间到,减少
    TimerState m_state;        //状态
    TimerType m_type;          //类型
    void (*m_func)(CTimer *,void *);   //回调函数
    void * m_data;                     //回调函数参数
    LIST_ENTRY(CTimer) entry_;         //LIST的使用方式
};
#endif

    timer.cpp文件如下:

#include "timer.h"
#include "timer_manager.h"

/*构造函数*/
CTimer::CTimer(unsigned int vinterval, void (*vfunc)(CTimer *,void *), void *vdata, TimerType vtype):
    m_interval(vinterval),m_counter(vinterval),
    m_state(TIMER_IDLE),m_type(vtype),
    m_func(vfunc),m_data(vdata)
{}
/*开始定时器*/
void CTimer::start()
{
    CTimerManager::instance()->add_timer(this);
}
/*停止定时器*/
void CTimer::stop()
{
    CTimerManager::instance()->remove_timer(this);
}
/*reset定时器*/
void CTimer::reset(unsigned int vinterval)
{
    CTimerManager::instance()->remove_timer(this);
    m_counter=m_interval=vinterval;
    CTimerManager::instance()->add_timer(this);
}
/*析构函数,*/
CTimer::~CTimer()
{
    if(m_state==TIMER_ALIVE)
        stop();  //stop操作不能省略,避免delete前忘记stop
}

   (2)CTimerManager类实现

    timer_manager.h文件如下:

#ifndef __CTIMER_MANGER_H_
#define __CTIMER_MANGER_H_
#include "timer.h"
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

//定时器管理类
class CTimerManager
{
public:
    typedef enum
    {
		TIMER_MANAGER_STOP=0,
        TIMER_MANAGER_START
    }TimerManagerState;
    
    static CTimerManager * instance();
    void add_timer(CTimer * vtimer);       //向链表中添加定时器:线程安全的add
    void remove_timer(CTimer * vtimer);    //从链表中移除定时器:线程安全的remove
    void start();  //开始process线程
    void stop();  //停止process线程
    void dump();
protected:
    static void * process(void *);         //实际的定时器时间延迟线程
private:    
    void add_timer_(CTimer * vtimer);      //非线程安全的add
    void remove_timer_(CTimer * vtimer);   //非线程安全的remove
    
    CTimerManager();
    static pthread_mutex_t m_mutex;
    static CTimerManager * m_instance;    //单例对象指针
    
    TimerManagerState m_state;
    LIST_HEAD(,CTimer) list_;     //LIST使用方式:存放定时器对象的链表

    static unsigned int mark;     //测试,配合dump
};
#endif

    timer_manager.cpp文件如下:

#include "timer_manager.h"
#include <stdio.h>
#include <sys/select.h>
#include <errno.h>

/* 返回单例 */
CTimerManager * CTimerManager::instance()
{
    if(m_instance==NULL)   //singlton的double-check实现
    {
        pthread_mutex_lock(&m_mutex);
        if(m_instance==NULL)
        {
            m_instance=new CTimerManager();
        }
        pthread_mutex_unlock(&m_mutex);
    }
    return m_instance;
}
/* 开始延迟函数线程 */
void CTimerManager::start()
{
    if(m_state==TIMER_MANAGER_STOP){
		m_state=TIMER_MANAGER_START;
		pthread_t pid;
		//process必须static,不能操作非static属性,因此传递this指针
		pthread_create(&pid,0,process,this);
    }
}

//......

/* 定时器模块的时间延迟线程 */
void * CTimerManager::process(void * arg)
{
    pthread_detach(pthread_self());
	//获取本单例
    CTimerManager *manage=(CTimerManager *)arg;
    CTimer *item;
    struct timeval start,end;
    unsigned int delay;    
    struct timeval tm;
    gettimeofday(&end,0);  //获取当前时间
	
	//使用状态控制线程运行,进而容易实现stop,也可以使用pthread_cancel粗暴的停止,
	//需要考虑暂停点等问题
    while(manage->m_state==TIMER_MANAGER_START)
    {
        tm.tv_sec=0;
        tm.tv_usec=DEFULT_INTERVAL*1000;
        start.tv_sec=end.tv_sec;
        start.tv_usec=end.tv_usec;
		//这里用select函数来进行延迟
		//不同系统的延迟函数精度不同,如果需要替换为其他延迟函数,这附近修改下就好
        while(select(0,0,0,0,&tm)<0&&errno==EINTR);
        gettimeofday(&end,0);
		//计算延迟的毫秒数
        delay=(end.tv_sec-start.tv_sec)*1000+(end.tv_usec-start.tv_usec)/1000;        

		//遍历链表的中各个定时器对象,CTimerManager是
		//CTimer的友元类,因此它可以访问CTimer的entry_成员
		pthread_mutex_lock(&manage->m_mutex);
        LIST_FOREACH(item, &(manage->list_), entry_)
        {
			//减少当前定时器的时间量
            item->m_counter<delay?item->m_counter=0:item->m_counter-=delay;
            if(item->m_counter==0)  //若减少到0,则执行回调函数
            {
                if(item->m_func)
                item->m_func(item,item->m_data);

                if(item->m_type==CTimer::TIMER_ONCE)
                {
					/*一次型的,超时,移除,并设为状态CTimer::TIMER_TIMEOUT*/
                    manage->remove_timer_(item);
                    item->m_state=CTimer::TIMER_TIMEOUT;
                }
                else if(item->m_type==CTimer::TIMER_CIRCLE)
                {
					/*循环型的,重置counter就好*/
                    item->m_counter=item->m_interval;
                }
            }
        }

        pthread_mutex_unlock(&manage->m_mutex);
    }
}

    使用示例如下:

#include "timer_manager.h"
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
void func(CTimer * timer, void *data)
{
    printf("hi,%d\n",(int)(data));
}

//随便写的,凑合着看吧。没有CTimerManager::instance()->stop();也没new对象。
//定时器对象可多次start和stop,使用上对暴露的接口没有任何的契约式限制,可随意调用
int main()
{
    CTimerManager::instance()->start();
    CTimer a(1000,func,(void *)1,CTimer::TIMER_CIRCLE);
    CTimer a1(2000,func,(void *)11,CTimer::TIMER_ONCE);
    CTimer a2(3000,func,(void *)12,CTimer::TIMER_ONCE);
    CTimer a3(1000,func,(void *)13,CTimer::TIMER_ONCE);
    
    a.start();
    a1.start();
    a2.start();
    a3.start();
    a.start();
    a1.start();
    a2.start();
    a3.start();

    sleep(1);
    CTimerManager::instance()->dump();
    sleep(1);
    CTimerManager::instance()->dump();
    a.reset(2000);
    a1.stop();
    a3.stop();

    sleep(10);
    return 0;
}

    六、讨论
    (1)精度问题。
精度高,则实时性高,但要求select等待的时间缩短,进而增加对LIST结构的扫描操作。精度低,则实时性差,但会增加定时器线程的睡眠时间,减少对cpu的占用。一般的应用系统,应该尽量降低精度,避免不必要的扫描,对具体系统可考察所用到的所有定时器的实际间隔,在允许的情况下,尽量降低精度,可通过修改代码中的宏实现。为了降低定时器线程对cpu的占有时间,甚有更为粗犷型的定时器模块实现为将延迟时间取list中最小的那个间隔,保证每次延迟时间到都有回调。
    (2)加锁区域问题。这里的定时器模块实现,将定时器对象的时间减少以及函数回调的执行等再同一个临界区内执行,而有的定时器模块实现是在加锁区域执行“时间减少”操作,将减少到0的对象放到另一个超时链表中,解锁后再单独扫描超时链表执行回调操作。很明显,后者缩短了加锁时间,能及时响应其他的线程的定时器对象的start以及stop操作。但是后者对定时器操作的时序性有误差,直观反应就是可能在定时器执行了stop操作以后,仍然会有超时回调发生,特别是回调参数是指针的情况,可能引起难以发现的bug,增加调试困难。在衡量两者的利弊后,本文采用延长加锁时间以保证操作的时序性。因此,在实际的使用,回调函数应尽快返回,另一方面,尽量减少系统内使用的定时器数目,这个主要原因是延迟时间到要扫描LIST,哪种方式都避免不了。

    在start定时器线程的时候,传入精度和误差补偿,一是根据实际需要调整精确度,二是弥补延迟函数的稍许误差,以具有更好的伸缩性和精确度。本文旨在说明定时器模块的内部实现机制,详细细节不再修改了。

    七、一些改进

    上面这个实现每次延时时间到都要扫描所有的定时器对象,效率低下。开始设想的时候,LIST中的定时器对象保存间隔时间段的毫秒值,导致每次延时时间到都要做“时间减少操作”直到减少到零,并且得出不需排序的结论。
    如果其中保存超时的精确时间点,而不是保存时间段,则可以在LIST中根据超时时间点对定时器对象排序,延时时间到,则从链表头扫描定时器对象,取其超时时间点与当前时间点对比,如果小于等于当前时间点,则进行超时处理,否则终止继续扫描,避免不必要的扫描操作。同时插入对象的时候插入到合适的位置,以保持链表的顺序化。
   (1) 改进后的主要数据结构。此次容器结构选择内核数据结构中的TAILQ,因为LIST没有插入尾部操作(当要插入的定时器对象超时时间点大于所有队列中的对象的时候)。
   (2) 新的时间类型操作。另一方面很多地方涉及到对struct timeval结构的操作,这里介绍几个对该结构进行操作的宏,都已经在系统头文件中定义,可以使用函数原型的方式理解就是如下:

timeradd(struct timeval *p1,struct timeval *p2,struct timeval *result);
timersub(struct timeval *p1,struct timeval *p2,struct timeval *result);
timercmp(struct timeval *p1,struct timeval *p2,operator op);

    对struct timespec同样有timespecadd/timespecsub/timespeccmp,另外还有两者的转换宏,使用函数原型的方式理解就是:

TIMEVAL_TO_TIMESPEC(struct timeval *p1,struct timespec *result);
TIMESPEC_TO_TIMEVAL(struct timespec *p1,struct timeval *result);

    如果系统的头文件中找不到,也可以自己实现下,明白这两个结构的细节结构,实现也很简单,这里拿timeradd举例,其它不说了。

#define    timeradd(tvp, uvp, vvp)                        \
    do {                                \
        (vvp)->tv_sec = (tvp)->tv_sec + (uvp)->tv_sec;        \
        (vvp)->tv_usec = (tvp)->tv_usec + (uvp)->tv_usec;    \
        if ((vvp)->tv_usec >= 1000000) {            \
            (vvp)->tv_sec++;                \
            (vvp)->tv_usec -= 1000000;            \
        }                            \
    } while (0)

    (3)实现。和前面的实现相比,改动比较的在add_timer_和process方法。当前add_timer_操作需要顺序扫描插入到合适的位置以保持链表的顺序。当前process的主要代码如下:

while(manager->m_state==TIMER_MANAGER_START){
    tm.tv_sec=manager->m_interval.tv_sec;
    tm.tv_usec=manager->m_interval.tv_usec;
    while(select(0,0,0,0,&tm)<0&&errno==EINTR);
    gettimeofday(&now,0);
    /*加上误差补偿时间*/
    timeradd(&now,&manager->m_repair,&stand);
    pthread_mutex_lock(&manager->m_mutex);
    TAILQ_FOREACH(item, &(manager->list_), entry_){
        /*取修正后的时间逐个和定时器中的超时时间点相比,遇到不超时对象则退出扫描*/
        if(timercmp(&item->m_endtime,&stand,<)){
                 if(item->m_func)
                        item->m_func(item,item->m_data);
             if(item->m_type==CTimer::TIMER_ONCE){
                       manager->remove_timer_(item);
                      item->m_state=CTimer::TIMER_TIMEOUT;
                }
            else if(item->m_type==CTimer::TIMER_CIRCLE){
                    /* 循环性的要保证链表的顺序性,如要重新插入,保存entry_的原因,是执行新的插入后
                       不要影响当前进行的循环 */
                    tmpTimer.entry_=item->entry_;
                    manager->remove_timer_(item);
                    manager->add_timer_(item);
                   item=&tmpTimer;}
                 }
          else break;
        }
        pthread_mutex_unlock(&manager->m_mutex);
    }

    与上一版本相比,该文中的定时器实现要求在定时器模块运行期间不能修改系统实际,上一版本实现则无此限制。
    定时器模块的锁初始化为fastmutex方式,因此在回调函数里注意不能再调用CTimer的start stop reset等方法,以免死锁,如果有调用的需求,可以把锁修改为循环锁recmutex方式。
     这是timer的v2实现,定时器timer在业务线程中执行start的时候,要执行扫描排序操作,导致返回时间延长。后续修改:
    (1) 定时器timer的start操作不再执行扫描操作,而是简单插入队列头同时置一变量表示尚未排序。定时器线程延迟时间到,首先扫描未排序对象,执行排序(一次性timer从尾部扫描对比,循环性从头部扫描对比),再扫描判断是否超时。
    (2) func移动到锁外执行。
    针对windows客户端栈使用定时器不多,并且定时间隔不能受系统时间影响(客户端的系统时间可能被修改),仍然使用v1的实现。

参考文献:
技术系列之定时器:http://www.cppblog.com/CppExplore/MyPosts.html

抱歉!评论已关闭.