Linux平台下线程池的原理及实现

转自:http://blog.csdn.net/lmh12506/article/details/7753952

前段时间在github上开了个库,准备实现自己的线程池的,因为换工作的事,一直也没有实现,参考这篇文章准备着手实现一下。

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。

下面是Linux系统下用C语言创建的一个线程池。线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。

pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中

  1. while (pool->cur_queue_size == 0)
  2. {
  3. pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
  4. }

表示如果任务链表中没有任务,则该线程出于阻塞等待状态。否则从队列中取出任务并执行。
   
   pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(如果有的话)。
   
   pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。

下面贴出完整代码

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <unistd.h>
  4. #include <sys/types.h>
  5. #include <pthread.h>
  6. #include <assert.h>
  7. /*
  8. *线程池里所有运行和等待的任务都是一个CThread_worker
  9. *由于所有任务都在链表里,所以是一个链表结构
  10. */
  11. typedef struct worker
  12. {
  13. /*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/
  14. void *(*process) (void *arg);
  15. void *arg;/*回调函数的参数*/
  16. struct worker *next;
  17. } CThread_worker;
  18. /*线程池结构*/
  19. typedef struct
  20. {
  21. pthread_mutex_t queue_lock;
  22. pthread_cond_t queue_ready;
  23. /*链表结构,线程池中所有等待任务*/
  24. CThread_worker *queue_head;
  25. /*是否销毁线程池*/
  26. int shutdown;
  27. pthread_t *threadid;
  28. /*线程池中允许的活动线程数目*/
  29. int max_thread_num;
  30. /*当前等待队列的任务数目*/
  31. int cur_queue_size;
  32. } CThread_pool;
  33. int pool_add_worker (void *(*process) (void *arg), void *arg);
  34. void *thread_routine (void *arg);
  35. static CThread_pool *pool = NULL;
  36. void
  37. pool_init (int max_thread_num)
  38. {
  39. pool = (CThread_pool *) malloc (sizeof (CThread_pool));
  40. pthread_mutex_init (&(pool->queue_lock), NULL);
  41. pthread_cond_init (&(pool->queue_ready), NULL);
  42. pool->queue_head = NULL;
  43. pool->max_thread_num = max_thread_num;
  44. pool->cur_queue_size = 0;
  45. pool->shutdown = 0;
  46. pool->threadid =
  47. (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
  48. int i = 0;
  49. for (i = 0; i < max_thread_num; i++)
  50. {
  51. pthread_create (&(pool->threadid[i]), NULL, thread_routine,
  52. NULL);
  53. }
  54. }
  55. /*向线程池中加入任务*/
  56. int
  57. pool_add_worker (void *(*process) (void *arg), void *arg)
  58. {
  59. /*构造一个新任务*/
  60. CThread_worker *newworker =
  61. (CThread_worker *) malloc (sizeof (CThread_worker));
  62. newworker->process = process;
  63. newworker->arg = arg;
  64. newworker->next = NULL;/*别忘置空*/
  65. pthread_mutex_lock (&(pool->queue_lock));
  66. /*将任务加入到等待队列中*/
  67. CThread_worker *member = pool->queue_head;
  68. if (member != NULL)
  69. {
  70. while (member->next != NULL)
  71. member = member->next;
  72. member->next = newworker;
  73. }
  74. else
  75. {
  76. pool->queue_head = newworker;
  77. }
  78. assert (pool->queue_head != NULL);
  79. pool->cur_queue_size++;
  80. pthread_mutex_unlock (&(pool->queue_lock));
  81. /*好了,等待队列中有任务了,唤醒一个等待线程;
  82. 注意如果所有线程都在忙碌,这句没有任何作用*/
  83. pthread_cond_signal (&(pool->queue_ready));
  84. return 0;
  85. }
  86. /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直
  87. 把任务运行完后再退出*/
  88. int
  89. pool_destroy ()
  90. {
  91. if (pool->shutdown)
  92. return -1;/*防止两次调用*/
  93. pool->shutdown = 1;
  94. /*唤醒所有等待线程,线程池要销毁了*/
  95. pthread_cond_broadcast (&(pool->queue_ready));
  96. /*阻塞等待线程退出,否则就成僵尸了*/
  97. int i;
  98. for (i = 0; i < pool->max_thread_num; i++)
  99. pthread_join (pool->threadid[i], NULL);
  100. free (pool->threadid);
  101. /*销毁等待队列*/
  102. CThread_worker *head = NULL;
  103. while (pool->queue_head != NULL)
  104. {
  105. head = pool->queue_head;
  106. pool->queue_head = pool->queue_head->next;
  107. free (head);
  108. }
  109. /*条件变量和互斥量也别忘了销毁*/
  110. pthread_mutex_destroy(&(pool->queue_lock));
  111. pthread_cond_destroy(&(pool->queue_ready));
  112. free (pool);
  113. /*销毁后指针置空是个好习惯*/
  114. pool=NULL;
  115. return 0;
  116. }
  117. void *
  118. thread_routine (void *arg)
  119. {
  120. printf ("starting thread 0x%x\n", pthread_self ());
  121. while (1)
  122. {
  123. pthread_mutex_lock (&(pool->queue_lock));
  124. /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
  125. pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
  126. while (pool->cur_queue_size == 0 && !pool->shutdown)
  127. {
  128. printf ("thread 0x%x is waiting\n", pthread_self ());
  129. pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
  130. }
  131. /*线程池要销毁了*/
  132. if (pool->shutdown)
  133. {
  134. /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
  135. pthread_mutex_unlock (&(pool->queue_lock));
  136. printf ("thread 0x%x will exit\n", pthread_self ());
  137. pthread_exit (NULL);
  138. }
  139. printf ("thread 0x%x is starting to work\n", pthread_self ());
  140. /*assert是调试的好帮手*/
  141. assert (pool->cur_queue_size != 0);
  142. assert (pool->queue_head != NULL);
  143. /*等待队列长度减去1,并取出链表中的头元素*/
  144. pool->cur_queue_size--;
  145. CThread_worker *worker = pool->queue_head;
  146. pool->queue_head = worker->next;
  147. pthread_mutex_unlock (&(pool->queue_lock));
  148. /*调用回调函数,执行任务*/
  149. (*(worker->process)) (worker->arg);
  150. free (worker);
  151. worker = NULL;
  152. }
  153. /*这一句应该是不可达的*/
  154. pthread_exit (NULL);
  155. }

下面是测试代码

  1. void *
  2. myprocess (void *arg)
  3. {
  4. printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
  5. sleep (1);/*休息一秒,延长任务的执行时间*/
  6. return NULL;
  7. }
  8. int
  9. main (int argc, char **argv)
  10. {
  11. pool_init (3);/*线程池中最多三个活动线程*/
  12. /*连续向池中投入10个任务*/
  13. int *workingnum = (int *) malloc (sizeof (int) * 10);
  14. int i;
  15. for (i = 0; i < 10; i++)
  16. {
  17. workingnum[i] = i;
  18. pool_add_worker (myprocess, &workingnum[i]);
  19. }
  20. /*等待所有任务完成*/
  21. sleep (5);
  22. /*销毁线程池*/
  23. pool_destroy ();
  24. free (workingnum);
  25. return 0;
  26. }

将上述所有代码放入threadpool.c文件中,
在Linux输入编译命令
$ gcc -o threadpool threadpool.c -lpthread

以下是运行结果
starting thread 0xb7df6b90
thread 0xb7df6b90 is waiting
starting thread 0xb75f5b90
thread 0xb75f5b90 is waiting
starting thread 0xb6df4b90
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 0
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 1
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 2
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 3
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 4
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 5
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 6
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 7
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 8
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 9
thread 0xb75f5b90 is waiting
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is waiting
thread 0xb75f5b90 will exit
thread 0xb6df4b90 will exit
thread 0xb7df6b90 will exit

时间: 2024-08-08 22:08:50

Linux平台下线程池的原理及实现的相关文章

Linux平台下线程同步,实现“生产者消费者问题”

(1)线程同步,实现"生产者消费者问题" 要求:缓冲区大小为20,生产者每次放一个产品,消费者每次取走一个产品:生产者和消费者至少2个. (2)代码如下: #include <stdio.h> #include <pthread.h> #include <unistd.h> #include <sched.h> void *producter_f (void *arg); /*生产者*/ void *consumer_f (void *a

linux平台下防火墙iptables原理(转)

原文地址:http://www.cnblogs.com/ggjucheng/archive/2012/08/19/2646466.html iptables简介 netfilter/iptables(简称为iptables)组成Linux平台下的包过滤防火墙,与大多数的Linux软件一样,这个包过滤防火墙是免费的,它可以代替昂贵的商业防火墙解决方案,完成封包过滤.封包重定向和网络地址转换(NAT)等功能. iptables基础 规则(rules)其实就是网络管理员预定义的条件,规则一般的定义为“

Java线程池的原理及几类线程池的介绍

刚刚研究了一下线程池,如果有不足之处,请大家不吝赐教,大家共同学习.共同交流. 在什么情况下使用线程池? 单个任务处理的时间比较短 将需处理的任务的数量大 使用线程池的好处: 减少在创建和销毁线程上所花的时间以及系统资源的开销 如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 线程池工作原理: 为什么要用线程池? 诸如 Web 服务器.数据库服务器.文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务.请求以某种方式到

Linux环境下线程消息同步的陷阱

我们程序中常常会使用到线程间的消息同步处理,比如以下一段伪码 var message = "": void func()  {   1. 启动线程Thread(该线程中填充message的内容):   2. 阻塞,直到等待到完成message填充的事件:   3. 处理message:   .... } void Thread()  {   1. 通过某种处理填充message:   2. 触发func中的阻塞事件: } 我们通常会使用条件变量来完成类似情况的线程同步处理 比如wind

Java 多线程:线程池实现原理

前言 我们都知道,所谓线程池,那么就是相当于有一个池子,线程就放在这个池子中进行重复利用,能够减去了线程的创建和销毁所带来的代价.但是这样并不能很好的解释线程池的原理,下面从代码的角度分析一下线程池的实现. 线程池的相关类 对于原理,在 Java 中,有几个接口,类 值得我们关注: Executor ExecutorService AbstractExecutorService ThreadPoolExecutor Executor public interface Executor {    

转:一个跨WINDOWS LINUX平台的线程类

来源:http://blog.csdn.net/dengxu11/article/details/7232681 继Windows下实现一个CThread封装类之后,这里我再实现一个跨WINDOWS LINUX平台的线程类 头文件 DXThread.h #ifndef __DXTHREAD_H__ #define __DXTHREAD_H__ #define DX_WINDOWS //在WINDOWS上就打开它 //#define DX_LINUX //在LINUX 上就打开它 #ifdef D

我眼中的java线程池实现原理

最近在看java线程池实现方面的源码,在此做个小结,因为网上关于线程池源码分析的博客挺多的,我也不打算重复造轮子啦,仅仅用纯语言描述的方式做做总结啦! 个人认为要想理解清楚java线程池实现原理,明白下面几个问题就可以了: (1):线程池存在哪些状态,这些状态之间是如何进行切换的呢? (2):线程池的种类有哪些? (3):创建线程池需要哪些参数,这些参数的具体含义是什么? (4):将任务添加到线程池之后运行流程? (5):线程池是怎么做到重用线程的呢? (6):线程池的关闭 首先回答第一个问题:

11 java 线程池 实现原理

一 关键类的实现 1 ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类. 下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: 1 public class ThreadPoolExecutor extends AbstractExecutorService {

Java的Executor框架和线程池实现原理

一,Java的Executor框架 1,Executor接口 public interface Executor { void execute(Runnable command); } Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法,它没有实现类只有另一个重要的子接口ExecutorService 2,ExecutorService接口 //继承自Executor接口 public interface ExecutorServic