Nginx之进程间的通信机制(Nginx频道)

1. Nginx 频道

ngx_channel_t 频道是 Nginx master 进程与 worker 进程之间通信的常用工具,它是使用本机套接字实现的,即 socketpair 方法,它用于创建父子进程间使用的套接字。

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>

int socketpair(int domain, int type, int protocol, int sv[2]);

这个方法可以创建一对关联的套接字 sv[2]。

  • domain:表示域,在 Linux 下通常取值为 AF_UNIX;
  • type:取值为 SOCK_STREAM 或 SOCK_DGRAM,它表示在套接字上使用的是 TCP 还是 UDP;
  • protocol:必须传递 0;
  • sv[2]:是一个含有两个元素的整型数组,实际上就是两个套接字。
  • 当 socketpair 返回 0 时,sv[2] 这两个套接字创建成功,否则 sockpair 返回 -1 表示失败.

当 socketpair 执行成功时,sv[2] 这两个套接字具备下列关系:

  • 向 sv[0] 套接字写入数据,将可以从 sv[1] 套接字中读取到刚写入的数据;
  • 同样,向 sv[1] 套接字写入数据,也可以从 sv[0] 中读取到写入的数据。
  • 通常,在父、子进程通信前,会先调用 socketpair 方法创建这样一组套接字,在调用 fork 方法创建出子进程后,将会在父进程中关闭 sv[1] 套接字,仅使用 sv[0] 套接字用于向子进程发送数据以及接收子进程发送来的数据;
  • 而在子进程中则关闭 sv[0] 套接字,仅使用 sv[1] 套接字既可以接收父进程发送来的数据,也可以向父进程发送数据。

ngx_channel_t 结构体是 Nginx 定义的 master 父进程与 worker 子进程间的消息格式,如下:

typedef struct {
    // 传递的 TCP 消息中的命令
    ngx_uint_t  command;
    // 进程 ID,一般是发送命令方的进程 ID
    ngx_pid_t   pid;
    // 表示发送命令方在 ngx_processes 进程数组间的序号
    ngx_int_t   slot;
    // 通信的套接字句柄
    ngx_fd_t    fd;
}ngx_channel_t;

Nginx 针对 command 成员定义了如下命令:

// 打开频道,使用频道这种方式通信前必须发送的命令
#define NGX_CMD_OPEN_CHANNEL 1

// 关闭已经打开的频道,实际上也就是关闭套接字
#define NGX_CMD_CLOSE_CHANNEL 2

// 要求接收方正常地退出进程
#define NGX_CMD_QUIT 3

// 要求接收方强制地结束进程
#define NGX_CMD_TERMINATE 4

// 要求接收方重新打开进程已经打开过的文件
#define NGX_CMD_REOPEN 5

问:master 是如何启动、停止 worker 子进程的?

答:正是通过 socketpair 产生的套接字发送命令的,即每次要派生一个子进程之前,都会先调用 socketpair 方法。

