现在的位置: 首页 > 综合 > 正文

Redis2.2.2源码学习——aeEvent事件轮询

2018年02月18日 ⁄ 综合 ⁄ 共 4477字 ⁄ 字号 评论关闭

背景       

        Redis的事件主要分为文件事件和定时器事件,作者对这两种事件处理的高端之处在于预先计算最近一个要超时的定时器距离当前的事件间隔,在这个时间间隔内调用poll函数处理文件事件,之后再处理定时器事件。

Redis在处理请求时完全是用单线程异步模式,通过epoll监听所有连接的读写事件,并通过相应的响应函数处理。Redis使用这样的设计来满足需求,很大程度上因为它服务的是高并发的短连接,且请求处理事件非常短暂。这个模型下,一旦有请求阻塞了服务,整个Redis的服务将受影响。Redis采用单线程模型可以避免系统上下文切换的开销。” (引用:Redis源代码分析.pdf
 邹雨晗)

事件轮询流程

源码分析

Version Redis2.2.2


/* File event structure */
typedef struct aeFileEvent {
    int mask;				///* 文件事件类型 读/写 one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;		
    aeFileProc *wfileProc;	
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id;	/* time event identifier. *///由aeEventLoop.timeEventNextId进行管理
    long when_sec;	/* seconds */
    long when_ms;	/* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;		//已出现的事件的文件号对应的事件描述在aeEventLoop.events[]中的下标
    int mask;	//文件事件类型 AE_WRITABLE||AE_READABLE
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;						//监听的最大文件号
    long long timeEventNextId;		//定时器事件的ID编号管理(分配ID号所用)
    aeFileEvent events[AE_SETSIZE]; //注册的文件事件,这些是需要进程关注的文件
    aeFiredEvent fired[AE_SETSIZE]; //poll结果,待处理的文件事件的文件号和事件类型
    aeTimeEvent *timeEventHead;		//定时器时间链表
    int stop;						//时间轮询是否结束?
    void *apidata; //文件事件的轮询数据和结果数据:poll; 三种轮询方式:epoll(linux),select(windows),kqueue
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

【三种文件事件轮询对已的apidata】
//select()
typedef struct aeApiState {
    fd_set rfds, wfds;	//文件轮询的fdset
    /* We need to have a copy of the fd sets as it's not safe to reuse
     * FD sets after select(). */
    fd_set _rfds, _wfds;//作为select中的参数(临时数据)
} aeApiState;
//kqueue()
typedef struct aeApiState {
    int kqfd;
    struct kevent events[AE_SETSIZE];
} aeApiState;
//epoll()
typedef struct aeApiState {
    int epfd;
    struct epoll_event events[AE_SETSIZE];
} aeApiState;

------------------------------------------------------------

【事件处理基本过程】

main
	1>initserver():
		信号注册
		服务器参数初始化:server(全局变量)
			。。。
			aeCreateEventLoop()
				构建aeEventLoop对象eventLoop
				初始化eventloop参数
					aeApiCreate(eventLoop)
						初始化eventLoop->apidata : 三种文件事件轮询对应不同的数据结构
			aeCreateTimeEvent()
				构建aeTimeEvent对象te
				aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
					设置te的超时时间 = currenttime + milliseonds(延时 1ms 是不是有点太小了?!)
				将te插入到eventLoop->timeEventHead链表中
			。。。
	。。。
	2>aeSetBeforeSleepProc(server.el,beforeSleep);

    3>aeMain(server.el) //以死循环的方式进入轮询,知道stop被置为非0
		while (!eventLoop->stop) {
            eventLoop->beforesleep(eventLoop);			//前期处理,回调函数
			aeProcessEvents(eventLoop, AE_ALL_EVENTS);	//事件处理(见下面)
		}
	
    4>aeDeleteEventLoop(server.el);

	return
endOfMain

---------------------------------------------------------------------

【事件处理过程:aeProcessEvents】

Redis的事件主要分为文件事件和定时器事件,作者对这两种事件处理的高端之处在于预先计算最近一个要超时的定时器距离当前的事件间隔,
在这个时间间隔内调用poll函数处理文件事件,之后再处理定时器事件。

aeProcessEvents
	1>shortest = aeSearchNearestTimer(eventLoop)
		遍历定时器链表eventLoop->timeEventHead找到最近一个要超时的定时器
		返回这个定时器时间
	2>计算当前时间与shortest时间的差值:tvp,如果tvp小于0,那么设置为0
	//计算出tvp,就相当于知道下一个定时器的超时时间间隔,在这个时间间隔内,我们可以用来检测其他的事件,比如文件事件
	3>numevents = aeApiPoll(eventLoop, tvp);	//文件事件的轮询,时间间隔为最近一个定时器的超时时间差tvp
		aeApiPoll有三种:select/epoll/kqueue
		以select为例:
			1.以eventLoop->apidata(记录了要poll的文件符)作为参数,调用select函数:
				retval = select(eventLoop->maxfd+1,&state->_rfds,&state->_wfds,NULL,tvp);//注意其中的tvp是select的最长阻塞时间。。。
			2.根据select的调用结果,甄选文件事件:
				//原则为:文件事件注册了读/写,并且相应文件的轮询结果fdset的读/写被置位,那么将该文件号添加到eventLoop->fired数组中,并设置相应的mask掩码
				for (j = 0; j <= eventLoop->maxfd; j++) {
					int mask = 0;
					aeFileEvent *fe = &eventLoop->events[j];

					if (fe->mask == AE_NONE) continue;
					if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
						mask |= AE_READABLE;
					if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
						mask |= AE_WRITABLE;
					eventLoop->fired[numevents].fd = j;
					eventLoop->fired[numevents].mask = mask;
					numevents++;
				}
	//通过aeApiPoll(eventLoop, tvp),待处理的文件事件的文件号都保存在了eventLoop->fired[numevents].fd中
	4>依次取出eventLoop->fired[numevents]中的fd和mask进行相应的处理:
		for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
			int rfired = 0;

			if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
				//限制同时对同一个文件的读写:未度过或读写的是不同的文件
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
		}
	5>最后,在执行计时器事件
		processTimeEvents(eventLoop)
			遍历计时器链表,计算计时器事件和当前时间的差值,如果已超时(<=0)那么:
			1.调用该计时器的hander
				retval = te->timeProc(eventLoop, id, te->clientData);
			2.判定retval // 有的定时器是重复使用的,有的是一次性的,关键是hander的返回值:
				if (retval != AE_NOMORE) {
					aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);//更新定时器时间(重用)
				} else {
					aeDeleteTimeEvent(eventLoop, id);	//删除定时器
				}
	return
endOf_aeProcessEvents
		
    

				

后记

今天主要看了和事件轮询相关的代码,感觉作者的想法挺好的,就是对于定时器事件来说,会有较大的事件误差。p.s. 终于找到基于Linux的C源码,啃一啃可以加深对linux系统调用的认识,同时学习Redis的K-V存储,两全其美,继续加油!

抱歉!评论已关闭.