现在的位置: 首页 > 综合 > 正文

nginx memcached模块解析

2018年03月21日 ⁄ 综合 ⁄ 共 7638字 ⁄ 字号 评论关闭

1nginx memcached模块
1.1概述
nginx memcached是一个使用内存来为访问页面加速的模块,当客户端请求到达nginx服务器时,nginx会先通过键值(比如说uri),去访问memcached服务器,当能从memcached服务器获取到数据时,会直接将数据封装,返回给客户端,否则,则继续访问相关服务如php,从相关应用获取到内容发送给客户端,同时由相关应用主动将内容写入到memcached服务器,以便下次访问时能起到加速的效果。
nginx memcached模块与memcached服务器之间的交互是通过nginx的upstream机制来进行的。
                                                                                                                  
 应注意官方的memcached模块只有访问读取memcached服务器的能力,而没有将内容写入到memcached服务器的能力,某些第三方模块则有此功能。

1.2配置
一个典型的memcached访问配置如下,此处是nginx.conf里面的:
server {
    listen       80;
    server_name  192.168.3.139;
    root html;
 
    location ~* \.(gif|jpg|jpeg|png|bmp|swf|js|css)$ {
          set $memcached_key $uri;
          default_type     text/html;
          memcached_pass   192.168.3.139:11211;//memchaced服务器在192.168.3.139上面,访问该uri时,先去该服务器去取资源,取不到时跳转到@goto404的分支去处理
          error_page 404 = @goto404; //当发生404错误时,重定向到@goto404去处理
    }
 
    location @goto404 {
          fastcgi_pass   127.0.0.1:9000;
          fastcgi_index  index.php;
          include fastcgi.conf;
          rewrite ^(.*)? /goto.php?q=$1 break;
    }
 
}

1.3源码分析
memcached模块依赖于nginx的upstream模块来实现,其主要处理部分,也都是嵌入到了memcached模块注册在upstream的各个回调函数中,下面用1.0.15版本来进行源码分析:

memcached模块的配置主要是pass_proxy关键字,其相关处理函数是ngx_http_memcached_pass,其中处理流程包括建立upstream对应的数据结构等,但最重要的是下面这句 :

clcf->handler = ngx_http_memcached_handler;
 ngx_http_memcached_handler;回调函数被注册到了请求包处理11个阶段的content phase阶段,在该阶段会去调用该处理函数。
 那么在该处理函数中又做了什么呢,从代码上看主要是初始化upstream机制所用的各个数据结构,设置upstream机制所调用的各个回调函数,为与上游的memcached服务器做连接做好准备。具体代码解析如下:

  static ngx_int_t
