现在的位置: 首页 > 综合 > 正文

Redis2.2.2源码学习——Server&Client链接的建立以及相关Event

2018年02月18日 ⁄ 综合 ⁄ 共 5591字 ⁄ 字号 评论关闭
文章目录

1背景

        个人理解,Redis将各种任务以事件的方式处理。在Redis中,是以单个进程进行事件轮询:(http://blog.csdn.net/ordeder/article/details/12791359)。事件就类似于linux内核的调度schedule()。而与系统不同之处是:OS调度的基本单位是进程,而Redis的“调度单位”是事件(不断触发已发生的事件的hander)。Redis的任务的思想是一切皆“事件”。
        比较典型的时间事件:serverCorn,服务器定时触发该事件进行检查,主要判定server与client的连接、还有hash是否扩容,是否进行rehash等等维护性的工作。这些操作有点类似守护进程。(http://blog.csdn.net/ordeder/article/details/12836017)
        同样,在Server端和Client的通行是以文件事件进行管理的。Redis在initServer中就创建用于监听client连接请求的ipfd,并在该文件符上建立文件事件,用于处理客户端的连接请求。而且,事件的处理hander是创建cfd(accept),并在cfd上同样建立文件事件,用于读取客户端的具体命令。详见下图。

图.1. Redis中Server和User建立链接(图中的client是服务器端用于描述与客户端的链接相关的信息)

2 Server&Client建立链接的事件处理分析

2.1 涉及的数据结构:

redisClient描述了服务器的状态信息,redisClient记录了与客户端建立交互的信息

-----------------------服务器数据结构-------------------------------

/* Global server state structure */
struct redisServer {
    ...
    int ipfd;			//server listen fd
    ...
    list *clients;		//client list
    ...
    aeEventLoop *el;	//事件轮询
	...
};

typedef struct redisClient {
    int fd;				//与客户端通信的fd
    ...
    sds querybuf;		//客户端请求的串
    ...
} redisClient;

图2简单描绘了server和client在建立链接过程中所涉及的操作。红色部分表示建立链接的过程。黑色部分表示在事件创建过程中所涉及的操作。而蓝色部分表示创建的相关事件。

图.2. Redis Server&Client链接的建立时相关Event的建立(图中的user代表用户端,而client是服务器端用于描述与客户端的链接相关的信息)

2.2 源码如下:

1.服务器初始化中,建立监听事件

void initServer() {
	...
	//Server监听套接字的创建
	if (server.port != 0) 
        server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
	...
	/*为Server的监听套接字ipfd建立文件事件,该事件的处理函数为(acceptTcpHandler)
	创建用于与客户机子建立连接的clfd,以及建立对应记录客户端状态的client数据对象*/
	if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
	...
}	

2.监听事件的建立

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
	//ipfd建立一个FileEvent
    if (fd >= AE_SETSIZE) return AE_ERR;
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
	//设置为acceptTcpHandler
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

3.事件处理过程&建立与客户端的连接

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	aeSearchNearestTimer(eventLoop);
	...set tvp
	numevents = aeApiPoll(eventLoop, tvp);
	for (j = 0; j < numevents; j++) {
		...
		if (fe->mask & mask & AE_READABLE) {
			rfired = 1;
			//acceptTcpHandler
			fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }
        ...
	}
	...
	processTimeEvents(eventLoop);
}

4.服务器建立与客户端的链接cfd

/*作为ipfd监听事件的处理函数*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[128];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
	//建立cfd,cfd负责与客户端机子进行通信
    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
	//建立记录client状态的数据对象
    acceptCommonHandler(cfd);
}

//服务器端accept,建立clfd负责与客户端进行通信
int anetTcpAccept(char *err, int s, char *ip, int *port) {
    int fd;
    struct sockaddr_in sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
        return ANET_ERR;

    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
    if (port) *port = ntohs(sa.sin_port);
    return fd;
}

5.在cfd上创建文件事件:hander为读取客户端请求

//建立clfd所对应的客户端对象redisClient
static void acceptCommonHandler(int fd) {
    redisClient *c;
	/*创建redisClient客户端,同时建立clfd的监听事件*/
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,"Error allocating resoures for the client");
        close(fd); /* May be already closed, just ingore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in nonblocking
     * mode and we can send an error for free using the Kernel I/O */
    if (server.maxclients && listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
}

--------------------------------------------------------------------------------
/*作为与客户端机子建立连接的套接字clfd,我们需要在这个套接字上监听客户端发来的数据
所以,和服务器一样,需要在clfd上建立监听事件。这个监听事件是在createClient总完成
的(acceptCommonHandler >>  createClient)。*/

redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    c->bufpos = 0;

    anetNonBlock(NULL,fd);//非阻塞
    anetTcpNoDelay(NULL,fd);//非延时
    if (!c) return NULL;
	/*建立clfd的文件事件,事件处理函数为readQueryFromClient()
	readQueryFromClient负责读取客户端机子发送过来的命令。
	下文将具体分析命令处理函数readQueryFromClient*/
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }

    selectDb(c,0);
    c->fd = fd;
    c->querybuf = sdsempty();
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->lastinteraction = time(NULL);
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    c->reply = listCreate();
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->bpop.keys = NULL;
    c->bpop.count = 0;
    c->bpop.timeout = 0;
    c->bpop.target = NULL;
    c->io_keys = listCreate();
    c->watched_keys = listCreate();
    listSetFreeMethod(c->io_keys,decrRefCount);
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    listSetFreeMethod(c->pubsub_patterns,decrRefCount);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
	//将客户端c添加到server中的clinents链表
    listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}

3. 总结

Redis 对任务的处理是以事件为单位的。可以说,事件是Redis的调度单元。Redis是单进程进行事件轮询,所以事件是采用流水的方式进行执行的,这样的好处是不要进行大量的进程切换。他的弊端是,只适合处理小任务。试下如果一个事件要处理大量的事件,那么在其他待处理的事件必然需要等待,这样会影响吞吐量。其实,Redis在对dict表进行扩容的rehash阶段,就需要进行大量的数据拷贝,这是在serverCorn事件中进行的。作者就采用一个hash表分时段的方式进行拷贝,对每次拷贝分配固定的时间,如果时间超出,则暂停拷贝,等待下一次serverCorn事件被触发后继续拷贝。这就对系统的吞吐性能进行了优化,在事件轮询编程中是值得学习的。
以上是鄙人学习源码的一点心得,如有不当之处,欢迎批评指正。

原文链接:http://blog.csdn.net/ordeder/article/details/13998855

抱歉!评论已关闭.