在 Nginx 派生子进程的 ngx_spawn_process 方法中,会首先派生基于 TCP 的套接字,如下:

ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
    char *name, ngx_int_t respawn)
{

    if (respawn != NGX_PROCESS_DETACHED) {

        /* Solaris 9 still has no AF_LOCAL */

        // ngx_processes[s].channel 数组正是将要用于父、子进程间通信的套接字对
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
        {
            return NGX_INVALID_PID;
        }

        // 将 channel 套接字对都设置为非阻塞模式
        if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
    ...
}

ngx_processes 数组定义了 Nginx 服务中所有的进程,包括 master 进程和 worker 进程,如下:

#define NGX_MAX_PROCESSES 1024

// 虽然定义了 NGX_MAX_PROCESSES 个成员,但是已经使用的元素仅与启动的进程个数有关
ngx_processes_t ngx_processes[NGX_MAX_PROCESSES];

ngx_processes 数组的类型是 ngx_processes_t,对于频道来说,这个结构体只关心它的 channel 成员:

typedef struct {
    ...
    // socketpair 创建的套接字对
    ngx_socket_t channel[2];
}ngx_processes_t;

1. ngx_write_channel:使用频道发送 ngx_channel_t 消息

ngx_int_t
ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,
    ngx_log_t *log)
{
    ssize_t         n;
    ngx_err_t       err;
    struct iovec    iov[1];
    struct msghdr   msg;

#if (NGX_HAVE_MSGHDR_MSG_CONTROL)

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    }cmsg;

    if (ch->fd == -1) {
        msg.msg_control = NULL;
        msg.msg_controllen = 0;

    } else {
        // 辅助数据
        msg.msg_control = (caddr_t)&cmsg;
        msg.msg_controllen = sizeof(cmsg);

        ngx_memzero(&cmsg, sizeof(cmsg));

        cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
        cmsg.cm.cmsg_level = SOL_SOCKET;
        cmsg.cm.cmsg_type = SCM_RIGHTS;

        /*
         * We have to use ngx_memcpy() instead of simple
         *   *(int *) CMSG_DATA(&cmsg.cm) = ch->fd;
         * because some gcc 4.4 with -O2/3/s optimization issues the warning:
         *   dereferencing type-punned pointer will break strict-aliasing rules
         *
         * Fortunately, gcc with -O1 compiles this ngx_memcpy()
         * in the same simple assignment as in the code above
         */

        ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int));
    }

    msg.msg_flags = 0;

#else

    if (ch->fd == -1) {
        msg.msg_accrights = NULL;
        msg.msg_accrightslen = 0;

    } else {
        msg.msg_accrights = (caddr_t) &ch->fd;
        msg.msg_accrightslen = sizeof(int);
    }

#endif

    // 指向要发送的 ch 起始地址
    iov[0].iov_base = (char *) ch;
    iov[0].iov_len = size;

    // msg_name 和 msg_namelen 仅用于未连接套接字(如UDP)
    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;

    // 将该 ngx_channel_t 消息发出去
    n = sendmsg(s, &msg, 0);

    if (n == -1) {
        err = ngx_errno;
        if (err == NGX_EAGAIN) {
            return NGX_AGAIN;
        }

        return NGX_ERROR;
    }

    return NGX_OK;
}

2. ngx_read_channel: 读取消息

ngx_int_t
ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
{
    ssize_t             n;
    ngx_err_t           err;
    struct iovec        iov[1];
    struct msghdr       msg;

#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;
#else
    int                 fd;
#endif

    iov[0].iov_base = (char *)ch;
    iov[0].iov_len = size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;

#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
    msg.msg_control = (caddr_t) &cmsg;
    msg.msg_controllen = sizeof(cmsg);
#else
    msg.msg_accrights = (caddr_t) &fd;
    msg.msg_accrightslen = sizeof(int);
#endif

    // 接收命令
    n = recvmsg(s, &msg, 0);

    if (n == -1) {
        err = ngx_errno;
        if (err == NGX_EAGAIN) {
            return NGX_AGAIN;
        }

        return NGX_ERROR;
    }

    if (n == 0) {
        return NGX_ERROR;
    }

    // 接收的数据不足
    if ((size_t) n < sizeof(ngx_channel_t)) {
        return NGX_ERROR;
    }

#if (NGX_HAVE_MSGHDR_MSG_CONTROL)

    // 若接收到的命令为"打开频道,使用频道这种方式通信前必须发送的命令"
    if (ch->command == NGX_CMD_OPEN_CHANNEL) {

        if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) {
            return NGX_ERROR;
        }

        if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS)
        {
            return NGX_ERROR;
        }

        /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */

        ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int));
    }

    // 若接收到的消息是被截断的
    if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
        ngx_log_error(NGX_LOG_ALERT, log, 0,
                      "recvmsg() truncated data");
    }