ngx_http_memcached_handler(ngx_http_request_t *r)
{
    ngx_int_t                       rc;
    ngx_http_upstream_t            *u;
    ngx_http_memcached_ctx_t       *ctx;
    ngx_http_memcached_loc_conf_t  *mlcf;

    if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_HEAD))) {
        return NGX_HTTP_NOT_ALLOWED;
    }
	//丢弃请求包体,对请求包体并不关心
    rc = ngx_http_discard_request_body(r);

    if (rc != NGX_OK) {
        return rc;
    }

    if (ngx_http_set_content_type(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
	//创建upstream数据结构
    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u = r->upstream;

    ngx_str_set(&u->schema, "memcached://");
    u->output.tag = (ngx_buf_tag_t) &ngx_http_memcached_module;

    mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);

    u->conf = &mlcf->upstream;
    //下面是设置upstream使用到的各个回调函数
	//该回调函数创建了相关的get请求,该get请求将发往上游服务器
    u->create_request = ngx_http_memcached_create_request;
    u->reinit_request = ngx_http_memcached_reinit_request;
	//处理memcache服务器回传的回应报文头,计算其头部,计算回应报文
	//数据长度
    u->process_header = ngx_http_memcached_process_header;
    u->abort_request = ngx_http_memcached_abort_request;
    u->finalize_request = ngx_http_memcached_finalize_request;

    ctx = ngx_palloc(r->pool, sizeof(ngx_http_memcached_ctx_t));
    if (ctx == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
	//ctx->rest的初始化在这里,该rest使用的场景是在一次接收中,
	//已经接收到了\r\nEND\r\n的一部分,但未接收完全,用来记录
	//剩下尚未接收的部分
    ctx->rest = NGX_HTTP_MEMCACHED_END;
    ctx->request = r;

    ngx_http_set_ctx(r, ctx, ngx_http_memcached_module);

    u->input_filter_init = ngx_http_memcached_filter_init;
	//该函数用来处理响应包体,把包体buf挂入request的out链表上
    u->input_filter = ngx_http_memcached_filter;
    u->input_filter_ctx = ctx;

    r->main->count++;
	//发起对上游memcached服务器的连接
    ngx_http_upstream_init(r);

    return NGX_DONE;
}

在上面这段代码中,我们主要关注在upstream中设置的create_request ,process_header  ,input_filter 这几个字段所设定的函数:
    ngx_http_memcached_create_request:用于构建发往memcached服务器的请求报文。
    ngx_http_memcached_process_header:处理memcached服务器返回来的相应头部,计算响应体的长度,判断请求访问的状态(是否成功)。
    ngx_http_memcached_filter:将upstream收到的各个响应包挂入reuqest的out_buf中,同时在这里会判断包体的接收是否结束。
  下面会对这三个函数做详细解析,在此之前先对memcached服务器交互报文做个简单了解:

1ngx_http_memcached_create_request:
该函数主要是构建发往memcached服务器的报文,报文格式:
	GET URI \r\n
static ngx_int_t
ngx_http_memcached_create_request(ngx_http_request_t *r)
{
    size_t                          len;
    uintptr_t                       escape;
    ngx_buf_t                      *b;
    ngx_chain_t                    *cl;
    ngx_http_memcached_ctx_t       *ctx;
    ngx_http_variable_value_t      *vv;
    ngx_http_memcached_loc_conf_t  *mlcf;

    mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);

    vv = ngx_http_get_indexed_variable(r, mlcf->index);

    if (vv == NULL || vv->not_found || vv->len == 0) {
       
        return NGX_ERROR;
    }

    escape = 2 * ngx_escape_uri(NULL, vv->data, vv->len, NGX_ESCAPE_MEMCACHED);
    //计算发给memcached服务器的请求的报文长度
    len = sizeof("get ") - 1 + vv->len + escape + sizeof(CRLF) - 1;
	//分配相关buf,chain,用来将数据报文写入到里面
    b = ngx_create_temp_buf(r->pool, len);
    if (b == NULL) {
        return NGX_ERROR;
    }

    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf = b;
    cl->next = NULL;

    r->upstream->request_bufs = cl;
	//写入get 关键字
    *b->last++ = 'g'; *b->last++ = 'e'; *b->last++ = 't'; *b->last++ = ' ';

    ctx = ngx_http_get_module_ctx(r, ngx_http_memcached_module);

    ctx->key.data = b->last;
	//写入url
    if (escape == 0) {
        b->last = ngx_copy(b->last, vv->data, vv->len);

    } else {
        b->last = (u_char *) ngx_escape_uri(b->last, vv->data, vv->len,
                                            NGX_ESCAPE_MEMCACHED);
    }

    ctx->key.len = b->last - ctx->key.data;

    
	//设置报文结束标志\r\n
    *b->last++ = CR; *b->last++ = LF;

    return NGX_OK;
}
    
