Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现

声明:本文为原创博文,转载请注明出处。

Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们。在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册、注销、通知功能)、多路复用器(由操作系统提供,比如kqueue、select、epoll等)、事件处理器(负责事件的处理)以及事件源(linux中这就是描述符)这四个组件。一般,会单独启动一个线程运行Reactor实例来实现真正的异步操作。但是,依赖操作系统提供的系统调用来实现异步是有局限的,比如在Reactor模型中我们只能监听到:网络IO事件、signel(信号)、超时事件以及一些管道事件等,但这些事件也只是通知我们资源可读或者可写,真正的读写操作(read和write)还是同步的(也就是你必须等到read或者write返回,虽然linux提供了aio,但是其有诸多槽点),那么Nodejs的全异步是如何做到的呢?你可能会很快想到,就是启用单独的线程来做同步的事情,这也是libuv的设计思路,借用官网的一张图,说明一切:

由上图可以看到,libuv实现了一套自己的线程池来处理所有同步操作(从而模拟出异步的效果),下面就来看一下该线程池的具体实现吧!

一、线程池模型

说道线程池,在java领域中,jdk本身就提供了多种线程池实现,几乎所有的线程池都遵循以下模型(任务队列+线程池):

libuv自身定义了一个非常精炼、高效的队列(双向循环链表),只用了几个简单的宏定义将其实现,具体实现方式可以参见我的另一篇博文:libuv高效队列的实现。现在队列有了,来看一下task的定义:

1 struct uv__work {
2   void (*work)(struct uv__work *w);
3   void (*done)(struct uv__work *w, int status);
4   struct uv_loop_s* loop;
5   void* wq[2];
6 };

uv__work就代表一个task,可以看到里面有两个函数指针(work代表任务实际操作,done用于对任务进行状态确认)。wq成员就是一个QUEUE的节点,  uv__work就是通过wq与其他  uv__work连接成一个队列。

下面来看一下threadpool的初始化,代码如下:

 1 #define MAX_THREADPOOL_SIZE 128
 2
 3 static uv_once_t once = UV_ONCE_INIT;
 4 static uv_cond_t cond;
 5 static uv_mutex_t mutex;
 6 static unsigned int idle_threads;//当前空闲的线程数
 7 static unsigned int nthreads;
 8 static uv_thread_t* threads;
 9 static uv_thread_t default_threads[4];
10 static QUEUE exit_message;
11 static QUEUE wq;//线程池全部会检查这个queue,一旦发现有任务就执行,但是只能有一个线程抢占到
12 static volatile int initialized;
13
14
15 static void init_once(void) {
16   unsigned int i;
17   const char* val;
18   // 线程池中的线程数,默认值为4
19   nthreads = ARRAY_SIZE(default_threads);
20   val = getenv("UV_THREADPOOL_SIZE");
21   if (val != NULL)
22     nthreads = atoi(val);
23   if (nthreads == 0)
24     nthreads = 1;
25   if (nthreads > MAX_THREADPOOL_SIZE)
26     nthreads = MAX_THREADPOOL_SIZE;
27
28   threads = default_threads;
29   if (nthreads > ARRAY_SIZE(default_threads)) {
30     // 分配线程句柄
31     threads = uv__malloc(nthreads * sizeof(threads[0]));
32     if (threads == NULL) {
33       nthreads = ARRAY_SIZE(default_threads);
34       threads = default_threads;
35     }
36   }
37   // 初始化条件变量
38   if (uv_cond_init(&cond))
39     abort();
40   // 初始化互斥锁
41   if (uv_mutex_init(&mutex))
42     abort();
43
44   // 初始化任务队列
45   QUEUE_INIT(&wq);
46
47   // 创建nthreads个线程
48   for (i = 0; i < nthreads; i++)
49     if (uv_thread_create(threads + i, worker, NULL))
50       abort();
51
52   initialized = 1;
53 }

上面的代码中,一共创建了nthreads个线程,那么每个线程的执行代码是什么呢?由线程创建代码:uv_thread_create(threads + i, worker, NULL),可以看到,每一个线程都是执行worker函数,下面看看worker函数都在做什么:

 1 /* To avoid deadlock with uv_cancel() it‘s crucial that the worker
 2  * never holds the global mutex and the loop-local mutex at the same time.
 3  */
 4 static void worker(void* arg) {
 5   struct uv__work* w;
 6   QUEUE* q;
 7
 8   (void) arg;
 9
10   for (;;) {
11     // 因为是多线程访问,因此需要加锁同步
12     uv_mutex_lock(&mutex);
13
14     // 如果任务队列是空的
15     while (QUEUE_EMPTY(&wq)) {
16       // 空闲线程数加1
17       idle_threads += 1;
18       // 等待条件变量
19       uv_cond_wait(&cond, &mutex);
20       // 被唤醒之后,说明有任务被post到队列,因此空闲线程数需要减1
21       idle_threads -= 1;
22     }
23
24     // 取出队列的头部节点(第一个task)
25     q = QUEUE_HEAD(&wq);
26
27     if (q == &exit_message)
28       uv_cond_signal(&cond);
29     else {
30       // 从队列中移除这个task
31       QUEUE_REMOVE(q);
32       QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
33                              executing. */
34     }
35
36     uv_mutex_unlock(&mutex);
37
38     if (q == &exit_message)
39       break;
40
41     // 取出uv__work首地址
42     w = QUEUE_DATA(q, struct uv__work, wq);
43     // 调用task的work,执行任务
44     w->work(w);
45
46     uv_mutex_lock(&w->loop->wq_mutex);
47     w->work = NULL;  /* Signal uv_cancel() that the work req is done
48                         executing. */
49     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
50     uv_async_send(&w->loop->wq_async);
51     uv_mutex_unlock(&w->loop->wq_mutex);
52   }
53 }

