线程池模型

服务器线程模型分类

(1)按需生成(来一个连接生成一个线程)

(2)线程池(预先生成很多线程)

(3)Leader follower(LF)

线程池的作用:提高消息(任务)响应的实时性、提高任务执行的速度。

线程池的注意事项

(1)线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的话,线程数目与CPU 数

量相适合即可。如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。

(2)并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。

(3)线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。

线程池模型

        

一个典型的线程池,应该包括如下几个部分:

1、线程池管理器(ThreadPool),用于启动、停用,管理线程池

2、工作线程(WorkThread),线程池中的线程

3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行

4、请求队列(RequestQueue),用于存放和提取请求(实现获取请求与插入请求的互斥)

5、结果队列(ResultQueue),用于存储请求执行后返回的结果

线程池管理器:通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得任务,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。

例子:条件变量实现线程池模型(不带结果队列)

条件变量实现线程池模型基本原理:工作线程默认情况下是阻塞在 pthread_cond_wait() 系统调用下的,如果有任务到来,我们可用使用 pthread_cond_singal() 来唤醒一个处于阻塞状态的线程,这样这个线程就可以执行 mc_thread_pool_get_task() 来取得一个任务,并调用相应的回调函数。

typedef struct _thread_pool_t

{

pthread_mutex_t  queue_lock ;//任务锁

pthread_cond_t   task_cond  ;//条件变量

list_t         * tasks      ;// 任务队列

pthread_t      * pthreads   ;//线程池

int              isdestoried;

int              workersnum ;

char             ready      ;//指定当前是否存在任务(0,1),pthreads任务队列是否可用

thread_task_handler  thread_pool_task_handler;//回调函数,线程执行体=mc_thread_entry

}thread_pool_t;

void *thread_entry( void *args )

{

int fd = *(int *)args ;

do_handler_fd( fd );

}

void mc_thread_pool_ini( mc_thread_pool_t * par_tp , int workersnum ,thread_task_handler par_handler )  //创建线程池

{

int err ;

//par_tp = ( thread_pool_t *)malloc( sizeof(thread_pool_t) );

if( par_tp == NULL )

{

fprintf( stderr , "thread_pool_t malloc\n");

return  ;

}

par_tp->workersnum = workersnum ;

pthread_mutex_init( &par_tp->queue_lock ,NULL );

pthread_cond_init(&par_tp->task_cond , NULL );

/*

par_tp->queue_lock = PTHREAD_MUTEX_INITIALIZER ;

par_tp->task_cond  = PTHREAD_COND_INITIALIZER  ;

*/

par_tp->tasks = mc_listcreate() ;

if( par_tp->tasks == NULL )

{

fprintf( stderr , "listcreate() error\n");

//free( par_tp ) ;

return  ;

}

par_tp->pthreads = ( pthread_t *)malloc( sizeof( pthread_t )*workersnum );

if( par_tp->pthreads == NULL )

{

fprintf( stderr , "pthreads malloc\n");

//free( par_tp );

mc_freelist( par_tp->tasks ) ;

return NULL ;

}

int i = 0 ;

for( ; i < workersnum ; i++ )

{

fprintf(stderr,"start to create threads\n");

err = pthread_create(&(par_tp->pthreads[i]),NULL,mc_thread_entry,NULL) ;

if( err == -1 )

{

fprintf( stderr , "pthread_create error\n");

//free( par_tp );

mc_freelist( par_tp->tasks ) ;

free(par_tp->pthreads) ;

}

}

par_tp->thread_pool_task_handler = par_handler ;

par_tp->ready = 0 ;

fprintf(stderr,"successed to create threads\n");

}

static void *mc_thread_entry( void *args ) //工作线程

{

void * task ;

for(;;)

{

pthread_mutex_lock( &mc_global_threads_pool.queue_lock ) ;

fprintf(stderr, " locked to wait task\n");

while( mc_global_threads_pool.ready == 0 )

{

pthread_cond_wait( &mc_global_threads_pool.task_cond , &mc_global_threads_pool.queue_lock ) ;

}

task = mc_thread_pool_get_task() ;

fprintf(stderr, "get a task and ready to unlock \n");

pthread_mutex_unlock( &mc_global_threads_pool.queue_lock ) ;

mc_global_threads_pool.thread_pool_task_handler( task ) ;

}

}

void mc_thread_pool_add_task(void *task , size_t tasksize ) //写任务队列 {

pthread_mutex_lock( &mc_global_threads_pool.queue_lock );

fprintf( stderr ,"thread locked and append to list\n");

mc_list_append( mc_global_threads_pool.tasks , task , tasksize ) ;

pthread_mutex_unlock( &mc_global_threads_pool.queue_lock );

fprintf( stderr ,"thread unlocked and successed append to list\n");

mc_global_threads_pool.ready = 1 ;

if( mc_global_threads_pool.ready == 1 )

{

fprintf( stderr ,"signal to threads\n");

pthread_cond_signal( &mc_global_threads_pool.task_cond ) ;

}

}

void *mc_thread_pool_get_task()

{

void * ret_task ;

ret_task = mc_getnode_del( mc_global_threads_pool.tasks , 0 );

if( ret_task == NULL )

{

fprintf(stderr,"get node_del error\n");

}

fprintf( stderr ," got a task\n");

mc_global_threads_pool.ready = 0 ;

if( ret_task == NULL )

{

fprintf(stderr, "getnode_del error\n");

return NULL ;

}

else

return ret_task ;

}