2 ngx_http_memcached_process_header
static ngx_int_t
ngx_http_memcached_process_header(ngx_http_request_t *r)
{
    u_char                    *p, *len;
    ngx_str_t                  line;
    ngx_http_upstream_t       *u;
    ngx_http_memcached_ctx_t  *ctx;

    u = r->upstream;

    for (p = u->buffer.pos; p < u->buffer.last; p++) {
        if (*p == LF) {
            goto found;
        }
    }

    return NGX_AGAIN;

found:

    *p = '\0';

    line.len = p - u->buffer.pos - 1;
    line.data = u->buffer.pos;

    p = u->buffer.pos;

    ctx = ngx_http_get_module_ctx(r, ngx_http_memcached_module);
	//报文头返回VALUE关键字,表明memcached服务器中有相关数据内容
    if (ngx_strncmp(p, "VALUE ", sizeof("VALUE ") - 1) == 0) {
        p += sizeof("VALUE ") - 1;
        if (ngx_strncmp(p, ctx->key.data, ctx->key.len) != 0) {
            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }
        p += ctx->key.len;

        if (*p++ != ' ') {
            goto no_valid;
        }

        /* skip flags */

        while (*p) {
            if (*p++ == ' ') {
                goto length;
            }
        }

        goto no_valid;

    length:

        len = p;

        while (*p && *p++ != CR) { /* void */ }
		//计算出相应报文的长度
        r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
        if (r->headers_out.content_length_n == -1) {
            
            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        u->headers_in.status_n = 200;
        u->state->status = 200;
        u->buffer.pos = p + 1;

        return NGX_OK;
    }
	//当返回END\x0d时,表明memcached服务器中没有该内容
    if (ngx_strcmp(p, "END\x0d") == 0) {
        u->headers_in.status_n = 404;
        u->state->status = 404;

        return NGX_OK;
    }

no_valid:

    return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}

3 ngx_http_memcached_filter
 static ngx_int_t
ngx_http_memcached_filter(void *data, ssize_t bytes)
{
    ngx_http_memcached_ctx_t  *ctx = data;

    u_char               *last;
    ngx_buf_t            *b;
    ngx_chain_t          *cl, **ll;
    ngx_http_upstream_t  *u;

    u = ctx->request->upstream;
    b = &u->buffer;
	//该分支意味着有部分的ngx_http_memcached_end在上次尚未接收
	//完毕,在此次进行接收
    if (u->length == ctx->rest) {
		
        if (ngx_strncmp(b->last,
                   ngx_http_memcached_end + NGX_HTTP_MEMCACHED_END - ctx->rest,
                   bytes)
            != 0)
        {
            u->length = 0;
            ctx->rest = 0;
            return NGX_OK;
        }
        u->length -= bytes;
        ctx->rest -= bytes;

        return NGX_OK;
    }

    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;
    }

    cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf->flush = 1;
    cl->buf->memory = 1;

    *ll = cl;
	//b是upstream当前用来接收数据的buf.
    last = b->last;
	//数据的起始位置为b->last的地址
    cl->buf->pos = last;
	//重新设定b->last的值,以用来接收后续数据时使用
    b->last += bytes;
    cl->buf->last = b->last;
    cl->buf->tag = u->output.tag;
	//在这里应该是没有接收完毕相关数据
    if (bytes <= (ssize_t) (u->length - NGX_HTTP_MEMCACHED_END)) {
        u->length -= bytes;
        return NGX_OK;
    }
	
	//找到\r\nEND\r\n的起始位置
    last += u->length - NGX_HTTP_MEMCACHED_END;
	//比较一下接收到的数据是否结尾
    if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) {
        ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                      "memcached sent invalid trailer");
    }
	//到此应该表示数据已经接收完毕,当然有可能有部分
	//ngx_http_memcached_end数据尚未接收完毕
	//ctx->rest里面记录了尚未被接收完全数据,进行调整
    ctx->rest -= b->last - last;
	//重新更新一下下次接收数据用到的地址,这也表示结束
	//符不用传递到下游客户端
    b->last = last;
    cl->buf->last = last;
    u->length = ctx->rest;

    return NGX_OK;
}

在memcached命中的情况下,上面三个函数基本就可以搞定一切了,但,但,但是当服务器返回了END\x0d字符串时,表明memcached服务器中并没有该内容,看相关代码,nginx打算返回404错误了。这时候一般会使用error_page将其进行重定向,定向到真正的服务器的真正资源上面。这些工作是在ngx_http_upstream_finalize_request函数中去做的,该函数比较复杂,处理的情况很多。以404重定向为例,其函数调用顺序是:
ngx_http_upstream_finalize_request--->ngx_http_finalize_request----->ngx_http_sepecial_response_handler--->ngx_http_send_error_page---->ngx_http_internal_redirect--->ngx_http_handler

在函数ngx_http_handler中:
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
r->phase_handler = cmcf->phase_engine.server_rewrite_index;
 会将该请求跳转到11个阶段中的url重定向阶段,继续执行。

抱歉!评论已关闭.