在了解Nginx进程之间通信的方法之前,首先要了解的一个结构是ngx_process_t,该结构用来保存fork出来的worker进程的信息(src/os/unix.process.h):
//对worker进程的一些数据的封装 typedef struct { ngx_pid_t pid; //进程ID int status; //进程的状态 ngx_socket_t channel[2]; //通过socketpair创建的两个句柄,用它来进行通信 ngx_spawn_proc_pt proc; //该wrok的进程执行函数 void *data; char *name; unsigned respawn:1; unsigned just_spawn:1; unsigned detached:1; unsigned exiting:1; unsigned exited:1; } ngx_process_t;
在前面的文章中我们已经提到了一个全局变量ngx_processes,其实它就是上面的结构的数组,用来保存所有worker进程的信息。在Nginx中,通过socketpair的方式进行通信,因而我们可以在上述结构中看到一个channel域,用来保存socketpair的socket描述符。
在看具体的通信之前,应该先看一下通信的数据结构(src/os/unix.Ngx_chanel.h):
//master每次向worker进程发送命令的数据封装 typedef struct { ngx_uint_t command; //发送的指令 ngx_pid_t pid; //发送到work进程的进程ID号码 ngx_int_t slot; //发送信号的work进程的slot号码,即在全局变量ngx_proecsses中的索引号码 ngx_fd_t fd; //master传给worker进程的文件描述符 } ngx_channel_t;
上述的结构就是每次master进程给worker进程发送的命令格式。好了接下来可以具体看如何实现通信的了。
这里要先从函数ngx_start_worker_processes说起了,看如下部分代码:
for (i = 0; i < n; i++) { cpu_affinity = ngx_get_cpu_affinity(i); //在这里创建work进程,ngx_worker_process_cycle为该woker进程的执行函数,执行函数的数据位空, ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL, "worker process", type); //初始化ch的值,ngx_process_slot是刚刚fork出来的进程在ngx_processes数组中的位置 ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; //将socketpair的chanel0,也就是写端的描述符传递给其他的进程 ch.fd = ngx_processes[ngx_process_slot].channel[0]; //向其他的worker传递channel ngx_pass_open_channel(cycle, &ch); }
首先函数通过ngx_spawn_process函数创建worker子进程,然后在全局数组ngx_processes中设置改worker进程的相关信息,并将该worker进程创建的socketpair描述符的chanel[0]传递给其余已经创建的worker进程。这里就需要具体看ngx_spawn_process函数了。
for (s = 0; s < ngx_last_process; s++) { if (ngx_processes[s].pid == -1) { break; } }
首先用上述代码为即将fork的worker进程在全局数组ngx_processes占一个坑,用s参数来暂时存储这个坑的位置,接着:
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed while spawning \"%s\"", name); return NGX_INVALID_PID; }
调用socketpair函数创建socketpair,并将其保存在chanel中,其定义见上面介绍的结构。接下来的代码就是一些设置刚刚创建的socketpair,例如将它们弄成非阻塞的模式等。这里又几句代码比较重要:
//当前fork出来的worker进程的socketpair会用到的描述符,1代表读端,待会执行fork之后,子进程会自动继承这个变量 ngx_channel = ngx_processes[s].channel[1]; } else { ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; //即在ngx_processes数组中该进程存放的位置,用来保存当前进程的位置,是一个全局的变量,待会fork出来的子进程也会继承
这里用ngx_channel变量暂时保存了socketpair的第二个(worker子进程将会用它来读取信息),然后还用ngx_process_slot变量才存储了刚刚在全局数组ngx_spawn_process为即将fork的子进程占的坑,这些待会fork后子进程将会继承这些变量。并且master进程也会用到这几个变量。然后再worker进程的初始化函数ngx_worker_process_init中我们可以看到如下代码:
for (n = 0; n < ngx_last_process; n++) { //无效的process,直接跳过 if (ngx_processes[n].pid == -1) { continue; } //该进程即是当前的子进程,直接跳过,ngx_process_slot该变量时从父进程也就是master进程继承过来的全局变量 if (n == ngx_process_slot) { continue; } if (ngx_processes[n].channel[1] == -1) { continue; } //1为读端口,0为写端口 //同理,ngx_processes数组也是从master进程继承过来的,所以这里需要关闭其余进程的读端的socketpair,只是保留写端就可以了 if (close(ngx_processes[n].channel[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } } //关闭自己进程的socketpair的写端,只是保留读端就可以了 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "close() channel failed"); } #if 0 ngx_last_process = 0; #endif //给chanel添加读事件的处理函数,ngx_channel该参数是chanel【0】,也就是当前worker进程的socketpair的读端口描述符 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); }
这些代码主要是遍历所有以前已经建立好的worker进程,然后在奔worker进程中将其与的worker进程的socketpair的读端关闭,还要将自己worker进程的写端关闭,然后再将自己的读端注册到事件处理中去,事件处理函数为ngx_channel_handler,其实该函数也很简单,这里就不细讲了。
接下来可以回归master进程的ngx_start_worker_processes函数了,如下代码:
ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL, "worker process", type); //初始化ch的值,ngx_process_slot是刚刚fork出来的进程在ngx_processes数组中的位置 ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; //将socketpair的chanel0,也就是写端的描述符传递给其他的进程 ch.fd = ngx_processes[ngx_process_slot].channel[0]; //向其他的worker传递channel ngx_pass_open_channel(cycle, &ch);
这里创建一个chanel,也就是Nginx进程通信的数据结构,传递的数据上面已经说的清楚了,最后调用ngx_pass_open_channel函数给所有已经创建的worker进程,然他们更新刚刚创建的worker进程的socketpair信息,这个函数还是相当简单的,具体的就不细写了,有兴趣可以自己看。
这样,master进程就可以通过socketpair来与worker进程进行通信了,而且worker进程之间也可以通过其进行通信。(在master进程以及worker进程中都有全局数组ngx_processes,这些socketpair的信息都会保存在其中,通信的时候用就可以了,具体通信的处理又会涉及到event模块的内容,也就是ngx_channel_handler函数,以后再说吧)