Linux简单线程池实现(带源码)

  这里给个线程池的实现代码,里面带有个应用小例子,方便学习使用,代码 GCC 编译可用。参照代码看下面介绍的线程池原理跟容易接受,百度云下载链接:

  http://pan.baidu.com/s/1i3zMHDV

一.线程池简介

  为什么使用线程池?

  目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即 时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,这笔开销将是不可忽略的。

  线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

二.线程池的结构

2.1. 线程池任务结点

  线程池任务结点用来保存用户投递过来的任务,并放入线程池中的线程执行。其结构名为worker_t,定义如下:

1 /*线程池任务结点*/
2 struct worker_t
3 {
4     void *(*process) (void *arg);    /**< 回调函数 */
5     int   paratype;                    /**< 函数类型(预留) */
6     void *arg;                        /**< 回调函数参数 */
7     struct worker_t *next;            /**< 连接下一个任务结点 */
8 };

2.2. 线程池控制器

  线程池控制器用来对线程池进行控制管理,包括任务的投递、线程池状态的更新与查询、线程池的销毁等。其结构名为CThread_pool_t,定义如下:

 1 /*线程池控制器*/
 2 struct CThread_pool_t
 3 {
 4     pthread_mutex_t queue_lock;    /**< 互斥锁 */
 5     pthread_cond_t queue_ready;    /**< 条件变量 */
 6
 7     worker_t *queue_head;        /**< 任务结点链表,保存所有投递的任务 */
 8     int shutdown;            /**< 线程池销毁标志,1 -> 销毁 */
 9     pthread_t *threadid;        /**< 线程ID */
10     int max_thread_num;        /**< 线程池可容纳的最大线程数 */
11     int current_pthread_num;    /**< 当前线程池存放的线程数 */
12     int current_pthread_task_num;    /**< 当前正在执行任务和已分配任务的线程数目和 */
13     int cur_queue_size;        /**< 当前等待队列的任务数目 */
14     int free_pthread_num;        /**< 线程池内允许存在的最大空闲线程数 */
15
16     /*向线程池投递任务*/
17     int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg);
18     /*向线程池投递任务,无空闲线程则阻塞*/
19     int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg);
20     /*获取线程池可容纳的最大线程数*/
21     int (*GetMaxThreadNum) (void *pthis);
22     /*获取线程池存放的线程数*/
23     int (*GetCurThreadNum) (void *pthis);
24     /*获取当前正在执行任务和已分配任务的线程数目和*/
25     int (*GetCurTaskThreadNum) (void *pthis);
26     /*获取线程池等待队列任务数*/
27     int (*GetCurTaskNum) (void *pthis);
28     /*销毁线程池*/
29     int (*Destruct) (void *pthis);
30 };

2.3. 线程池结构

  线程池的运行结构图如下:

  根据上图罗列几个注意的地方:

  (1)图中的线程池中的“空闲”和“执行”分别表示空闲线程和执行线程,空闲线程指在正在等待任务的线程,同样执行线程指正在执行任务的线程,两者是相互转换的。当用户投递任务过来则用空闲线程来执行该任务,且空闲线程状态转变为执行线程;当任务执行完后,执行线程状态转变为空闲线程;

  (2)创建线程池时,正常情况会先创建一定数量的线程,所有线程初始为空闲线程,线程阻塞等待用户投递任务;

  (3)用户投递的任务首先放入等待队列 queue_head 链表中,如果线程池中有空闲线程则放入空闲线程中执行,否则根据条件选择继续等待空闲线程或者新建一个线程来执行,新建的线程将放入线程池中;

  (4)执行的任务会从等待队列中脱离,并在任务执行完后释放任务结点worker_t;

三.线程池控制

  这里就线程池实现代码中的部分代码进行解释,请参照我上面给的线程池代码,里面的注释我标注的还是比较详细的。

3.1. 线程池创建

 1 CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num)
 2 {
 3     ......    //略
 4     for (i = 0; i < max_num; i++)
 5     {
 6         pool->current_pthread_num++;    /**< 当前池中的线程数 */
 7         pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool);    /**< 创建线程 */
 8         usleep(1000);
 9     }
10 }

  这里创建了 max_num 个线程 ThreadPoolRoutine( ),即上面提到的空闲线程。

3.2. 投递任务

 1 static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg)
 2 {
 3     ......    //略
 4     worker_t *newworker = (worker_t *) malloc (sizeof (worker_t));
 5     newworker->process     = process;    /**< 回调函数,在线程ThreadPoolRoutine()中执行 */
 6     newworker->arg         = arg;        /**< 回调函数参数 */
 7     newworker->next     = NULL;
 8
 9     pthread_mutex_lock(&(pool->queue_lock));
10     ......    //将任务结点放入等待队列,略
11     pool->cur_queue_size++;        /**< 等待队列加1 */
12
13     int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
14     if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num))
15     {/**< 如果没有空闲线程且池中当前线程数不超过可容纳最大线程 */
16         int current_pthread_num = pool->current_pthread_num;
17         pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));     /**< 新增线程 */
18         pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
19         pool->current_pthread_num++;    /**< 当前池中线程总数加1 */
20
21         pool->current_pthread_task_num++;    /**< 分配任务的线程数加1 */
22         pthread_mutex_unlock (&(pool->queue_lock));
23         pthread_cond_signal (&(pool->queue_ready));    /**< 发送信号给1个处于条件阻塞等待状态的线程 */
24         return 0;
25     }
26
27     pool->current_pthread_task_num++;
28     pthread_mutex_unlock(&(pool->queue_lock));
29     pthread_cond_signal(&(pool->queue_ready));
30     return 0;
31 }

  投递任务时先创建一个任务结点保存回调函数和函数参数,并将任务结点放入等待队列中。代码第14行判断条件为真时,则进行新线程创建,realloc( )会在保存原始内存中的数据不变的基础上新增1个sizeof (pthread_t)大小的内存。之后更新 current_pthread_num 和 current_pthread_task_num ,并发送信号 pthread_cond_signal(&(pool->queue_ready)) 给一个处于条件阻塞等待状态的线程,即线程 ThreadPoolRoutine( )中的

pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)) 阻塞等待接收信号。重点讲互斥锁和条件变量:

  pthread_mutex_t queue_lock;   /**< 互斥锁 */

  pthread_cond_t queue_ready;   /**< 条件变量 */

  这两个变量是线程池实现里很重要的点,建议网上搜索资料学习,这里简要介绍先代码中会用到的相关函数功能:

  放张图吧,这样直观点,线程互斥锁的一些调用函数一般都有接触过,这里就不作介绍了。

3.3. 执行线程  

 1 static void * ThreadPoolRoutine (void *arg)
 2 {
 3     CThread_pool_t *pool = (CThread_pool_t *)arg;
 4     while (1)
 5     {
 6         pthread_mutex_lock (&(pool->queue_lock)); /**< 上锁, pthread_cond_wait()调用会解锁*/
 7
 8         while ((pool->cur_queue_size == 0) && (!pool->shutdown))    /**< 队列没有等待任务*/
 9         {
10             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));     /**< 条件锁阻塞等待条件信号*/
11         }
12         if (pool->shutdown)
13         {
14             pthread_mutex_unlock (&(pool->queue_lock));
15             pthread_exit (NULL);     /**< 释放线程 */
16         }
17
18         .....    //略
19         worker_t *worker     = pool->queue_head;    /**< 取等待队列任务结点头*/
20         pool->queue_head     = worker->next;     /**< 链表后移 */
21
22         pthread_mutex_unlock (&(pool->queue_lock));
23         (*(worker->process)) (worker->arg);     /**< 执行回调函数 */
24         pthread_mutex_lock (&(pool->queue_lock));
25
26         pool->current_pthread_task_num--;    /**< 函数执行结束 */
27         free (worker);     /**< 释放任务结点 */
28         worker = NULL;
29
30         if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num)
31         {
32             pthread_mutex_unlock (&(pool->queue_lock));
33             break;    /**< 当池中空闲线程超过 free_pthread_num则将线程释放回操作系统 */
34         }
35         pthread_mutex_unlock (&(pool->queue_lock));
36     }
37
38     pool->current_pthread_num--;    /**< 当前池中线程数减1 */
39     pthread_exit (NULL);    /**< 释放线程*/
40     return (void*)NULL;
41 }

  这个就是用来执行投递任务的线程,在初始创建线程时所有线程都全部阻塞在pthread_cond_wait( )处,此时的线程状态就为空闲线程,也就是线程被挂起;

  当收到信号并取得互斥锁时,表明有任务投递过来,则获取等待队列里的任务结点并开始执行回调函数;

  函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态;

3.4. 线程销毁

  线程销毁主要做的就是销毁线程和释放动态内存,自己看代码就懂了。

时间: 2024-10-29 19:09:47

Linux简单线程池实现(带源码)的相关文章

周期性线程池与主要源码解析

之前学习ThreadPool的使用以及源码剖析,并且从面试的角度去介绍知识点的解答.今天小强带来周期性线程池的使用和重点源码剖析. ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor:用来处理延时任务或定时任务 定时线程池类的类结构图 ScheduledThreadPoolExecutor接收ScheduleFutureTask类型的任务,是线程池调度任务的最小单位. 它采用DelayQueue存储等待的任务: 1.DelayQueue

JAVA线程池原理与源码分析

1.线程池常用接口介绍 1.1.Executor public interface Executor { void execute(Runnable command); } 执行提交的Runnable任务.其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程.池化线程或调用线程中执行,具体由Executor的实现者决定. 1.2.ExecutorService ExecutorService继承自Executor,下面挑几个方法介绍: 1.2.1.shutdown() vo

Java线程池ThreadPoolExector的源码分析

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率.压榨CPU处理能力.多条线路同时运行等都是强有力的杀手锏工具.线程是如此的重要,那么我们来思考这样一个问题.假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行

Java线程池及其底层源码实现分析

1.相关类 Executors  ExecutorService   Callable   ThreadPool     Future 2.相关接口 Executor Executor接口的使用: public class TestExecutor implements Executor{ @Override public void execute(Runnable command){ //调用execute方法常常传入runnable接口对象,开启线程 } } ExecutorService接

JDK线程池框架Executor源码阅读

Executor框架 Executor ExecutorService AbstractExecutorService ThreadPoolExecutor ThreadPoolExecutor继承AbstractExecutorService,是一个线程池的具体的实现 主要成员 1. ctl private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT

C#线程池操作演示源码

把开发过程中经常用到的一些代码段做个备份,下面代码内容是关于C#线程池操作演示的代码. static void Main(string[] args){ThreadPool.SetMaxThreads(1000, 1000);for (int i = 0; i < 10;i ){ThreadPool.QueueUserWorkItem(new WaitCallback(ShowMessage), string.Format("当前编号{0}",i));}Console.ReadL

线程池之ThreadPoolExecutor源码解析

1.变量 ThreadPoolExecutor先定义了这几个常量,初看时一脸懵逼,其实它就是用int的二进制高三位来表示线程池的状态, 先回顾一下位运算: <<’左移:右边空出的位置补0,其值相当于乘以2. ‘>>’右移:左边空出的位,如果是正数则补0,若为负数则补0或1,取决于所用的计算机系统OS X中补1.其值相当于除以2. 负数二进制由它的绝对值取反后加1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RU

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同