【Nginx】事件驱动框架处理流程

ngx_event_core_module模块的ngx_event_process_init方法对事件模块做了一些初始化。其中包括将“请求连接”这样一个读事件对应的处理方法(handler)设置为ngx_event_accept函数,并将此事件添加到epoll模块中。当有新连接事件发生时,ngx_event_accept就会被调用。大致流程是这样:

  1. worker进程在ngx_worker_process_cycle方法中不断循环调用ngx_process_events_and_timers函数处理事件,这个函数是事件处理的总入口。
  2. ngx_process_events_and_timers会调用ngx_process_events,这是一个宏,相当于ngx_event_actions.process_events,ngx_event_actions是个全局的结构体,存储了对应事件驱动模块(这里是epoll模块)的10个函数接口。所以这里就是调用了ngx_epoll_module_ctx.actions.process_events函数,也就是ngx_epoll_process_events函数来处理事件。
  3. ngx_epoll_process_events调用Linux函数接口epoll_wait获得“有新连接”这个事件,然后调用这个事件的handler处理函数来对这个事件进行处理。
  4. 在上面已经说过handler已经被设置成了ngx_event_accept函数,所以就调用ngx_event_accept进行实际的处理。

下面分析ngx_event_accept方法,它的流程图如下所示:

经过精简的代码如下,注释中的序号对应上图的序号:

void
ngx_event_accept(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;
    u_char             sa[NGX_SOCKADDRLEN];

    if (ev->timedout) {
        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
            return;
        }

        ev->timedout = 0;
    }

    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

    if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
        ev->available = 1;

    } else if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
        ev->available = ecf->multi_accept;
    }

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;

    do {
        socklen = NGX_SOCKADDRLEN;

        /* 1、accept方法试图建立连接,非阻塞调用 */
        s = accept(lc->fd, (struct sockaddr *) sa, &socklen);

        if (s == (ngx_socket_t) -1)
        {
            err = ngx_socket_errno;

            if (err == NGX_EAGAIN)
            {
                /* 没有连接,直接返回 */
                return;
            }

            level = NGX_LOG_ALERT;

            if (err == NGX_ECONNABORTED) {
                level = NGX_LOG_ERR;

            } else if (err == NGX_EMFILE || err == NGX_ENFILE) {
                level = NGX_LOG_CRIT;
            }

            if (err == NGX_ECONNABORTED) {
                if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                    ev->available--;
                }

                if (ev->available) {
                    continue;
                }
            }

            if (err == NGX_EMFILE || err == NGX_ENFILE) {
                if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)
                    != NGX_OK)
                {
                    return;
                }

                if (ngx_use_accept_mutex) {
                    if (ngx_accept_mutex_held) {
                        ngx_shmtx_unlock(&ngx_accept_mutex);
                        ngx_accept_mutex_held = 0;
                    }

                    ngx_accept_disabled = 1;

                } else {
                    ngx_add_timer(ev, ecf->accept_mutex_delay);
                }
            }

            return;
        }

        /* 2、设置负载均衡阈值 */
        ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;

        /* 3、从连接池获得一个连接对象 */
        c = ngx_get_connection(s, ev->log);

        /* 4、为连接创建内存池 */
        c->pool = ngx_create_pool(ls->pool_size, ev->log);

        c->sockaddr = ngx_palloc(c->pool, socklen);

        ngx_memcpy(c->sockaddr, sa, socklen);

        log = ngx_palloc(c->pool, sizeof(ngx_log_t));

        /* set a blocking mode for aio and non-blocking mode for others */
        /* 5、设置套接字属性为阻塞或非阻塞 */
        if (ngx_inherited_nonblocking) {
            if (ngx_event_flags & NGX_USE_AIO_EVENT) {
                if (ngx_blocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_blocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }

        } else {
            if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
                if (ngx_nonblocking(s) == -1) {
                    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                                  ngx_nonblocking_n " failed");
                    ngx_close_accepted_connection(c);
                    return;
                }
            }
        }

        *log = ls->log;

        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        c->log = log;
        c->pool->log = log;

        c->socklen = socklen;
        c->listening = ls;
        c->local_sockaddr = ls->sockaddr;
        c->local_socklen = ls->socklen;

        c->unexpected_eof = 1;

        rev = c->read;
        wev = c->write;

        wev->ready = 1;

        if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {
            /* rtsig, aio, iocp */
            rev->ready = 1;
        }

        if (ev->deferred_accept) {
            rev->ready = 1;

        }

        rev->log = log;
        wev->log = log;

        /*
         * TODO: MT: - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         *
         * TODO: MP: - allocated in a shared memory
         *           - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         */

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

        if (ls->addr_ntop) {
            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
            if (c->addr_text.data == NULL) {
                ngx_close_accepted_connection(c);
                return;
            }

            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                             c->addr_text.data,
                                             ls->addr_text_max_len, 0);
            if (c->addr_text.len == 0) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        /* 6、将新连接对应的读写事件添加到epoll对象中 */
        if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
            if (ngx_add_conn(c) == NGX_ERROR) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

        log->data = NULL;
        log->handler = NULL;

        /* 7、TCP建立成功调用的方法,这个方法在ngx_listening_t结构体中 */
        ls->handler(c);

    } while (ev->available);    /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */
}

