1. 什么是线程池
线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务。如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可。
线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销。一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为
1. 计算密集型任务;
2. I/O密集型任务。
计算密集型任务是占用CPU资源的,很少被外界的事件打断,CPU的个数是一定的,所以并发数是一定的,因此线程个数等于CPU的个数时是最高效的。
I/O密集型任务意味着执行期间可能会被I/O中断,也就是说这个线程会被挂起,这时的线程个数应该大于CPU的个数。
线程池的本质是生产者与消费者模型的应用。生产者线程向任务队列中添加任务,一旦队列有任务到来,如果线程池有空闲线程,就唤醒空闲线程来执行任务,如果没有空闲线程,并且线程数没有达到阈值(线程池中线程的最大值),就创建新线程来执行任务。
当任务增加的时候能够动态的增加线程池中线程的数量,直到达到一个阈值。这个阈值就是线程池中线程的最大值。
当任务执行完毕时,能够动态的销毁线程池中的线程池。
2.线程池的实现
我在 Ubuntu 系统下用C语言写的程序(传送门:github),这是一个非常简单的线程池实现,代码量约300行,仅仅说明线程池的工作原理,我会文章最后给出扩展线程池的思路,使之成为一个拥有C/S架构,socket通信的线程池。
目前而言,用到的知识点就两个:
1. pthread_mutex_t:互斥锁;
2. pthread_cond_t:条件变量;
互斥锁和条件变量要配合使用,我在另一篇博客(点击打开链接)里给出了使用方法,有兴趣的童鞋可以去看一下~
我们的小线程池共有五个文件:
1. condition.h:把互斥锁和条件变量组合在一起,形成一个条件结构体,condition.h就是这个结构体的声明;
2. condition.c:与condition.h对应,定义了操作条件结构体的函数;
3. threadpool.h:包含两个结构体,一个是线程控制块(TCB),另一个是线程池结构体,还有三个函数:初始化线程池、销毁线程池、向线程池中添加任务;
4. threadpool.c:threadpool.h的实现;
5. main.c:主函数;
除此之外,我使用autotool编译程序,因此还有两个脚本文件:
1. makefile.am:定义文件之间的依赖关系;
2. build.sh:编译脚本;
接下来让我们深入代码去理解线程池~
Condition.h,为了实现线程同步,普遍的做法是将互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)配合在一起使用,最好的做法就是让两者组合成一个结构体,pthread_mutex_t 和 pthread_cond_t 同属于 pthread.h 头文件:
/******************************************************************* * Copyright(c) 2016 Chen Gonghao * All rights reserved. * * [email protected] ******************************************************************/ #ifndef _CONDITION_H_ #define _CONDITION_H_ #include <pthread.h> /* 将互斥锁和条件变量封装成一个结构体 */ typedef struct condition { pthread_mutex_t pmutex ; pthread_cond_t pcond ; } condition_t ; /* 初始化结构体 */ int condition_init( condition_t* cond ) ; /* 拿到结构体中的互斥锁 */ int condition_lock( condition_t* cond ) ; /* 释放结构体中的互斥锁 */ int condition_unlock( condition_t* cond ) ; /* 使消费者线程等待在条件变量上 */ int condition_wait( condition_t* cond ) ; /* 使消费者线程等待在条件变量上,abstime:等待超时 */ int condition_timedwait( condition_t* cond, const struct timespec* abstime ) ; /* 生产者线程通知等待在条件变量上的消费者线程 */ int condition_signal( condition_t* cond ) ; /* 生产者线程向等待在条件变量上的消费者线程广播 */ int condition_broadcast( condition_t* cond ) ; /* 销毁结构体 */ int condition_destroy( condition_t* cond ) ; #endif
Condition.c,有8个针对条件结构体的函数,condition.c就是这8个函数的定义,很简单,都是转而调用pthread.h头文件中的库函数。
/******************************************************************* * Copyright(c) 2016 Chen Gonghao * All rights reserved. * * [email protected] ******************************************************************/ #include "condition.h" int condition_init( condition_t* cond ) { int status ; if ( ( status = pthread_mutex_init( &cond->pmutex, NULL ) ) ) { return status ; } if ( ( status = pthread_cond_init( &cond->pcond, NULL ) ) ) { return status ; } return 0 ; } int condition_lock( condition_t* cond ) { return pthread_mutex_lock( &cond->pmutex) ; } int condition_unlock( condition_t* cond ) { return pthread_mutex_unlock( &cond->pmutex ) ; } int condition_wait( condition_t* cond ) { return pthread_cond_wait( &cond->pcond, &cond->pmutex ) ; } int condition_timedwait( condition_t* cond, const struct timespec* abstime ) { return pthread_cond_timedwait( &cond->pcond, &cond->pmutex, abstime ) ; } int condition_signal( condition_t* cond ) { return pthread_cond_signal( &cond->pcond ) ; } int condition_broadcast( condition_t* cond ) { return pthread_cond_broadcast( &cond->pcond ) ; } int condition_destroy( condition_t* cond ) { int status ; if ( ( status = pthread_mutex_destroy( &cond->pmutex ) ) ) { return status ; } if ( ( status = pthread_cond_destroy( &cond->pcond ) ) ) { return status ; } return 0 ; }
Pthreadpool.h:包含两个结构体,线程控制块(TCB)task_t和线程池结构体threadpool,还声明了三个针对线程池的操作:初始化线程池、销毁线程池、向线程池中添加任务。
/******************************************************************* * Copyright(c) 2016 Chen Gonghao * All rights reserved. * * [email protected] ******************************************************************/ #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include "condition.h" /* 线程控制块(task control block),以单向链表的形式组织 TCB */ typedef struct task { void *( *run ) ( void* arg ) ; // 线程的执行函数 void *arg ; // 执行函数的参数 struct tast* next ; // 指向下一个 TCB } task_t ; /* 线程池结构体 */ typedef struct threadpool { condition_t ready ; // 条件变量结构体 task_t* first ; // TCB 链表的头指针 task_t* last ; // TCB 链表的尾指针 int counter ; // TCB 的总数量 int idle ; // 空闲 TCB 的个数 int max_threads ; // 最大线程数量 int quit ; // 线程池销毁标志 } threadpool_t ; /* 初始化线程池 */ void threadpool_init( threadpool_t* pool, int threads ) ; /* 向线程池中添加任务 */ void threadpool_add_task( threadpool_t* pool, void* ( *run )( void* arg ), void* arg ) ; /* 销毁线程池 */ void threadpool_destroy( threadpool_t* pool ) ; #endif
Threadpool.c,共四个函数,对线程池的操作。
/******************************************************************* * Copyright(c) 2016 Chen Gonghao * All rights reserved. * * [email protected] ******************************************************************/ #include "threadpool.h" #include <errno.h> #include <time.h> /* 线程入口函数 */ void* thread_runtime( void* arg ) { struct timespec abstime ; int timeout ; // 拿到线程池对象 threadpool_t* pool = ( threadpool_t* ) arg ; while ( 1 ) { timeout = 0 ; /************************** 进入临界区 ***********************/ condition_lock( &pool->ready ) ; // 空闲线程数加 1 ++ pool->idle ; // 如果线程链表为空,而且线程池处于运行状态,那么线程就该等待任务的到来 while ( pool->first == NULL && pool->quit == 0 ) { printf( "thread 0x%x is waiting\n", (int)pthread_self() ) ; clock_gettime( CLOCK_REALTIME, &abstime ) ; abstime.tv_sec += 2 ; int status = condition_timedwait( &pool->ready, &abstime ) ; if ( status == ETIMEDOUT ) { printf( "thread 0x%x is wait timed out\n", (int)pthread_self() ) ; timeout = 1 ; break ; } } // 如果线程等待超时 if ( timeout && pool->first == NULL ) { -- pool->counter ; // 那么线程数量减 1 condition_unlock( &pool->ready ) ; // 释放互斥锁 break ; // 跳出 while,注意,break 之后,线程入口函数执行完毕,线程将不复存在 } // 线程获得任务 -- pool->idle ; // 线程池空闲数减 1 if ( pool->first != NULL ) { task_t* t = pool->first ; // 从链表头部取出 TCB pool->first = t->next ; // 指向下一个 TCB // 执行任务需要一定时间,所以要先解锁 // 以便生产者可以往链表中加入任务 // 以及其他消费者可以等待任务 condition_unlock( &pool->ready ) ; t->run( t->arg ) ; // 执行任务的回调函数 free( t ) ; // 任务执行完毕,销毁 TCB condition_lock( &pool->ready ) ; } // quit == 1 说明要销毁线程池 if ( pool->quit && pool->first == NULL ) { -- pool->counter ; if ( pool->counter == 0 ) { condition_signal( &pool->ready ) ; // 唤醒等待在条件变量上的主线程 } condition_unlock( &pool->ready ) ; break ; } condition_unlock( &pool->ready ) ; /************************** 退出临界区 ***********************/ } return NULL ; } /* 初始化线程池 */ void threadpool_init( threadpool_t* pool, int threads ) { condition_init( &pool->ready ) ; // 初始化条件变量结构体 pool->first = NULL ; // 设置线程链表头指针 pool->last = NULL ; // 设置线程链表尾指针 pool->counter = 0 ; // 设置线程池当前线程数 pool->idle = 0 ; // 设置线程池当前空闲线程数 pool->max_threads = threads ; // 设置线程池最大线程数 pool->quit = 0 ; // quit = 0,线程池运行状态;quit = 1,线程池销毁状态 } /* 向线程池中添加线程 */ void threadpool_add_task( threadpool_t* pool, void* (*run)( void* arg ), void* arg ) { task_t* newtask = ( task_t* ) malloc ( sizeof( task_t ) ) ; // 创建线程控制块 newtask->run = run ; // 设置线程的回调函数 newtask->arg = arg ; // 设置回调函数的参数 newtask->next = NULL ; // 新加入的线程会被添加到链表尾部 /************************** 进入临界区 ***********************/ condition_lock( &pool->ready ) ; // 拿到互斥锁 // 把新创建的 TCB 添加到线程链表中 if ( pool->first == NULL ) { // 如果线程链表为空,则 TCB 作为链表头部 pool->first = newtask ; } else { // 如果线程链表不为空,加入到链表尾部 pool->last->next = newtask ; } pool->last = newtask ; // 修改链表尾指针 // 如果有空闲线程,那么就唤醒空闲线程 if ( pool->idle > 0 ) { condition_signal( &pool->ready ) ; // 通知等待在条件变量上的空闲线程 } else if ( pool->counter < pool->max_threads ) { // 如果没有空闲线程可用,而且当前线程数量小于线程池的容量,我们就创建一个线程 pthread_t tid ; pthread_create( &tid, NULL, thread_runtime, pool ) ; // 指定新线程的起始函数为 thread_runtime,把线程池传递给 thread_runtime ++ pool->counter ; } condition_unlock( &pool->ready ) ; // 释放互斥锁 /************************** 退出临界区 ***********************/ } /* 销毁线程池 */ void threadpool_destroy( threadpool_t* pool ) { if ( pool->quit ) { return ; } /************************** 进入临界区 ***********************/ condition_lock( &pool->ready ) ; // 设置退出标志为真 pool->quit = 1 ; // 如果线程池中正在运行着线程,那么我们需要等待线程执行完毕再销毁 if ( pool->counter > 0 ) { if ( pool->idle > 0 ) { condition_broadcast( &pool->ready ) ; } while ( pool->counter > 0 ) { condition_wait( &pool->ready ) ; // 主线程(main 函数所在线程)将等待在条件变量上 } } condition_unlock( &pool->ready ) ; /************************** 退出临界区 ***********************/ // 销毁条件变量 condition_destroy( &pool->ready ) ; }
最后是主函数啦,main.c
/******************************************************************* * Copyright(c) 2016 Chen Gonghao * All rights reserved. * * [email protected] ******************************************************************/ #include "threadpool.h" /* 定义线程池最大线程数 */ #define MAX_POOL_SIZE 3 /* 每个任务的回调函数 */ void* mytask( void* arg ) { printf( " thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg ) ; sleep( 1 ) ; free( arg ) ; return NULL ; } int main( void ) { threadpool_t pool ; // 定义一个线程池变量 threadpool_init( &pool, MAX_POOL_SIZE ) ; // 初始化线程池 // 向线程池中添加 10 个任务,每个任务的处理函数都是 mytask for ( int i = 0; i < 10; ++ i ) { int* arg = (int*)malloc( sizeof( int ) ) ; *arg = i ; threadpool_add_task( &pool, mytask, arg ) ; } threadpool_destroy( &pool ) ; // 销毁线程池 return 0 ; }
画个图来理解线程池的原理:
可以发现生产者的代码主要位于 thread_add_task()函数,消费者的代码主要位于 thread_runtime() 函数。
细心的童鞋也许发现了:thread_runtime()中只有while循环,三个消费者同时执行while 循环,不可能保证循环有序的从生产线上取任务,任务来临时如何保证三个消费者不发生争抢行为呢?这就是条件变量的作用了,当消费者发现生产线为空时,就依次睡在条件变量上,睡在条件变量上的关键代码如下:
int status = condition_timedwait( &pool->ready, &abstime ) ;
如果睡超时了就要自动销毁喔~
当任务来临时,生产者有义务通知睡眠的消费者,唤醒后者,起来嗨~,关键代码如下:
condition_signal(&pool->ready ) ; // 唤醒等待在条件变量上的主线程
执行结果如下:
我设置的线程池容量为 2,任务数是 10,因此有 2 个线程处理 10 个任务。
从上图可以发现,两个线程分别是0xaa1de700 和 0xaa9df700,这两个线程依次处理 0 ~ 9 号任务,没有任务处理时,便进入等待状态(is waiting),等待超时的结果就是自动销毁,然后主线程退出,程序结束。
3.扩展线程池
以上就是一个最最小的线程池实现了,这只是一个玩具,仅供了解线程池的实现。但是我们可以把小玩具扩展成一个有模有样的线程池:C/S架构、socket 通信,以下是扩展思路,所有扩展的代码都在:simple_thread_pool/extension/。
客户端:
1. 把 socket 封个包,也就是把原生 socket 封装成一个类,我已经做了这一步,代码在:simple_thread_pool/extension/ossSocket/;
2. 做一个命令工厂类(CommandFactory),根据客户端的输入,生产出一系列的命令对象;
3. 客户端类(Client),其中包含 socket 类和 CommandFactory;
服务器端:
服务器端基本上就是对本文介绍的小线程池的扩展。
1.在main函数中添加监听函数:TCPListener();
2.TCPListener()的代码主要由 threadpool_add_task() 和 threadpool_runtime()的代码构成;
很多基础软件,比如数据库、服务器,底层都跑着一个线程池,要是有时间的话,我会扩展这个小线程池,然后写博客介绍技术实现。