int main()

{

mc_thread_task_t ltask;

ltask.task_num = 1 ;

fprintf(stderr,"begin to ini pool\n");

mc_thread_pool_ini( &mc_global_threads_pool , 20 , my_thread_task_handler );

mc_thread_pool_add_task( &ltask , sizeof(mc_thread_task_t) );

int i = 0 ;

for(;i < 10000; i++)

{

ltask.task_num = i ;

mc_thread_pool_add_task( &ltask , sizeof(mc_thread_task_t) );

sleep(1);

}

return 0;

}

时间: 2024-10-29 06:42:24

线程池模型的相关文章

浅谈服务器单I/O线程+工作者线程池模型架构及实现要点

转自 http://www.cnblogs.com/ccdev/p/3542669.html 单I/O线程+多工作者线程的模型,这也是最常用的一种服务器并发模型.我所在的项目中的server代码中,这种模型随处可见.它还有个名字,叫“半同步/半异步“模型,同时,这种模型也是生产者/消费者(尤其是多消费者)模型的一种表现. 这种架构主要是基于I/O多路复用的思想(主要是epoll,select/poll已过时),通过单线程I/O多路复用,可以达到高效并发,同时避免了多线程I/O来回切换的各种开销,

Floodlight controller 线程池模型

官方文档对于ThreadPool的描述是:ThreadPool is a Floodlight module wrapper for a Java's ScheduledExecutorService.  It can be used to have threads be run at specific times or periodically. 所以只要对并发编程有点基础,就很容易理解,它实现了俩接口:1)IThreadPoolService规范的是得到ScheduledExecutorSe

线程池与非线程池应用场景及模型对比分析

在网络编程中经常用到线程池和连接池,今天就对其中常用的线程池的基本应用场景和模型做个简单的对比分析. 1.  业务流程对比 a.  非线程池业务流模型: 上图标识了基本的非线程池的线程模型,前端1有多少连接则前端客户端2与前端服务器端3均需建立一对一的线程数进行响应的连接.前端服务器端3与后端服务器端4也需建立响应数目的线程进行连接处理相关业务. 当一个任务处理完毕后线程退出,在下一个任务到来的时候前端服务器端创建新的线程来处理新的任务. b.线程池模型: 上图标识了基本的线程池模型.前端客户端

Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现

声明:本文为原创博文,转载请注明出处. Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们.在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册.注销.通知功能).多路复用器(由操作系统提供,比如kqueue.select.epoll等).事件处理器(负责事件的处理)以及事件源(linux中这就是描述符)这四个组件.一般,会单独启动一个线程运行Reactor实例来

两种unix网络编程线程池的设计方法

unp27章节中的27.12中,我们的子线程是通过操作共享任务缓冲区,得到task的,也就是通过线程间共享的clifd[]数组,这个数组其实就是我们的任务数组,得到其中的connfd资源. 我们对这个任务数组的操作,需要互斥量+条件变量达到同步的目的..每个线程是无规律的从clifd得到任务,然后执行的.任务和线程之间没有对应关系.线程完成本次任务之后,如果任务数组中任然有任务,则再次运行下一个任务. 而另外的一个线程池模型中,pthread_create (&temp[i].tid, NULL

高效线程池(threadpool)的实现

高效线程池(threadpool)的实现 Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们.在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册.注销.通知功能).多路复用器(由操作系统提供,比如kqueue.select.epoll等).事件处理器(负责事件的处理)以及事件源(linux中这就是描述符)这四个组件.一般,会单独启动一个线程运行Reactor实例

简单实用可线上应用的线程池组件

0 前言 线程池的组件网上很多,之前我自己也尝试写个一个demo,但这些组件一般都比较简单,没有完整的实现后台线程池组件应用的功能.因此,这里我们实现一个可以用在线上环境的线程池组件,该线程池组件具备线程池应用的特性,如下所示: 1. 伸缩性:即线程池中线程的个数应该是动态变化的.繁忙的时候可以申请更多的线程:空闲的时候则注销一部分线程. 2. 线程状态:线程池中对线程的管理引入睡眠.唤醒机制.当线程没有任务在运行时,使线程处于睡眠状态. 3. 线程管理:对线程池中线程的申请和注销,不是通过创建

Linux统系统开发12 Socket API编程3 TCP状态转换 多路IO高并发select poll epoll udp组播 线程池

[本文谢绝转载原文来自http://990487026.blog.51cto.com] Linux统系统开发12 Socket API编程3 TCP状态转换 多路IO高并发select  poll  epoll udp组播 线程池 TCP 11种状态理解: 1,客户端正常发起关闭请求 2,客户端与服务端同时发起关闭请求 3,FIN_WAIT1直接转变TIME_WAIT 4,客户端接收来自服务器的关闭连接请求 多路IO转接服务器: select模型 poll模型 epoll模型 udp组播模型 线

Linux下线程池的理解与简单实现

首先,线程池是什么?顾名思义,就是把一堆开辟好的线程放在一个池子里统一管理,就是一个线程池. 其次,为什么要用线程池,难道来一个请求给它申请一个线程,请求处理完了释放线程不行么?也行,但是如果创建线程和销毁线程的时间比线程处理请求的时间长,而且请求很多的情况下,我们的CPU资源都浪费在了创建和销毁线程上了,所以这种方法的效率比较低,于是,我们可以将若干已经创建完成的线程放在一起统一管理,如果来了一个请求,我们从线程池中取出一个线程来处理,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的