可以看到,多个线程都会在worker方法中等待在conn条件变量上,一旦有任务加入队列,线程就会被唤醒,然后只有一个线程会得到任务的执行权,其他的线程只能继续等待。

那么如何向队列提交一个task呢?看以下代码:

 1 void uv__work_submit(uv_loop_t* loop,
 2                      struct uv__work* w,
 3                      void (*work)(struct uv__work* w),
 4                      void (*done)(struct uv__work* w, int status)) {
 5   uv_once(&once, init_once);
 6   // 构造一个task
 7   w->loop = loop;
 8   w->work = work;
 9   w->done = done;
10   // 将其插入任务队列
11   post(&w->wq);
12 }

接着看post做了什么:

 1 static void post(QUEUE* q) {
 2   // 同步队列操作
 3   uv_mutex_lock(&mutex);
 4   // 将task插入队列尾部
 5   QUEUE_INSERT_TAIL(&wq, q);
 6   // 如果当前有空闲线程,就向条件变量发送信号
 7   if (idle_threads > 0)
 8     uv_cond_signal(&cond);
 9   uv_mutex_unlock(&mutex);
10 }

有提交任务,就肯定会有取消一个任务的操作,是的,他就是uv__work_cancel,代码如下:

 1 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
 2   int cancelled;
 3
 4   uv_mutex_lock(&mutex);
 5   uv_mutex_lock(&w->loop->wq_mutex);
 6
 7   // 只有当前队列不为空并且要取消的uv__work有效时才会继续执行
 8   cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
 9   if (cancelled)
10     QUEUE_REMOVE(&w->wq);// 从队列中移除task
11
12   uv_mutex_unlock(&w->loop->wq_mutex);
13   uv_mutex_unlock(&mutex);
14
15   if (!cancelled)
16     return UV_EBUSY;
17
18   // 更新这个task的状态
19   w->work = uv__cancelled;
20   uv_mutex_lock(&loop->wq_mutex);
21   QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
22   uv_async_send(&loop->wq_async);
23   uv_mutex_unlock(&loop->wq_mutex);
24
25   return 0;
26 }

至此,一个线程池的组成以及实现原理都说完了,可以看到,libuv几乎是用了最少的代码完成了高效的线程池,这对于我们平时写代码时具有很好的借鉴意义,文中涉及到uv_req_t以及uv_loop_t等结构我都直接跳过,因为这牵扯到libuv的其他组件,我将在以后的源码剖析中逐步阐述,谢谢你能看到这里。

时间: 2024-08-01 06:23:46

Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现的相关文章

Nodejs事件引擎libuv源码剖析之:请求(request)结构的设计剖析

声明:本文为原创博文,转载请注明出处.         在libuv中,请求(request)代表一个用户向libuv发出的指令,比如uv_connect_s就表示一个tcp的连接请求.uv_work_s代表要递交给libuv线程池执行的任务请求.uv_write_s代表一个写请求. 类似于上一篇讲句柄(handle)那样,请求也由一个抽象基类和相应的子类组成,这个基类就是uv_req_s,下面来看一下它的定义: 1 /* Abstract base class of all requests.

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe

高并发之——从源码角度分析创建线程池究竟有哪些方式

前言 在Java的高并发领域,线程池一直是一个绕不开的话题.有些童鞋一直在使用线程池,但是,对于如何创建线程池仅仅停留在使用Executors工具类的方式,那么,创建线程池究竟存在哪几种方式呢?就让我们一起从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池. 使用Executors工具类创建线程池 在创建线程池时,初学者用的最多的就是Executors 这个工具类,而使用这个工具类创建线程池时非常简单的,不需要关注太多的线程池细节,只需要传入必要的参数即可.Executors 工具类提供了

老李推荐:第6章5节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-事件

老李推荐:第6章5节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-事件 从网络过来的命令字串需要解析翻译出来,有些命令会在翻译好后直接执行然后返回,但有一大部分命令在翻译后需要转换成对应的事件,然后放入到命令队列里面等待执行.Monkey在取出一个事件执行的时候主要是执行其injectEvent方法来注入事件,而注入事件根据是否需要往系统注入事件分为两种: 需要通过系统服务往系统注入事件:如MonkeyKeyEvent事件会通过系统的InputManager往系