服务器线程模型分类:
(1)按需生成(来一个连接生成一个线程)
(2)线程池(预先生成很多线程)
(3)Leader follower(LF)
线程池的作用:提高消息(任务)响应的实时性、提高任务执行的速度。
线程池的注意事项
(1)线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的话,线程数目与CPU 数
量相适合即可。如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。
(2)并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
(3)线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。
线程池模型
一个典型的线程池,应该包括如下几个部分:
1、线程池管理器(ThreadPool),用于启动、停用,管理线程池
2、工作线程(WorkThread),线程池中的线程
3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行
4、请求队列(RequestQueue),用于存放和提取请求(实现获取请求与插入请求的互斥)
5、结果队列(ResultQueue),用于存储请求执行后返回的结果
线程池管理器:通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得任务,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。
例子:条件变量实现线程池模型(不带结果队列)
条件变量实现线程池模型基本原理:工作线程默认情况下是阻塞在 pthread_cond_wait() 系统调用下的,如果有任务到来,我们可用使用 pthread_cond_singal() 来唤醒一个处于阻塞状态的线程,这样这个线程就可以执行 mc_thread_pool_get_task() 来取得一个任务,并调用相应的回调函数。
typedef struct _thread_pool_t
{
pthread_mutex_t queue_lock ;//任务锁
pthread_cond_t task_cond ;//条件变量
list_t * tasks ;// 任务队列
pthread_t * pthreads ;//线程池
int isdestoried;
int workersnum ;
char ready ;//指定当前是否存在任务(0,1),pthreads任务队列是否可用
thread_task_handler thread_pool_task_handler;//回调函数,线程执行体=mc_thread_entry
}thread_pool_t;
void *thread_entry( void *args )
{
int fd = *(int *)args ;
do_handler_fd( fd );
}
void mc_thread_pool_ini( mc_thread_pool_t * par_tp , int workersnum ,thread_task_handler par_handler ) //创建线程池
{
int err ;
//par_tp = ( thread_pool_t *)malloc( sizeof(thread_pool_t) );
if( par_tp == NULL )
{
fprintf( stderr , "thread_pool_t malloc\n");
return ;
}
par_tp->workersnum = workersnum ;
pthread_mutex_init( &par_tp->queue_lock ,NULL );
pthread_cond_init(&par_tp->task_cond , NULL );
/*
par_tp->queue_lock = PTHREAD_MUTEX_INITIALIZER ;
par_tp->task_cond = PTHREAD_COND_INITIALIZER ;
*/
par_tp->tasks = mc_listcreate() ;
if( par_tp->tasks == NULL )
{
fprintf( stderr , "listcreate() error\n");
//free( par_tp ) ;
return ;
}
par_tp->pthreads = ( pthread_t *)malloc( sizeof( pthread_t )*workersnum );
if( par_tp->pthreads == NULL )
{
fprintf( stderr , "pthreads malloc\n");
//free( par_tp );
mc_freelist( par_tp->tasks ) ;
return NULL ;
}
int i = 0 ;
for( ; i < workersnum ; i++ )
{
fprintf(stderr,"start to create threads\n");
err = pthread_create(&(par_tp->pthreads[i]),NULL,mc_thread_entry,NULL) ;
if( err == -1 )
{
fprintf( stderr , "pthread_create error\n");
//free( par_tp );
mc_freelist( par_tp->tasks ) ;
free(par_tp->pthreads) ;
}
}
par_tp->thread_pool_task_handler = par_handler ;
par_tp->ready = 0 ;
fprintf(stderr,"successed to create threads\n");
}
static void *mc_thread_entry( void *args ) //工作线程
{
void * task ;
for(;;)
{
pthread_mutex_lock( &mc_global_threads_pool.queue_lock ) ;
fprintf(stderr, " locked to wait task\n");
while( mc_global_threads_pool.ready == 0 )
{
pthread_cond_wait( &mc_global_threads_pool.task_cond , &mc_global_threads_pool.queue_lock ) ;
}
task = mc_thread_pool_get_task() ;
fprintf(stderr, "get a task and ready to unlock \n");
pthread_mutex_unlock( &mc_global_threads_pool.queue_lock ) ;
mc_global_threads_pool.thread_pool_task_handler( task ) ;
}
}
void mc_thread_pool_add_task(void *task , size_t tasksize ) //写任务队列 {
pthread_mutex_lock( &mc_global_threads_pool.queue_lock );
fprintf( stderr ,"thread locked and append to list\n");
mc_list_append( mc_global_threads_pool.tasks , task , tasksize ) ;
pthread_mutex_unlock( &mc_global_threads_pool.queue_lock );
fprintf( stderr ,"thread unlocked and successed append to list\n");
mc_global_threads_pool.ready = 1 ;
if( mc_global_threads_pool.ready == 1 )
{
fprintf( stderr ,"signal to threads\n");
pthread_cond_signal( &mc_global_threads_pool.task_cond ) ;
}
}
void *mc_thread_pool_get_task()
{
void * ret_task ;
ret_task = mc_getnode_del( mc_global_threads_pool.tasks , 0 );
if( ret_task == NULL )
{
fprintf(stderr,"get node_del error\n");
}
fprintf( stderr ," got a task\n");
mc_global_threads_pool.ready = 0 ;
if( ret_task == NULL )
{
fprintf(stderr, "getnode_del error\n");
return NULL ;
}
else
return ret_task ;
}
int main()
{
mc_thread_task_t ltask;
ltask.task_num = 1 ;
fprintf(stderr,"begin to ini pool\n");
mc_thread_pool_ini( &mc_global_threads_pool , 20 , my_thread_task_handler );
mc_thread_pool_add_task( <ask , sizeof(mc_thread_task_t) );
int i = 0 ;
for(;i < 10000; i++)
{
ltask.task_num = i ;
mc_thread_pool_add_task( <ask , sizeof(mc_thread_task_t) );
sleep(1);
}
return 0;
}