【Nginx核心模块】线程池模块

本节研究Nginx中线程池模块的相关实现;

总体说明

(1)线程池模块属于核心模块,因此其配置内存ngx_thread_pool_conf_t将会预先申请好;ngx_thread_pool_conf_t中主要管理各个线程池结构;

(2)在ngx_thread_pool_init_worker和 ngx_thread_pool_exit_worker分别会创建每一个线程池和销毁每一个线程池;

线程池模块

ngx_module_t  ngx_thread_pool_module = {
    NGX_MODULE_V1,
    &ngx_thread_pool_module_ctx,           /* module context */
    ngx_thread_pool_commands,              /* module directives */
    NGX_CORE_MODULE,                       /* module type */    //核心模块
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    ngx_thread_pool_init_worker,           /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    ngx_thread_pool_exit_worker,           /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

核心模块上下文

//核心模块上下文
static ngx_core_module_t  ngx_thread_pool_module_ctx = {
    ngx_string("thread_pool"),
    ngx_thread_pool_create_conf,
    ngx_thread_pool_init_conf
};

线程模块配置结构

//线程模块配置结构
typedef struct {
    ngx_array_t               pools;      //管理多个线程池
} ngx_thread_pool_conf_t;

申请配置内存

//线程模块配置申请内存
static void *
ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
{
    ngx_thread_pool_conf_t  *tcf;

    tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
    if (tcf == NULL) {
        return NULL;
    }

    //总的线程池的个数
    if (ngx_array_init(&tcf->pools, cycle->pool, 4,
                       sizeof(ngx_thread_pool_t *))
        != NGX_OK)
    {
        return NULL;
    }

    return tcf;
}

默认初始化配置

//线程模块配置初始化
static char *
ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
{
    ngx_thread_pool_conf_t *tcf = conf;

    ngx_uint_t           i;
    ngx_thread_pool_t  **tpp;

    tpp = tcf->pools.elts;

    for (i = 0; i < tcf->pools.nelts; i++) {

        if (tpp[i]->threads) {  //线程数目不为0时,直接跳过
            continue;
        }

        if (tpp[i]->name.len == ngx_thread_pool_default.len
            && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
                           ngx_thread_pool_default.len)
               == 0)
        {
            tpp[i]->threads = 32;   //默认的线程数
            tpp[i]->max_queue = 65536;  //默认的任务数
            continue;
        }

        ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
                      "unknown thread pool \"%V\" in %s:%ui",
                      &tpp[i]->name, tpp[i]->file, tpp[i]->line);

        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;
}

线程池模块配置项描述

static ngx_command_t  ngx_thread_pool_commands[] = {

    { ngx_string("thread_pool"),
      NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,    //main配置项目
      ngx_thread_pool,          //解析函数
      0,
      0,
      NULL },

      ngx_null_command
};

"ngx_thread_pool"配置解析

//"ngx_thread_pool"配置解析
static char *
ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_str_t          *value;
    ngx_uint_t          i;
    ngx_thread_pool_t  *tp;

    value = cf->args->elts;

    tp = ngx_thread_pool_add(cf, &value[1]);    //添加线程池

    if (tp == NULL) {
        return NGX_CONF_ERROR;
    }

    if (tp->threads) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "duplicate thread pool \"%V\"", &tp->name);
        return NGX_CONF_ERROR;
    }

    tp->max_queue = 65536;

    for (i = 2; i < cf->args->nelts; i++) {

        if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {

            tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);	//线程数目设置

            if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "invalid threads value \"%V\"", &value[i]);
                return NGX_CONF_ERROR;
            }

            continue;
        }

        if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {

            tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);	//任务数设置

            if (tp->max_queue == NGX_ERROR) {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "invalid max_queue value \"%V\"", &value[i]);
                return NGX_CONF_ERROR;
            }

            continue;
        }
    }

    if (tp->threads == 0) {   //若线程池为空时
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "\"%V\" must have \"threads\" parameter",
                           &cmd->name);
        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;
}

在worker进程进入主循环前调用