Nginx中的“惊群”问题

Nginx一般会运行多个worker进程,这些进程会同时监听同一端口。当有新连接到来时,内核将这些进程全部唤醒,但只有一个进程能够和客户端连接成功,导致其它进程在唤醒时浪费了大量开销,这被称为“惊群”现象。Nginx解决“惊群”的方法是,让进程获得互斥锁ngx_accept_mutex,让进程互斥地进入某一段临界区。在该临界区中,进程将它所要监听的连接对应的读事件添加到epoll模块中,使得当有“新连接”事件发生时,该worker进程会作出反应。这段加锁并添加事件的过程是在函数ngx_trylock_accept_mutex中完成的。而当其它进程也进入该函数想要添加读事件时,发现互斥锁被另外一个进程持有,所以它只能返回,它所监听的事件也无法添加到epoll模块,从而无法响应“新连接”事件。但这会出现一个问题:持有互斥锁的那个进程在什么时候释放互斥锁呢?如果需要等待它处理完所有的事件才释放锁的话,那么会需要相当长的时间。而在这段时间内,其它worker进程无法建立新连接,这显然是不可取的。Nginx的解决办法是:通过ngx_trylock_accept_mutex获得了互斥锁的进程,在获得就绪读/写事件并从epoll_wait返回后,将这些事件归类放入队列中:

  • 新连接事件放入ngx_posted_accept_events队列
  • 已有连接事件放入ngx_posted_events队列

代码如下:

if (flags & NGX_POST_EVENTS)
{
        /* 延后处理这批事件 */
        queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);

        /* 将事件添加到延后执行队列中 */
        ngx_locked_post_event(rev, queue);
}
else
{
        rev->handler(rev);  /* 不需要延后,则立即处理事件 */
}

写事件做类似处理。进程接下来处理ngx_posted_accept_events队列中的事件,处理完后立即释放互斥锁,使该进程占用锁的时间降到了最低。

Nginx中的负载均衡问题

Nginx中每个进程使用了一个处理负载均衡的阈值ngx_accept_disabled,它在上图的第2步中被初始化:

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

它的初值为一个负数,该负数的绝对值等于总连接数的7/8.当阈值小于0时正常响应新连接事件,当阈值大于0时不再响应新连接事件,并将ngx_accept_disabled减1,代码如下:

if (ngx_accept_disabled > 0)
{
        ngx_accept_disabled--;
}
else
{
    if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR)
    {
        return;
    }
    ....
}

这说明,当某个进程当前的连接数达到能够处理的总连接数的7/8时,负载均衡机制被触发,进程停止响应新连接。

参考:

《深入理解Nginx》 P328-P334.

【Nginx】事件驱动框架处理流程,布布扣,bubuko.com

时间: 2024-12-14 18:40:55

【Nginx】事件驱动框架处理流程的相关文章

【nginx】【转】Nginx启动框架处理流程

