The classic thread pool model is a group of threads competing for items on a shared resource list. The program starts a group of threads and has them wait on a notification mechanism (a wait queue or condition variable, the "waitQ"). At the same time it initializes a resource list; whenever a thread adds a resource to that list, it notifies the pool through the same mechanism. A pool thread that receives the notification takes a resource off the list and processes it.
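For reference, here is a minimal generic sketch of this model with POSIX threads (it is not GlusterFS or SCST code; all names are made up for illustration). Workers are started with pthread_create in a loop and then sleep on a condition variable until a producer enqueues an item and signals:

#include <pthread.h>
#include <stdlib.h>

/* A work item on the shared resource list. */
struct work {
        struct work *next;
        void (*fn)(void *arg);
        void *arg;
};

static struct work     *head;                                 /* shared list  */
static pthread_mutex_t  lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   cond = PTHREAD_COND_INITIALIZER;      /* the "waitQ"  */

/* Worker thread: sleep until there is work, take one item, process it. */
static void *worker(void *unused)
{
        (void)unused;
        for (;;) {
                pthread_mutex_lock(&lock);
                while (head == NULL)
                        pthread_cond_wait(&cond, &lock);  /* drops lock while asleep */
                struct work *w = head;
                head = w->next;
                pthread_mutex_unlock(&lock);

                w->fn(w->arg);                            /* process outside the lock */
                free(w);
        }
        return NULL;
}

/* Producer: add an item to the list and notify one sleeping worker. */
static void submit(void (*fn)(void *), void *arg)
{
        struct work *w = malloc(sizeof(*w));
        if (w == NULL)
                return;
        w->fn = fn;
        w->arg = arg;

        pthread_mutex_lock(&lock);
        w->next = head;
        head = w;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
}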
Let's first look at how a user-space thread pool is created.
int
init (xlator_t *this)
{
        iot_conf_t *conf = NULL;
        int         ret = -1;
        int         i = 0;

        if (!this->children || this->children->next) {
                gf_log ("io-threads", GF_LOG_ERROR,
                        "FATAL: iot not configured with exactly one child");
                goto out;
        }

        if (!this->parents) {
                gf_log (this->name, GF_LOG_WARNING,
                        "dangling volume. check volfile ");
        }

        conf = (void *) GF_CALLOC (1, sizeof (*conf),
                                   gf_iot_mt_iot_conf_t);
        if (conf == NULL) {
                gf_log (this->name, GF_LOG_ERROR,
                        "out of memory");
                goto out;
        }

        if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {
                gf_log (this->name, GF_LOG_ERROR,
                        "pthread_cond_init failed (%d)", ret);
                goto out;
        }

        if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {
                gf_log (this->name, GF_LOG_ERROR,
                        "pthread_mutex_init failed (%d)", ret);
                goto out;
        }

        set_stack_size (conf);

        GF_OPTION_INIT ("thread-count", conf->max_count, int32, out);

        GF_OPTION_INIT ("high-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_HI], int32, out);

        GF_OPTION_INIT ("normal-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_NORMAL], int32, out);

        GF_OPTION_INIT ("low-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_LO], int32, out);

        GF_OPTION_INIT ("least-prio-threads",
                        conf->ac_iot_limit[IOT_PRI_LEAST], int32, out);

        GF_OPTION_INIT ("idle-time", conf->idle_time, int32, out);
        GF_OPTION_INIT ("enable-least-priority", conf->least_priority,
                        bool, out);

        GF_OPTION_INIT ("least-rate-limit", conf->throttle.rate_limit, int32,
                        out);
        if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) {
                gf_log (this->name, GF_LOG_ERROR,
                        "pthread_mutex_init failed (%d)", ret);
                goto out;
        }

        conf->this = this;

        for (i = 0; i < IOT_PRI_MAX; i++) {
                INIT_LIST_HEAD (&conf->reqs[i]);
        }

        ret = iot_workers_scale (conf);

        if (ret == -1) {
                gf_log (this->name, GF_LOG_ERROR,
                        "cannot initialize worker threads, exiting init");
                goto out;
        }

        this->private = conf;
        ret = 0;
out:
        if (ret)
                GF_FREE (conf);

        return ret;
}
This is the initialization routine of the io-threads xlator in GlusterFS. It sets up the worker threads' stack size via set_stack_size, reads the options that control how many threads of each priority to start, and finally starts the workers by calling iot_workers_scale.
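set_stack_size itself is not shown here; judging from __iot_workers_scale below, which passes conf->w_attr to pthread_create, it presumably prepares that pthread attribute. A minimal sketch of the idea (prepare_worker_attr and its parameters are hypothetical, not GlusterFS code):

#include <limits.h>
#include <pthread.h>

/* Sketch: build a pthread attribute with a custom stack size for the workers. */
static int prepare_worker_attr(pthread_attr_t *attr, size_t stack_size)
{
        int ret = pthread_attr_init(attr);
        if (ret != 0)
                return ret;

        if (stack_size < PTHREAD_STACK_MIN)      /* respect the system minimum */
                stack_size = PTHREAD_STACK_MIN;

        return pthread_attr_setstacksize(attr, stack_size);
}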
int
iot_workers_scale (iot_conf_t *conf)
{
        int ret = -1;

        if (conf == NULL) {
                ret = -EINVAL;
                goto out;
        }

        pthread_mutex_lock (&conf->mutex);
        {
                ret = __iot_workers_scale (conf);
        }
        pthread_mutex_unlock (&conf->mutex);

out:
        return ret;
}
iot_workers_scale first takes the mutex and then calls __iot_workers_scale to actually create the threads.
int
__iot_workers_scale (iot_conf_t *conf)
{
        int       scale = 0;
        int       diff = 0;
        pthread_t thread;
        int       ret = 0;
        int       i = 0;

        for (i = 0; i < IOT_PRI_MAX; i++)
                scale += min (conf->queue_sizes[i], conf->ac_iot_limit[i]);

        if (scale < IOT_MIN_THREADS)
                scale = IOT_MIN_THREADS;

        if (scale > conf->max_count)
                scale = conf->max_count;

        if (conf->curr_count < scale) {
                diff = scale - conf->curr_count;
        }

        while (diff) {
                diff --;

                ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf);
                if (ret == 0) {
                        conf->curr_count++;
                        gf_log (conf->this->name, GF_LOG_DEBUG,
                                "scaled threads to %d (queue_size=%d/%d)",
                                conf->curr_count, conf->queue_size, scale);
                } else {
                        break;
                }
        }

        return diff;
}
The while loop above creates all of the worker threads; each one runs the function iot_worker. Now let's see what each thread does.
 1 void *
 2 iot_worker (void *data)
 3 {
 4         iot_conf_t       *conf = NULL;
 5         xlator_t         *this = NULL;
 6         call_stub_t      *stub = NULL;
 7         struct timespec   sleep_till = {0, };
 8         int               ret = 0;
 9         int               pri = -1;
10         char              timeout = 0;
11         char              bye = 0;
12         struct timespec   sleep = {0,};
13
14         conf = data;
15         this = conf->this;
16         THIS = this;
17
18         for (;;) {
19                 sleep_till.tv_sec = time (NULL) + conf->idle_time;
20
21                 pthread_mutex_lock (&conf->mutex);
22                 {
23                         if (pri != -1) {
24                                 conf->ac_iot_count[pri]--;
25                                 pri = -1;
26                         }
27                         while (conf->queue_size == 0) {
28                                 conf->sleep_count++;
29
30                                 ret = pthread_cond_timedwait (&conf->cond,
31                                                               &conf->mutex,
32                                                               &sleep_till);
33                                 conf->sleep_count--;
34
35                                 if (ret == ETIMEDOUT) {
36                                         timeout = 1;
37                                         break;
38                                 }
39                         }
40
41                         if (timeout) {
42                                 if (conf->curr_count > IOT_MIN_THREADS) {
43                                         conf->curr_count--;
44                                         bye = 1;
45                                         gf_log (conf->this->name, GF_LOG_DEBUG,
46                                                 "timeout, terminated. conf->curr_count=%d",
47                                                 conf->curr_count);
48                                 } else {
49                                         timeout = 0;
50                                 }
51                         }
52
53                         stub = __iot_dequeue (conf, &pri, &sleep);
54                         if (!stub && (sleep.tv_sec || sleep.tv_nsec)) {
55                                 pthread_cond_timedwait(&conf->cond,
56                                                        &conf->mutex, &sleep);
57                                 pthread_mutex_unlock(&conf->mutex);
58                                 continue;
59                         }
60                 }
61                 pthread_mutex_unlock (&conf->mutex);
62
63                 if (stub) /* guard against spurious wakeups */
64                         call_resume (stub);
65
66                 if (bye)
67                         break;
68         }
69
70         if (pri != -1) {
71                 pthread_mutex_lock (&conf->mutex);
72                 {
73                         conf->ac_iot_count[pri]--;
74                 }
75                 pthread_mutex_unlock (&conf->mutex);
76         }
77         return NULL;
78 }
As line 30 of the code above shows, each idle thread sleeps on the condition variable conf->cond; pthread_cond_timedwait atomically releases conf->mutex while the thread sleeps, re-acquires it on wake-up, and waits with a timeout. If the wait times out and there is still no work, and the pool has more than IOT_MIN_THREADS threads, the thread exits; this is how an idle pool shrinks.
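One detail worth noting: pthread_cond_timedwait takes an absolute deadline (by default measured against the real-time clock), which is why the code fills sleep_till from time(NULL) plus the idle time rather than passing a relative interval. A generic sketch of the idiom (the function name is made up):

#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Wait on 'cond' (protected by 'mtx', which the caller already holds) for at
 * most idle_secs seconds. Returns 1 on timeout, 0 if we were signalled. */
static int wait_with_idle_timeout(pthread_cond_t *cond, pthread_mutex_t *mtx,
                                  int idle_secs)
{
        struct timespec deadline = {0, 0};

        deadline.tv_sec = time(NULL) + idle_secs;   /* absolute deadline */

        return pthread_cond_timedwait(cond, mtx, &deadline) == ETIMEDOUT;
}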
A worker takes one item off the request queue by calling __iot_dequeue and processes it with call_resume.
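A call stub is essentially a parked request: the operation's saved arguments plus the wrapper function to run when the stub is resumed in a worker thread; call_resume simply invokes that wrapper. A purely conceptual sketch (these are not the real GlusterFS structures):

/* Conceptual sketch only -- the real call_stub_t records the call frame,
 * the fop type and a union holding that fop's arguments. */
struct sketch_stub {
        struct sketch_stub *next;              /* queued on a priority list */
        void               *saved_args;        /* the operation's arguments */
        void (*resume_fn)(void *saved_args);   /* e.g. an iot_*_wrapper     */
};

/* call_resume boils down to: run the saved wrapper in worker context. */
static void sketch_call_resume(struct sketch_stub *stub)
{
        stub->resume_fn(stub->saved_args);
}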
Next comes the producer side, adding work to the queue.
In GlusterFS, every file operation that reaches the io-threads layer is turned into a work item (a call stub), and each item is handed to the thread pool for processing. Below is how an open operation produces its work item.
int
iot_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
          fd_t *fd, dict_t *xdata)
{
        call_stub_t *stub = NULL;
        int          ret = -1;

        stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd,
                              xdata);
        if (!stub) {
                gf_log (this->name, GF_LOG_ERROR,
                        "cannot create open call stub"
                        "(out of memory)");
                ret = -ENOMEM;
                goto out;
        }

        ret = iot_schedule (frame, this, stub);

out:
        if (ret < 0) {
                STACK_UNWIND_STRICT (open, frame, -1, -ret, NULL, NULL);

                if (stub != NULL) {
                        call_stub_destroy (stub);
                }
        }

        return 0;
}
fop_open_stub first creates a stub (the work item), and iot_schedule() then puts it on the request queue.
int
iot_schedule (call_frame_t *frame, xlator_t *this, call_stub_t *stub)
{
        int          ret = -1;
        iot_pri_t    pri = IOT_PRI_MAX - 1;
        iot_conf_t  *conf = this->private;

        if ((frame->root->pid < GF_CLIENT_PID_MAX) && conf->least_priority) {
                pri = IOT_PRI_LEAST;
                goto out;
        }

        switch (stub->fop) {
        case GF_FOP_OPEN:
        case GF_FOP_STAT:
        case GF_FOP_FSTAT:
        case GF_FOP_LOOKUP:
        case GF_FOP_ACCESS:
        case GF_FOP_READLINK:
        case GF_FOP_OPENDIR:
        case GF_FOP_STATFS:
        case GF_FOP_READDIR:
        case GF_FOP_READDIRP:
                pri = IOT_PRI_HI;
                break;

        case GF_FOP_CREATE:
        case GF_FOP_FLUSH:
        case GF_FOP_LK:
        case GF_FOP_INODELK:
        case GF_FOP_FINODELK:
        case GF_FOP_ENTRYLK:
        case GF_FOP_FENTRYLK:
        case GF_FOP_UNLINK:
        case GF_FOP_SETATTR:
        case GF_FOP_FSETATTR:
        case GF_FOP_MKNOD:
        case GF_FOP_MKDIR:
        case GF_FOP_RMDIR:
        case GF_FOP_SYMLINK:
        case GF_FOP_RENAME:
        case GF_FOP_LINK:
        case GF_FOP_SETXATTR:
        case GF_FOP_GETXATTR:
        case GF_FOP_FGETXATTR:
        case GF_FOP_FSETXATTR:
        case GF_FOP_REMOVEXATTR:
        case GF_FOP_FREMOVEXATTR:
                pri = IOT_PRI_NORMAL;
                break;

        case GF_FOP_READ:
        case GF_FOP_WRITE:
        case GF_FOP_FSYNC:
        case GF_FOP_TRUNCATE:
        case GF_FOP_FTRUNCATE:
        case GF_FOP_FSYNCDIR:
        case GF_FOP_XATTROP:
        case GF_FOP_FXATTROP:
        case GF_FOP_RCHECKSUM:
                pri = IOT_PRI_LO;
                break;

        case GF_FOP_NULL:
        case GF_FOP_FORGET:
        case GF_FOP_RELEASE:
        case GF_FOP_RELEASEDIR:
        case GF_FOP_GETSPEC:
        case GF_FOP_MAXVALUE:
                //fail compilation on missing fop
                //new fop must choose priority.
                break;
        }
out:
        gf_log (this->name, GF_LOG_DEBUG, "%s scheduled as %s fop",
                gf_fop_list[stub->fop], iot_get_pri_meaning (pri));
        ret = do_iot_schedule (this->private, stub, pri);
        return ret;
}
The code above assigns different operations to request queues of different priorities; this prioritization is something GlusterFS's workload requires.
int
do_iot_schedule (iot_conf_t *conf, call_stub_t *stub, int pri)
{
        int ret = 0;

        pthread_mutex_lock (&conf->mutex);
        {
                __iot_enqueue (conf, stub, pri);

                pthread_cond_signal (&conf->cond);

                ret = __iot_workers_scale (conf);
        }
        pthread_mutex_unlock (&conf->mutex);

        return ret;
}
do_iot_schedule is the core of adding work. It calls __iot_enqueue to put the item on the queue and then signals the condition variable so that a worker comes to pick it up. Because pthread_cond_signal wakes at most one of the waiting threads (and the signal is lost if nobody is waiting), which worker ends up handling the item is essentially arbitrary. And because the pool is dynamic, the call to __iot_workers_scale can grow the number of worker threads on demand.
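__iot_enqueue and __iot_dequeue themselves are not listed above. Conceptually, enqueue appends the stub to the list for its priority and bumps the counters, while dequeue scans the priority lists from high to low and takes the first stub whose priority still has worker capacity left. A simplified sketch under those assumptions (field names follow the snippets above; the stub's list member and the list helpers are assumed to match GlusterFS's list.h, and the real functions also handle least-priority rate limiting):

/* Both sketches run with conf->mutex already held. */
void
__iot_enqueue_sketch (iot_conf_t *conf, call_stub_t *stub, int pri)
{
        list_add_tail (&stub->list, &conf->reqs[pri]);   /* per-priority list */
        conf->queue_sizes[pri]++;
        conf->queue_size++;
}

call_stub_t *
__iot_dequeue_sketch (iot_conf_t *conf, int *pri)
{
        call_stub_t *stub = NULL;
        int          i;

        for (i = 0; i < IOT_PRI_MAX; i++) {              /* highest priority first */
                if (list_empty (&conf->reqs[i]))
                        continue;
                /* don't let one priority occupy more workers than its limit */
                if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i])
                        continue;

                stub = list_entry (conf->reqs[i].next, call_stub_t, list);
                list_del_init (&stub->list);
                conf->queue_sizes[i]--;
                conf->queue_size--;
                conf->ac_iot_count[i]++;
                *pri = i;
                break;
        }
        return stub;
}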
Now for the thread pool in kernel space.
Here we look at how the thread pool is handled in SCST, a Linux SAN storage target module.
When writing a kernel module, the number of threads in the pool is usually chosen from the number of CPU cores, as shown below:
if (scst_threads == 0)
        scst_threads = scst_num_cpus;

if (scst_threads < 1) {
        PRINT_ERROR("%s", "scst_threads can not be less than 1");
        scst_threads = scst_num_cpus;
}
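scst_num_cpus is presumably derived from the number of online CPUs; in a kernel module that would typically look like the sketch below (how SCST actually computes it may differ):

#include <linux/cpumask.h>

static int scst_threads;      /* e.g. a module parameter; 0 means "pick a default" */
static int scst_num_cpus;

static void pick_default_thread_count(void)
{
        scst_num_cpus = num_online_cpus();
        if (scst_threads == 0)
                scst_threads = scst_num_cpus;
}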
The SCST module calls scst_start_global_threads to create its threads:
 1 static int scst_start_global_threads(int num)
 2 {
 3     int res;
 4
 5     TRACE_ENTRY();
 6
 7     mutex_lock(&scst_mutex);
 8
 9     res = scst_add_threads(&scst_main_cmd_threads, NULL, NULL, num);
10     if (res < 0)
11         goto out_unlock;
12
13     scst_init_cmd_thread = kthread_run(scst_init_thread,
14         NULL, "scst_initd");
15     if (IS_ERR(scst_init_cmd_thread)) {
16         res = PTR_ERR(scst_init_cmd_thread);
17         PRINT_ERROR("kthread_create() for init cmd failed: %d", res);
18         scst_init_cmd_thread = NULL;
19         goto out_unlock;
20     }
21
22     scst_mgmt_cmd_thread = kthread_run(scst_tm_thread,
23         NULL, "scsi_tm");
24     if (IS_ERR(scst_mgmt_cmd_thread)) {
25         res = PTR_ERR(scst_mgmt_cmd_thread);
26         PRINT_ERROR("kthread_create() for TM failed: %d", res);
27         scst_mgmt_cmd_thread = NULL;
28         goto out_unlock;
29     }
30
31     scst_mgmt_thread = kthread_run(scst_global_mgmt_thread,
32         NULL, "scst_mgmtd");
33     if (IS_ERR(scst_mgmt_thread)) {
34         res = PTR_ERR(scst_mgmt_thread);
35         PRINT_ERROR("kthread_create() for mgmt failed: %d", res);
36         scst_mgmt_thread = NULL;
37         goto out_unlock;
38     }
39
40 out_unlock:
41     mutex_unlock(&scst_mutex);
42
43     TRACE_EXIT_RES(res);
44     return res;
45 }
Line 9, the call to scst_add_threads, is where the worker thread pool is created.
int scst_add_threads(struct scst_cmd_threads *cmd_threads,
        struct scst_device *dev, struct scst_tgt_dev *tgt_dev, int num)
{
        int res = 0, i;
        struct scst_cmd_thread_t *thr;
        int n = 0, tgt_dev_num = 0;

        TRACE_ENTRY();

        if (num == 0) {
                res = 0;
                goto out;
        }

        list_for_each_entry(thr, &cmd_threads->threads_list, thread_list_entry) {
                n++;
        }

        TRACE_DBG("cmd_threads %p, dev %p, tgt_dev %p, num %d, n %d",
                  cmd_threads, dev, tgt_dev, num, n);

        if (tgt_dev != NULL) {
                struct scst_tgt_dev *t;
                list_for_each_entry(t, &tgt_dev->dev->dev_tgt_dev_list,
                                    dev_tgt_dev_list_entry) {
                        if (t == tgt_dev)
                                break;
                        tgt_dev_num++;
                }
        }

        for (i = 0; i < num; i++) {
                thr = kmalloc(sizeof(*thr), GFP_KERNEL);
                if (!thr) {
                        res = -ENOMEM;
                        PRINT_ERROR("Fail to allocate thr %d", res);
                        goto out_wait;
                }

                if (dev != NULL) {
                        char nm[14]; /* to limit the name's len */
                        strlcpy(nm, dev->virt_name, ARRAY_SIZE(nm));
                        thr->cmd_thread = kthread_create(scst_cmd_thread,
                                cmd_threads, "%s%d", nm, n++);
                } else if (tgt_dev != NULL) {
                        char nm[11]; /* to limit the name's len */
                        strlcpy(nm, tgt_dev->dev->virt_name, ARRAY_SIZE(nm));
                        thr->cmd_thread = kthread_create(scst_cmd_thread,
                                cmd_threads, "%s%d_%d", nm, tgt_dev_num, n++);
                } else
                        thr->cmd_thread = kthread_create(scst_cmd_thread,
                                cmd_threads, "scstd%d", n++);

                if (IS_ERR(thr->cmd_thread)) {
                        res = PTR_ERR(thr->cmd_thread);
                        PRINT_ERROR("kthread_create() failed: %d", res);
                        kfree(thr);
                        goto out_wait;
                }

                list_add(&thr->thread_list_entry, &cmd_threads->threads_list);
                cmd_threads->nr_threads++;

                TRACE_DBG("Added thr %p to threads list (nr_threads %d, n %d)",
                          thr, cmd_threads->nr_threads, n);

                wake_up_process(thr->cmd_thread);
        }

out_wait:
        if (i > 0 && cmd_threads != &scst_main_cmd_threads) {
                /*
                 * Wait for io_context gets initialized to avoid possible races
                 * for it from the sharing it tgt_devs.
                 */
                while (!*(volatile bool*)&cmd_threads->io_context_ready) {
                        TRACE_DBG("Waiting for io_context for cmd_threads %p "
                                  "initialized", cmd_threads);
                        msleep(50);
                }
        }

        if (res != 0)
                scst_del_threads(cmd_threads, i);

out:
        TRACE_EXIT_RES(res);
        return res;
}
Creating a thread in the kernel is slightly different from user space: kthread_create first builds the kernel thread (it starts out stopped), and wake_up_process then wakes it so it begins running. Alternatively, kthread_run creates and starts the thread in one call, much like pthread_create. The worker thread function here is scst_cmd_thread.
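A minimal generic sketch of both variants (not SCST code; the names are made up). Such a worker normally loops until kthread_stop() is called on it, checking kthread_should_stop():

#include <linux/delay.h>
#include <linux/err.h>
#include <linux/kthread.h>

static int my_worker(void *data)
{
        /* Loop until someone calls kthread_stop() on this thread. */
        while (!kthread_should_stop()) {
                /* ... wait for and process work here ... */
                msleep(100);
        }
        return 0;
}

static struct task_struct *start_worker(void *data)
{
        struct task_struct *t;

        /* Variant 1: create the thread stopped, then wake it explicitly. */
        t = kthread_create(my_worker, data, "my_worker/%d", 0);
        if (IS_ERR(t))
                return t;
        wake_up_process(t);

        /* Variant 2 would be: t = kthread_run(my_worker, data, "my_worker"); */
        return t;
}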
 1 int scst_cmd_thread(void *arg)
 2 {
 3     struct scst_cmd_threads *p_cmd_threads = arg;
 4
 5     TRACE_ENTRY();
 6
 7     PRINT_INFO("Processing thread %s (PID %d) started", current->comm,
 8         current->pid);
 9
10 #if 0
11     set_user_nice(current, 10);
12 #endif
13     current->flags |= PF_NOFREEZE;
14
15 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
16     mutex_lock(&p_cmd_threads->io_context_mutex);
17
18     WARN_ON(current->io_context);
19
20     if (p_cmd_threads != &scst_main_cmd_threads) {
21         /*
22          * For linked IO contexts io_context might be not NULL while
23          * io_context 0.
24          */
25         if (p_cmd_threads->io_context == NULL) {
26             p_cmd_threads->io_context = get_io_context(GFP_KERNEL, -1);
27             TRACE_MGMT_DBG("Alloced new IO context %p "
28                 "(p_cmd_threads %p)",
29                 p_cmd_threads->io_context,
30                 p_cmd_threads);
31             /*
32              * Put the extra reference created by get_io_context()
33              * because we don't need it.
34              */
35             put_io_context(p_cmd_threads->io_context);
36         } else {
37             current->io_context = ioc_task_link(p_cmd_threads->io_context);
38             TRACE_MGMT_DBG("Linked IO context %p "
39                 "(p_cmd_threads %p)", p_cmd_threads->io_context,
40                 p_cmd_threads);
41         }
42         p_cmd_threads->io_context_refcnt++;
43     }
44
45     mutex_unlock(&p_cmd_threads->io_context_mutex);
46 #endif
47
48     p_cmd_threads->io_context_ready = true;
49
50     spin_lock_irq(&p_cmd_threads->cmd_list_lock);
51     while (!kthread_should_stop()) {
52         wait_queue_t wait;
53         init_waitqueue_entry(&wait, current);
54
55         if (!test_cmd_threads(p_cmd_threads)) {
56             add_wait_queue_exclusive_head(
57                 &p_cmd_threads->cmd_list_waitQ,
58                 &wait);
59             for (;;) {
60                 set_current_state(TASK_INTERRUPTIBLE);
61                 if (test_cmd_threads(p_cmd_threads))
62                     break;
63                 spin_unlock_irq(&p_cmd_threads->cmd_list_lock);
64                 schedule();
65                 spin_lock_irq(&p_cmd_threads->cmd_list_lock);
66             }
67             set_current_state(TASK_RUNNING);
68             remove_wait_queue(&p_cmd_threads->cmd_list_waitQ, &wait);
69         }
70
71         if (tm_dbg_is_release()) {
72             spin_unlock_irq(&p_cmd_threads->cmd_list_lock);
73             tm_dbg_check_released_cmds();
74             spin_lock_irq(&p_cmd_threads->cmd_list_lock);
75         }
76
77         scst_do_job_active(&p_cmd_threads->active_cmd_list,
78             &p_cmd_threads->cmd_list_lock, false);
79     }
80     spin_unlock_irq(&p_cmd_threads->cmd_list_lock);
81
82 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
83     if (p_cmd_threads != &scst_main_cmd_threads) {
84         mutex_lock(&p_cmd_threads->io_context_mutex);
85         if (--p_cmd_threads->io_context_refcnt == 0)
86             p_cmd_threads->io_context = NULL;
87         mutex_unlock(&p_cmd_threads->io_context_mutex);
88     }
89 #endif
90
91     PRINT_INFO("Processing thread %s (PID %d) finished", current->comm,
92         current->pid);
93
94     TRACE_EXIT();
95     return 0;
96 }
Kernel code has more to worry about, such as process context versus interrupt context and places where sleeping is forbidden, so it reads as more involved. The interesting part is lines 50-80. Line 50 takes the spinlock protecting the command list (disabling interrupts); line 51 checks whether the thread has been asked to stop; line 52 declares a wait_queue_t entry named wait, which plays roughly the role of the user-space condition-variable wait; line 53 initializes that entry, binding it to the current task and registering the default wake-up function (which asks the scheduler to wake this thread); lines 56-58 add the entry to the pool's wait queue cmd_list_waitQ. Line 60 marks the task as TASK_INTERRUPTIBLE, line 63 releases the spinlock (re-enabling interrupts), and line 64 calls schedule() to give up the CPU. The thread then sleeps until new work arrives and it is woken, after which it processes the work via scst_do_job_active on line 77.
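The loop in lines 50-80 is the kernel's standard "prepare to wait" pattern written out by hand (SCST adds its own add_wait_queue_exclusive_head helper so that wake_up rouses only one exclusive waiter). Below is a generic sketch of the same idiom with assumed names (work_waitq, work_lock, work_list); when there is no extra lock to juggle, the wait_event_interruptible() macro packages this whole loop for you:

#include <linux/list.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(work_waitq);
static DEFINE_SPINLOCK(work_lock);
static LIST_HEAD(work_list);

/* Consumer side: sleep until work_list becomes non-empty. */
static void wait_for_work(void)
{
        DEFINE_WAIT(wait);       /* per-thread wait queue entry, like 'wait' above */

        spin_lock_irq(&work_lock);
        while (list_empty(&work_list)) {
                prepare_to_wait(&work_waitq, &wait, TASK_INTERRUPTIBLE);
                spin_unlock_irq(&work_lock);   /* never sleep holding a spinlock */
                schedule();                    /* give up the CPU until woken    */
                spin_lock_irq(&work_lock);
        }
        finish_wait(&work_waitq, &wait);
        /* ... dequeue and handle one item here, then drop the lock ... */
        spin_unlock_irq(&work_lock);
}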
Now, how does such a thread get woken up?
 1     spin_lock(&cmd->cmd_threads->cmd_list_lock);
 2     TRACE_MGMT_DBG("Adding cmd %p to active cmd list", cmd);
 3     if (unlikely(cmd->queue_type == SCST_CMD_QUEUE_HEAD_OF_QUEUE))
 4         list_add(&cmd->cmd_list_entry,
 5             &cmd->cmd_threads->active_cmd_list);
 6     else
 7         list_add_tail(&cmd->cmd_list_entry,
 8             &cmd->cmd_threads->active_cmd_list);
 9     wake_up(&cmd->cmd_threads->cmd_list_waitQ);
10     spin_unlock(&cmd->cmd_threads->cmd_list_lock);
Only the relevant fragment is shown here. Line 1 takes the lock protecting the command list, lines 4-8 add the command to the active list (at the head or the tail depending on its queue type), line 9 wakes a worker thread sleeping on the wait queue, and line 10 releases the lock.