//在worker进程进入循环前调用
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
{
    ngx_uint_t                i;
    ngx_thread_pool_t       **tpp;
    ngx_thread_pool_conf_t   *tcf;

    if (ngx_process != NGX_PROCESS_WORKER
        && ngx_process != NGX_PROCESS_SINGLE)
    {
        return NGX_OK;
    }

    tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                  ngx_thread_pool_module);

    if (tcf == NULL) {
        return NGX_OK;
    }

    //初始化已完成任务队列头
    ngx_thread_pool_queue_init(&ngx_thread_pool_done);

    tpp = tcf->pools.elts;

    for (i = 0; i < tcf->pools.nelts; i++) {
        if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {  //初始化每一个线程池
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}

在worker进程退出前调用

//在worker进程退出前调用
static void
ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
{
    ngx_uint_t                i;
    ngx_thread_pool_t       **tpp;
    ngx_thread_pool_conf_t   *tcf;

    if (ngx_process != NGX_PROCESS_WORKER
        && ngx_process != NGX_PROCESS_SINGLE)
    {
        return;
    }

    tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                  ngx_thread_pool_module);

    if (tcf == NULL) {
        return;
    }

    tpp = tcf->pools.elts;

    for (i = 0; i < tcf->pools.nelts; i++) {
        ngx_thread_pool_destroy(tpp[i]);    //销毁每一个线程池
    }
}

线程池描述

//线程池描述
struct ngx_thread_pool_s {
    ngx_thread_mutex_t        mtx;		//互斥量
    ngx_thread_pool_queue_t   queue;	//任务队列
    ngx_int_t                 waiting;  //等待的任务数
    ngx_thread_cond_t         cond;		//条件

    ngx_log_t                *log;

    ngx_str_t                 name;     //线程池名称
    ngx_uint_t                threads;  	//线程池数目
    ngx_int_t                 max_queue;	//任务数的最大个数

    u_char                   *file;
    ngx_uint_t                line;
};

初始化一个线程池

//初始化一个线程池
static ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
    int             err;
    pthread_t       tid;
    ngx_uint_t      n;
    pthread_attr_t  attr;

    if (ngx_notify == NULL) {   //没有通知处理
        ngx_log_error(NGX_LOG_ALERT, log, 0,
               "the configured event method cannot be used with thread pools");
        return NGX_ERROR;
    }

    ngx_thread_pool_queue_init(&tp->queue);   //初始化该线程池中任务队列

    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {   //创建该线程池的互斥量
        return NGX_ERROR;
    }

    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {   //创建线程池的条件变量
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    }

    tp->log = log;

    err = pthread_attr_init(&attr);   //设置线程属性
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_init() failed");
        return NGX_ERROR;
    }

#if 0
    err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);    //设置栈大小
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_setstacksize() failed");
        return NGX_ERROR;
    }
#endif

    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
		//创建线程池, ngx_thread_pool_cycle为线程执行的函数

		if (err) {
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            return NGX_ERROR;
        }
    }

    (void) pthread_attr_destroy(&attr);   //摧毁线程属性

    return NGX_OK;
}

销毁线程池

//销毁线程池
static void
ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
{
    ngx_uint_t           n;
    ngx_thread_task_t    task;
    volatile ngx_uint_t  lock;

    ngx_memzero(&task, sizeof(ngx_thread_task_t));

    task.handler = ngx_thread_pool_exit_handler;	//任务退出执行函数
    task.ctx = (void *) &lock;				//指向传入的参数

    for (n = 0; n < tp->threads; n++) {	//tp中所有的线程池添加该任务
        lock = 1;

        if (ngx_thread_task_post(tp, &task) != NGX_OK) {
            return;
        }

        while (lock) {		//主进程判断如果lock没有改变,就让CPU给其他线程执行,以此等待,相当于pthread_join
            ngx_sched_yield();
        }

        task.event.active = 0;
    }

	//此时到这边,所有的线程都已经退出

    (void) ngx_thread_cond_destroy(&tp->cond, tp->log);		//条件变量销毁

    (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);		//互斥量销毁
}

//各个线程的退出任务
static void
ngx_thread_pool_exit_handler(void *data, ngx_log_t *log)
{
    ngx_uint_t *lock = data;	//获取参数

    *lock = 0;	//变为0

    pthread_exit(0);
}
时间: 2024-10-10 07:44:11

【Nginx核心模块】线程池模块的相关文章

java核心知识点 --- 线程池ThreadPool

线程池是多线程学习中需要重点掌握的. 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互.在这种情形下,使用线程池可以很好的提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 一.如何创建线程池?? 在Java5之前,线程池都是开发才手动实现的,从Java5开始,Java内建支持线程池.主要是新增了一个executors工厂类来生产线程池. 1.newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓

python3 线程池-threadpool模块与concurrent.futures模块

一. 既然多线程可以缩短程序运行时间,那么,是不是线程数量越多越好呢? 显然,并不是,每一个线程的从生成到消亡也是需要时间和资源的,太多的线程会占用过多的系统资源(内存开销,cpu开销),而且生成太多的线程时间也是可观的,很可能会得不偿失,这里给出一个最佳线程数量的计算方式: 最佳线程数的获取: 1.通过用户慢慢递增来进行性能压测,观察QPS(即每秒的响应请求数,也即是最大吞吐能力.),响应时间 2.根据公式计算:服务器端最佳线程数量=((线程等待时间+线程cpu时间)/线程cpu时间) * c

nginx源码分析——线程池

源码: nginx 1.13.0-release 一.前言 nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响.但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA).其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的.下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误.不足还请大家指出,谢谢)

Nginx引入线程池,性能提升9倍!

前言 Nginx以异步.事件驱动的方式处理连接.传统的方式是每个请求新起一个进程或线程,Nginx没这样做,它通过非阻塞sockets.epoll.kqueue等高效手段,实现一个worker进程处理多个连接和请求. 一般情况下是一个CPU内核对应一个worker进程,所以worker进程数量固定,并且不多,所以在任务切换上消耗的内存和CPU减少了.这种方式很不错,在高并发和扩展能力等方面都能体现. 看图说话,任务切换不见了. 但是异步事件模式很不喜欢阻塞(blocking).很多第三方模块使用

Nginx 的线程池与性能剖析

http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt158 正如我们所知,NGINX采用了异步.事件驱动的方法来处理连接.这种处理方式无需(像使用传统架构的服务器一样)为每个请求创建额外的专用进程或者线程,而是在一个工作进程中处理多个连接和请求.为此,NGINX工作在非阻塞的socket模式下,并使用了epoll 和 kqueue这样有效的方法. 因为满负载进程的数量很少(通常每核CPU只有一个)而且恒定,所以任务切换只消耗很少的内

Nginx 的线程池与性能剖析【转载】

正如我们所知,NGINX采用了异步.事件驱动的方法来处理连接.这种处理方式无需(像使用传统架构的服务器一样)为每个请求创建额外的专用进程或者线程,而是在一个工作进程中处理多个连接和请求.为此,NGINX工作在非阻塞的socket模式下,并使用了epoll 和 kqueue这样有效的方法. 因为满负载进程的数量很少(通常每核CPU只有一个)而且恒定,所以任务切换只消耗很少的内存,而且不会浪费CPU周期.通过NGINX本身的实例,这种方法的优点已经为众人所知.NGINX可以非常好地处理百万级规模的并

线程池机制使nginx性能提高9倍

原文标题:Thread Pools in NGINX Boost Performance 9x! 原文官方地址:https://www.nginx.com/blog/thread-pools-boost-performance-9x/ 本文为译文,非直译. 一.问题一般情况下,nginx 是一个事件处理器,一个从内核获取连接事件并告诉系统如何处理的控制器.实际上,在操作系统做读写数据调度的时候,nginx是协同系统工作的,所以nginx能越快响应越好. nginx处理的事件可以是 超时通知.so

NGINX引入线程池 性能提升9倍

1. 引言 正如我们所知,NGINX采用了异步.事件驱动的方法来处理连接.这种处理方式无需(像使用传统架构的服务器一样)为每个请求创建额外的专用进程或者线程,而是在一个工作进程中处理多个连接和请求.为此,NGINX工作在非阻塞的socket模式下,并使用了epoll 和 kqueue这样有效的方法. 因为满负载进程的数量很少(通常每核CPU只有一个)而且恒定,所以任务切换只消耗很少的内存,而且不会浪费CPU周期.通过NGINX本身的实例,这种方法的优点已经为众人所知.NGINX可以非常好地处理百

Nginx 引入线程池,提升 9 倍性能

转载:http://blog.csdn.net/wuliusir/article/details/50760357 众所周知,NGINX 采用异步.事件驱动的方式处理连接.意味着无需对每个请求创建专门的进程或线程,它用一个工作进程(worker process)处理多个连接和请求.为了达到这个目的,NGINX采用非阻塞模式的 socket,并利用诸如 epoll 和 kqueue 的高效方法. 全量进程(full-weight process)数很少(通常是一个 CPU 核只有一个)而且恒定.内