现在的位置: 首页 > 综合 > 正文

Nginx进程间通信

2014年07月04日 ⁄ 综合 ⁄ 共 4112字 ⁄ 字号 评论关闭

在了解Nginx进程之间通信的方法之前,首先要了解的一个结构是ngx_process_t,该结构用来保存fork出来的worker进程的信息(src/os/unix.process.h):

//对worker进程的一些数据的封装
typedef struct {
    ngx_pid_t           pid;   //进程ID
    int                 status;    //进程的状态
    ngx_socket_t        channel[2];   //通过socketpair创建的两个句柄,用它来进行通信

    ngx_spawn_proc_pt   proc;   //该wrok的进程执行函数
    void               *data;
    char               *name;

    unsigned            respawn:1;
    unsigned            just_spawn:1;
    unsigned            detached:1;
    unsigned            exiting:1;
    unsigned            exited:1;
} ngx_process_t;

在前面的文章中我们已经提到了一个全局变量ngx_processes,其实它就是上面的结构的数组,用来保存所有worker进程的信息。在Nginx中,通过socketpair的方式进行通信,因而我们可以在上述结构中看到一个channel域,用来保存socketpair的socket描述符。

在看具体的通信之前,应该先看一下通信的数据结构(src/os/unix.Ngx_chanel.h):

//master每次向worker进程发送命令的数据封装
typedef struct {
     ngx_uint_t  command;   //发送的指令
     ngx_pid_t   pid;    //发送到work进程的进程ID号码
     ngx_int_t   slot;    //发送信号的work进程的slot号码,即在全局变量ngx_proecsses中的索引号码
     ngx_fd_t    fd;    //master传给worker进程的文件描述符
} ngx_channel_t;


上述的结构就是每次master进程给worker进程发送的命令格式。好了接下来可以具体看如何实现通信的了。

这里要先从函数ngx_start_worker_processes说起了,看如下部分代码:

 for (i = 0; i < n; i++) {

        cpu_affinity = ngx_get_cpu_affinity(i);
//在这里创建work进程,ngx_worker_process_cycle为该woker进程的执行函数,执行函数的数据位空,
        ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
                          "worker process", type);
//初始化ch的值,ngx_process_slot是刚刚fork出来的进程在ngx_processes数组中的位置
        ch.pid = ngx_processes[ngx_process_slot].pid;
        ch.slot = ngx_process_slot;
//将socketpair的chanel0,也就是写端的描述符传递给其他的进程
        ch.fd = ngx_processes[ngx_process_slot].channel[0];
//向其他的worker传递channel
        ngx_pass_open_channel(cycle, &ch);
    }


首先函数通过ngx_spawn_process函数创建worker子进程,然后在全局数组ngx_processes中设置改worker进程的相关信息,并将该worker进程创建的socketpair描述符的chanel[0]传递给其余已经创建的worker进程。这里就需要具体看ngx_spawn_process函数了。

 for (s = 0; s < ngx_last_process; s++) {
            if (ngx_processes[s].pid == -1) {
                break;
            }
        }


首先用上述代码为即将fork的worker进程在全局数组ngx_processes占一个坑,用s参数来暂时存储这个坑的位置,接着:

if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "socketpair() failed while spawning \"%s\"", name);
            return NGX_INVALID_PID;
        }


调用socketpair函数创建socketpair,并将其保存在chanel中,其定义见上面介绍的结构。接下来的代码就是一些设置刚刚创建的socketpair,例如将它们弄成非阻塞的模式等。这里又几句代码比较重要:

		//当前fork出来的worker进程的socketpair会用到的描述符,1代表读端,待会执行fork之后,子进程会自动继承这个变量
        ngx_channel = ngx_processes[s].channel[1];

    } else {
        ngx_processes[s].channel[0] = -1;
        ngx_processes[s].channel[1] = -1;
    }

    ngx_process_slot = s;  //即在ngx_processes数组中该进程存放的位置,用来保存当前进程的位置,是一个全局的变量,待会fork出来的子进程也会继承

这里用ngx_channel变量暂时保存了socketpair的第二个(worker子进程将会用它来读取信息),然后还用ngx_process_slot变量才存储了刚刚在全局数组ngx_spawn_process为即将fork的子进程占的坑,这些待会fork后子进程将会继承这些变量。并且master进程也会用到这几个变量。然后再worker进程的初始化函数ngx_worker_process_init中我们可以看到如下代码:

    for (n = 0; n < ngx_last_process; n++) {
//无效的process,直接跳过
        if (ngx_processes[n].pid == -1) {
            continue;
        }
//该进程即是当前的子进程,直接跳过,ngx_process_slot该变量时从父进程也就是master进程继承过来的全局变量
        if (n == ngx_process_slot) {
            continue;
        }

        if (ngx_processes[n].channel[1] == -1) {
            continue;
        }
		//1为读端口,0为写端口
//同理,ngx_processes数组也是从master进程继承过来的,所以这里需要关闭其余进程的读端的socketpair,只是保留写端就可以了
        if (close(ngx_processes[n].channel[1]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "close() channel failed");
        }
    }
//关闭自己进程的socketpair的写端,只是保留读端就可以了
    if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "close() channel failed");
    }

#if 0
    ngx_last_process = 0;
#endif
//给chanel添加读事件的处理函数,ngx_channel该参数是chanel【0】,也就是当前worker进程的socketpair的读端口描述符
    if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
                              ngx_channel_handler)
        == NGX_ERROR)
    {
        /* fatal */
        exit(2);
    }


这些代码主要是遍历所有以前已经建立好的worker进程,然后在奔worker进程中将其与的worker进程的socketpair的读端关闭,还要将自己worker进程的写端关闭,然后再将自己的读端注册到事件处理中去,事件处理函数为ngx_channel_handler,其实该函数也很简单,这里就不细讲了。

接下来可以回归master进程的ngx_start_worker_processes函数了,如下代码:

ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
                          "worker process", type);
//初始化ch的值,ngx_process_slot是刚刚fork出来的进程在ngx_processes数组中的位置
        ch.pid = ngx_processes[ngx_process_slot].pid;
        ch.slot = ngx_process_slot;
//将socketpair的chanel0,也就是写端的描述符传递给其他的进程
        ch.fd = ngx_processes[ngx_process_slot].channel[0];
//向其他的worker传递channel
        ngx_pass_open_channel(cycle, &ch);


这里创建一个chanel,也就是Nginx进程通信的数据结构,传递的数据上面已经说的清楚了,最后调用ngx_pass_open_channel函数给所有已经创建的worker进程,然他们更新刚刚创建的worker进程的socketpair信息,这个函数还是相当简单的,具体的就不细写了,有兴趣可以自己看。


这样,master进程就可以通过socketpair来与worker进程进行通信了,而且worker进程之间也可以通过其进行通信。(在master进程以及worker进程中都有全局数组ngx_processes,这些socketpair的信息都会保存在其中,通信的时候用就可以了,具体通信的处理又会涉及到event模块的内容,也就是ngx_channel_handler函数,以后再说吧

抱歉!评论已关闭.