#else

    if (ch->command == NGX_CMD_OPEN_CHANNEL) {
        if (msg.msg_accrightslen != sizeof(int)) {
            return NGX_ERROR;
        }

        ch->fd = fd;
    }
#endif

    return n;
}

在 Nginx 中,目前仅存在 master 进程向 worker 进程发送消息的场景,这时对于 socketpair 方法创建的 channel[2] 套接字来说,master 进程会使用 channel[0] 套接字来发送消息,而 worker 进程会使用 channel[1] 套接字来接收消息。

3. ngx_add_channel_event: 把接收频道消息的套接字添加到 epoll 中

worker 进程调度 ngx_read_channel 方法接收频道消息是通过该 ngx_add_channel_event 函数将接收频道消息的套接字(对于 worker 即为channel[1])添加到 epoll 中,当接收到父进程消息时子进程会通过 epoll 的事件回调相应的 handler 方法来处理这个频道消息,如下:

ngx_int_t
ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event,
    ngx_event_handler_pt handler)
{
    ngx_event_t         *ev, *rev, *wev;
    ngx_connection_t    *c;

    // 获取一个空闲连接
    c = ngx_get_connection(fd, cycle->log);

    if (c == NULL) {
        return NGX_ERROR;
    }

    c->pool = cycle->pool;

    rev = c->read;
    wev = c->write;

    rev->log = cycle->log;
    wev->log = cycle->log;

    rev->channel = 1;
    wev->channel = 1;

    ev = (event == NGX_READ_EVENT) ? rev : wev;

    // 初始化监听该 ev 事件时调用的回调函数
    ev->handler = handler;

    // 将该接收频道消息的套接字添加到 epoll 中
    if (ngx_add_conn && (ngx_event_flags && NGX_USE_EPOLL_EVENT) == 0) {
        // 这里是同时监听该套接字的读、写事件
        if (ngx_add_conn(c) == NGX_ERROR) {
            ngx_free_connection(c);
            return NGX_ERROR;
        }

    } else {
        // 这里是仅监听 ev 事件
        if (ngx_add_event(ev, event, 0) == NGX_ERROR) {
            ngx_free_connection(c);
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}

4. ngx_close_channel: 关闭这个频道通信方式

void
ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log)
{
    if (close(fd[0]) == -1) {

    }

    if (close(fd[1]) == -1) {

    }
}

原文地址:https://www.cnblogs.com/jimodetiantang/p/9191092.html

时间: 2024-08-02 04:35:59

Nginx之进程间的通信机制(Nginx频道)的相关文章

Nginx之进程间的通信机制(信号、信号量、文件锁)

1. 信号 Nginx 在管理 master 进程和 worker 进程时大量使用了信号.Linux 定义的前 31 个信号是最常用的,Nginx 则通过重定义其中一些信号的处理方法来使用吸纳后,如接收到 SIGUSR1 信号就意味着需要重新打开文件. 使用信号时 Nginx 定义了一个 ngx_signal_t 结构体用于描述接收到的信号时的行为: typedef struct { // 需要处理的信号 int signo; // 信号对应的字符串名称 char *signame; // 这个

从AIDL开始谈Android进程间Binder通信机制

本文首先概述了Android的进程间通信的Binder机制,然后结合一个AIDL的例子,对Binder机制进行了解析. 概述 我们知道,在Android app中的众多activity,service等组件可以运行在同一进程中,也可以运行在不同进程中.当组件运行在同一进程中进行通信就显得比较简单,在之前的Android线程间通信机制中已经讲过了:而当它们运行在不同的进程中时,就需要使用我们本文中所要介绍的Binder机制了. Binder作为一种进程间通信机制,负责提供远程调用的功能(RPC),

Nginx学习——Nginx进程间的通信

nginx进程间的通信 进程间消息传递 共享内存 共享内存还是Linux下提供的最主要的进程间通信方式,它通过mmap和shmget系统调用在内存中创建了一块连续的线性地址空间,而通过munmap或者shmdt系统调用可以释放这块内存.使用共享内存的优点是当多个进程使用同一块共享内存时,在不论什么一个进程改动了共享内存中的内容后,其它进程通过訪问这段共享内存都可以得到改动后的内容. Nginx定义了ngx_shm_t结构体.用于描写叙述一块共享内存, typedef struct{ //指向共享

wwwhy76888com理解容器间link通信机制199O8836661

一.什么是docker的link机制? 同一个宿主机上的多个docker容器之间如果想进行通信,可以通过使用容器的ip地址来通信,也可以通过宿主机的ip加上容器暴露出的端口号来通信,前者会导致ip地址的硬编码,不方便迁移,并且容器重启后ip地址会改变,除非使用固定的ip,后者的通信方式比较单一,只能依靠监听在暴露出的端口的进程来进行有限的通信.通过docker的link机制可以通过一个name来和另一个容器通信,link机制方便了容器去发现其它的容器并且可以安全的传递一些连接信息给其它的容器.其

linux 进程间的通信

现在linux使用的进程间通信方式:(1)管道(pipe)和有名管道(FIFO)(2)信号(signal)(3)消息队列(4)共享内存(5)信号量(6)套接字(socket) 为何进行进程间的通信:A.数据传输:一个进程需要将它的数据发送给另一个进程,发送的数据量在一个字节到几M字节之间B.共享数据:多个进程想要操作共享数据,一个进程对共享数据的修改,别的进程应该立刻看到.C.通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程).D.资源共享

Linux进程间的通信

一.管道 管道是Linux支持的最初Unix IPC形式之一,具有以下特点: A. 管道是半双工的,数据只能向一个方向流动: B. 需要双工通信时,需要建立起两个管道: C. 只能用于父子进程或者兄弟进程之间(具有亲缘关系的进程): D. 单独构成一种独立的文件系统:管道对于管道两端的进程而言,就是一个文件,但它不是普通的文件,它不属于某种文件系统,而是自立门户,单独构成一种文件系统,并且只存在与内存中. 匿名管道的创建:该函数创建的管道的两端处于一个进程中间,在实际应用中没有太大意义;因此,一

【转】使用AIDL实现进程间的通信之复杂类型传递

使用AIDL实现进程间的通信之复杂类型传递 首先要了解一下AIDL对Java类型的支持. 1.AIDL支持Java原始数据类型. 2.AIDL支持String和CharSequence. 3.AIDL支持传递其他AIDL接口,但你引用的每个AIDL接口都需要一个import语句,即使位于同一个包中. 4.AIDL支持传递实现了android.os.Parcelable接口的复杂类型,同样在引用这些类型时也需要import语句.(Parcelable接口告诉Android运行时在封送(marsha

进程间的通信

进程间通信概念:(IPC) 每个进程都有各自不同的进程地址空间,任何一个进程的全局变量在另一个进程中都看不到,因此进程之间要交换数据必须要通过内核,在内核中开辟一块缓冲区,进程把数据从用户空间拷贝到内核区,再从内核缓冲区取出数据.这就叫进程间的通信. 管道技术:(pipe) 是一种最基本进程间通信机制,它是基于字节流的.分为匿名管道和命名管道. 调用pipe函数时,会在内核区开辟一块缓冲区用于通信,它有一个读端和一个写端,通过参数传给用户程序两个文件描述符,0指管道的读端,1指管道的写端,因此管

Android进程间的通信之AIDL

Android服务被设计用来执行很多操作,比如说,可以执行运行时间长的耗时操作,比较耗时的网络操作,甚至是在一个单独进程中的永不会结束的操作.实现这些操作之一是通过Android接口定义语言(AIDL)来完成的.AIDL被设计用来执行进程间通信,另一种实现方式见博文Android进程间的通信之Messenger.本文我们将学习如何创建AIDL文件实现Android进程间通信.在正式学习之前,我们先澄清一些"事实". 关于Android Service 1.Android服务不是后台任务