Nginx启动过程流程图: ngx_cycle_t结构体: Nginx的启动初始化在src/core/nginx.c的main函数中完成,当然main函数是整个Nginx的入口,除了完成启动初始化任务以外,也必定是所有功能模块的入口之处.Nginx的初始化工作主要围绕一个类型为ngx_cycle_t类型的全局变量(cycle)展开. ngx_cycle_t结构体类型: typedef struct ngx_cycle_s ngx_cycle_t; struct ngx_cycle_s { voi

Nginx学习之十一-Nginx启动框架处理流程

Nginx启动过程流程图 下面首先给出Nginx启动过程的流程图: ngx_cycle_t结构体 Nginx的启动初始化在src/core/nginx.c的main函数中完成,当然main函数是整个Nginx的入口,除了完成启动初始化任务以外,也必定是所有功能模块的入口之处.Nginx的初始化工作主要围绕一个类型为ngx_cycle_t类型的全局变量(cycle)展开. ngx_cycle_t结构体类型: typedef struct ngx_cycle_s ngx_cycle_t; struc

【Nginx】事件驱动框架和异步处理

Nginx对请求的处理是通过事件触发的,模块作为事件消费者,只能被事件收集.分发器调用.这与传统的Web服务器是不同的.传统的Web服务器下,一个请求由一个进程消费,请求在建立连接后将始终占用着系统资源,直到连接关闭才会释放资源.这样做有以下缺点: 进程数增加会增加进程间切换的负担,影响系统整体性能. 当某个进程要等待事件发生而处于阻塞状态时,该进程仍然占用内存资源直到该请求结束,造成资源极大浪费. 在Nginx中,接收到一个请求时,不会产生一个单独的进程来处理该请求,而是由事件收集.分发器(进

Nginx——事件驱动机制(惊群问题,负载均衡)

事件框架处理流程 每个worker子进程都在ngx_worker_process_cycle方法中循环处理事件,处理分发事件则在ngx_worker_process_cycle方法中调用ngx_process_events_and_timers方法,循环调用该方法就是 在处理所有事件,这正是事件驱动机制的核心.该方法既会处理普通的网络事件,也会处理定时器事件. ngx_process_events_and_timers方法中核心操作主要有以下3个: 1)  调用所使用事件驱动模块实现的proce

Nginx Http框架的理解

HTTP框架是Nginx基础框架的一部分,Nginx的其它底层框架如master-worker进程模型.event模块.mail 模块等. HTTP框架代码主要有2个模块组成:ngx_http_module和ngx_http_core_module: 我们编写的HTTP模块需要注册到HTTP框架上,才能融入HTTP请求的处理流程中. 当在nginx.conf中存在一个http{...}的配置时,即启用了HTTP框架代码,在nginx配置解析时,就已经为框架建立好了各种数据结构(尤其是HTTP模块

SSH(Struts2+Spring+Hibernate)框架搭建流程<注解的方式创建Bean>

此篇讲的是MyEclipse9工具提供的支持搭建自加包有代码也是相同:用户登录与注册的例子,表字段只有name,password. SSH,xml方式搭建文章链接地址:http://www.cnblogs.com/wkrbky/p/5912810.html 一.Hibernate(数据层)的搭建: 实现流程 二.Spring(注入实例)的使用: 实现流程 三.Struts2(MVC)的搭建: 实现流程 这里注意一点问题: Struts2与Hibernate在一起搭建,antlr包,有冲突.MyE

struts2 框架处理流程

struts2 框架处理流程 流程图如下: -->FilterDispatcher过滤器在2.1.3以后被StrutsPrepareAndExecuteFilter所替代,使得在执行Action之前可以添加过滤器了: -->客户端初始化一个指向servlet容器的request请求,请求经过一系列的过滤器,FilterDispatcher询问ActionMapper来决定这个请求是否需要调用某个Action. 1)Struts2框架的大致处理流程: ①     浏览器发出请求,例如请求/log

图文解析Struts2框架执行流程

struts的架构图 (1)提交请求 客户端通过HttpServletRequest向servlet容器(即tomcat)提交一个请求. 请求经过一系列的过滤器,例如图中的ActionContextCleanUp和Other filter(SlterMesh,etc)等,最后被struts的核心过滤器FilterDispatcher控制到. 注:核心控制器2.1.3版本之后,struts的filterDispatcher核心控制器变成了StrutsPrepareAndExecuteFilte,如

nginx的请求接收流程(一)

今年我们组计划写一本nginx模块开发以及原理解析方面的书,整本书是以open book的形式在网上会定时的更新,网址为http://tengine.taobao.org/book/index.html.本书分析的nginx源码版本为1.2.0,环境为linux,事件处理模型为epoll,大部分分析流程都基于以上假设.我会负责其中一些章节的编写,所以打算在这里写一系列我负责章节内容相关的文章(主要包括nginx各phase模块的开发,nginx请求的处理流程等).本篇文章主要会介绍nginx中请