现在的位置: 首页 > 综合 > 正文

Memcached drive_machine 函数分析_process_update_command

2013年01月28日 ⁄ 综合 ⁄ 共 4666字 ⁄ 字号 评论关闭

关于Memcached的文章并不少,想更多了解Memcached的源代码,请看参考文献。

这些文献对于drive_machine()函数的描述比较简略,我们只是把未展开的分析展开。补充几篇文章都没有细化的地方。

memcached的核心使用的是libevent来处理网络IO事件,libevent需要指定一个主循环函数,
由这个函数对IO事件进行集中处理。

 

conn_new()用于创建一个连接,并通过libevent的event_set()把memcached自己的IO事件处理程序drive_machine()注册到libevent上,这个函数实际上用于处理一次连接当中的事件。当事件为conn_read的时候,进一步调用try_read_command(),process_command()两个方法,其中process_command()方法开始,和我们一般的命令行程序差不多,到这一步比较好理解,根据memcached协议中规定的格式,对item进行添加,更新,清除等进行处理。

drive_machine()实际上处理几个事件,这些事件被定义在conn_states里,这是一个枚举变量:

一个连接首先发生的是conn_listening事件,等待客户端进一步发送数据,当客户端发送了如put,get等命令,conn_read()负责读取request中的命令行,处理中如果遇到问题,会设置con_swallow()对错误进行输出,当memcached处理结束,通过conn_write()函数返回response信息给客户端。

 

所有都处理完毕设置conn_closing事件关闭当前连接。

更细节的内容,可以通过本文继续读Memcached的该方法的源代码。

 

Memcached命令解析

上一篇写到Memcached从try_read_command()开始进入命令解析阶段。这个函数只有一个参数c,也就是当前连接的数据结构。

/*
* if we have a complete line in the buffer, process it.
*/
static int try_read_command(conn *c) {

}

如果成功的在缓冲里面放入一行,就调用process_command()对命令进行解析。

在try_read_command()当中一定要了解,conn结构中的一些定义,如:

 

rsize可能等于以下两个常量之一

#define DATA_BUFFER_SIZE 2048
#define UDP_READ_BUFFER_SIZE 65536

2K 或者 64K 当使用的是UDP模式的时候为 64K

整体上try_read_command()并不长

[code]

/*
* if we have a complete line in the buffer, process it.
*/
static int try_read_command(conn *c) {
    char *el, *cont;

    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));

    if (c->rbytes == 0)
        return 0;
    el = memchr(c->rcurr, '\n', c->rbytes);
    if (!el)
        return 0;
    cont = el + 1;
    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
        el--;
    }
    *el = '\0';

    assert(cont <= (c->rcurr + c->rbytes));

    process_command(c, c->rcurr);

    c->rbytes -= (cont - c->rcurr);
    c->rcurr = cont;

    assert(c->rcurr <= (c->rbuf + c->rsize));

    return 1;
}

[/code]

在request请求中查找是否存在\r和\n,如果存在,就判定为这次request当中的command头,

例如:

set key1 0x0000 0x12113914 0x0000005 \r\n

细节可以看下图:

因此try_read_command()的作用就是找到命令行的结尾。

process_command()函数调用tokenize_command()方法把缓冲中的命令具体解析出来,最多不超过7个参数,因为MAX_TOKENS是8,去掉cmd本身还剩下7个,这里不贴出所有代码,我们只看一下关键的:

[code]

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        process_get_command(c, tokens, ntokens, false);

    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE))

    ...

[/code]

依据命令和参数的不同,实际上一共预备了几个子函数,他们是:

process_get_command() 获取信息
process_update_command() 更新信息
process_arithmetic_command()自增自减运算
process_delete_command()删除信息类

 

Memcached命令的执行过程分析

Memcached常用的命令有这样几个add、set、get、delete四个,最新的还有cas,gets等命令。

在Memcached当中add、set都属于更新操作,因此都在process_update_command()中进行,以set命令进行分析如下:

如果存在原有内容set进行更新,如果没有则增加,实际上无论add还是set都先申请内存,对于set命令多了两个过程item_unlink()和item_remove(),接着让conn去处理nread事件,在nread事件中通过complate_nread()调用store_item()方法存储item。

我还画了另外一张图,基本可以看出Memcached的item存储情况:

 

Memcached里面既有key-val模式必不可少的hasktable结构,也有为了有效利用内存的slabs结构,关于slabs结构可以看参考资料《memcached全面剖析–2.理解memcached的内存存储》。

在hashtable里记录的是item的指针,item实际存储在slabs里,并且每个slabs中的chunk是等长的。do_item_unlink()是从hashtable结构中移除当前指针,do_item_remove()也不释放内存,而是在item->it_flags上进行标记,由LRU机制去实际处理。关于LRU可以看参考资料,it_flags一共有三种状态:

#define ITEM_LINKED 1  
#define ITEM_DELETED 2      
#define ITEM_SLABBED 4

ITEM_LINKED 正在使用中,ITEM_DELETED删除了没有过期的item,所以放到todelete队列中等待删除。 ITEM_SLABBED已经被删除后被标记。

上面是一个简单的轮廓,下面我们具体分析一下代码。

process_update_command()在执行命令之前,他首先要获得一些有效信息,如下面列的:

key - 键
nkey - 键长
flags - 标记位(在PHP当中使用flags来区别数据是否进过压缩)
exptime - 过期时间
vlen - 内容的长度
req_cas_id - 如果是cas模式,这里是版本号

获取完这些信息以后,add、set操作都会先申请内存,只不过set要多一步申请完内存以后,把原来的数据擦掉,然后把最新的内容存储到新申请的item里,可以具体看下面这段代码:

[code]

    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);

    if (it == 0) {
        if (! item_size_ok(nkey, flags, vlen + 2))
            out_string(c, "SERVER_ERROR object too large for cache");
        else
            out_string(c, "SERVER_ERROR out of memory storing object");
        /* swallow the data line */
        c->write_and_go = conn_swallow;
        c->sbytes = vlen + 2;

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (comm == NREAD_SET) {
            it = item_get(key, nkey);
            if (it) {
                item_unlink(it);
                item_remove(it);
            }
        }

        return;
    }

[/code]

这段代码实际上之包含了两个步骤,new 一块内存,如果是set,释放原有的item,我第一次看这块代码的时候,就想找到实际store的代码,没找到,实际上他需要另外一个过程去处理,如下代码:

[code]

    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = it->nbytes;
    c->item_comm = comm;
    conn_set_state(c, conn_nread);

[/code]

这时候回到driver_machine()的switch里,执行case conn_nread这个分支,进一步执行complete_nread()函数,这里相当于一个save()过程,把当前conn当中的item保存下来,也就是store_item()函数。

保存完毕之后,向客户端发出response告知结束。

参考资料

memcached全面剖析–2.理解memcached的内存存储

memcached 源码修改笔记

 

 

 

 

抱歉!评论已关闭.