简单线程池的实现

1. 什么是线程池

线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务。如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可。

线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销。一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为

1.     计算密集型任务;

2.     I/O密集型任务。

计算密集型任务是占用CPU资源的,很少被外界的事件打断,CPU的个数是一定的,所以并发数是一定的,因此线程个数等于CPU的个数时是最高效的。

I/O密集型任务意味着执行期间可能会被I/O中断,也就是说这个线程会被挂起,这时的线程个数应该大于CPU的个数。

线程池的本质是生产者与消费者模型的应用。生产者线程向任务队列中添加任务,一旦队列有任务到来,如果线程池有空闲线程,就唤醒空闲线程来执行任务,如果没有空闲线程,并且线程数没有达到阈值(线程池中线程的最大值),就创建新线程来执行任务。

当任务增加的时候能够动态的增加线程池中线程的数量,直到达到一个阈值。这个阈值就是线程池中线程的最大值。

当任务执行完毕时,能够动态的销毁线程池中的线程池。

2.线程池的实现

我在 Ubuntu 系统下用C语言写的程序(传送门:github),这是一个非常简单的线程池实现,代码量约300行,仅仅说明线程池的工作原理,我会文章最后给出扩展线程池的思路,使之成为一个拥有C/S架构,socket通信的线程池。

目前而言,用到的知识点就两个:

1.     pthread_mutex_t:互斥锁;

2.     pthread_cond_t:条件变量;

互斥锁和条件变量要配合使用,我在另一篇博客(点击打开链接)里给出了使用方法,有兴趣的童鞋可以去看一下~

我们的小线程池共有五个文件:

1.             condition.h:把互斥锁和条件变量组合在一起,形成一个条件结构体,condition.h就是这个结构体的声明;

2.             condition.c:与condition.h对应,定义了操作条件结构体的函数;

3.             threadpool.h:包含两个结构体,一个是线程控制块(TCB),另一个是线程池结构体,还有三个函数:初始化线程池、销毁线程池、向线程池中添加任务;

4.             threadpool.c:threadpool.h的实现;

5.             main.c:主函数;

除此之外,我使用autotool编译程序,因此还有两个脚本文件:

1.     makefile.am:定义文件之间的依赖关系;

2.     build.sh:编译脚本;

接下来让我们深入代码去理解线程池~

Condition.h,为了实现线程同步,普遍的做法是将互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)配合在一起使用,最好的做法就是让两者组合成一个结构体,pthread_mutex_t 和 pthread_cond_t 同属于 pthread.h 头文件:

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  [email protected]
******************************************************************/

#ifndef _CONDITION_H_
#define _CONDITION_H_

#include <pthread.h>

/* 将互斥锁和条件变量封装成一个结构体 */
typedef struct condition
{
   pthread_mutex_t pmutex ;
   pthread_cond_t pcond ;
} condition_t ;

/* 初始化结构体 */
int condition_init( condition_t* cond ) ;

/* 拿到结构体中的互斥锁 */
int condition_lock( condition_t* cond ) ;

/* 释放结构体中的互斥锁 */
int condition_unlock( condition_t* cond ) ;

/* 使消费者线程等待在条件变量上 */
int condition_wait( condition_t* cond ) ;

/* 使消费者线程等待在条件变量上,abstime:等待超时 */
int condition_timedwait( condition_t* cond, const struct timespec* abstime ) ;

/* 生产者线程通知等待在条件变量上的消费者线程 */
int condition_signal( condition_t* cond ) ;

/* 生产者线程向等待在条件变量上的消费者线程广播 */
int condition_broadcast( condition_t* cond ) ;

/* 销毁结构体 */
int condition_destroy( condition_t* cond ) ;

#endif

Condition.c,有8个针对条件结构体的函数,condition.c就是这8个函数的定义,很简单,都是转而调用pthread.h头文件中的库函数。

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  [email protected]
******************************************************************/

#include "condition.h"

int condition_init( condition_t* cond )
{
   int status ;
   if ( ( status = pthread_mutex_init( &cond->pmutex, NULL ) ) )
   {
      return status ;
   }

   if ( ( status = pthread_cond_init( &cond->pcond, NULL ) ) )
   {
      return status ;
   }

   return 0 ;
}

int condition_lock( condition_t* cond )
{
   return pthread_mutex_lock( &cond->pmutex) ;
}

int condition_unlock( condition_t* cond )
{
   return pthread_mutex_unlock( &cond->pmutex ) ;
}

int condition_wait( condition_t* cond )
{
   return pthread_cond_wait( &cond->pcond, &cond->pmutex ) ;
}

int condition_timedwait( condition_t* cond, const struct timespec* abstime )
{
   return pthread_cond_timedwait( &cond->pcond, &cond->pmutex, abstime ) ;
}

int condition_signal( condition_t* cond )
{
   return pthread_cond_signal( &cond->pcond ) ;
}

int condition_broadcast( condition_t* cond )
{
   return pthread_cond_broadcast( &cond->pcond ) ;
}

int condition_destroy( condition_t* cond )
{
   int status ;
   if ( ( status = pthread_mutex_destroy( &cond->pmutex ) ) )
   {
      return status ;
   }

   if ( ( status = pthread_cond_destroy( &cond->pcond ) ) )
   {
      return status ;
   }

   return 0 ;
}

Pthreadpool.h:包含两个结构体,线程控制块(TCB)task_t和线程池结构体threadpool,还声明了三个针对线程池的操作:初始化线程池、销毁线程池、向线程池中添加任务。

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  [email protected]
******************************************************************/

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include "condition.h"

/* 线程控制块(task control block),以单向链表的形式组织 TCB */
typedef struct task
{
   void *( *run ) ( void* arg ) ; // 线程的执行函数
   void *arg ; // 执行函数的参数
   struct tast* next ; // 指向下一个 TCB
} task_t ;

/* 线程池结构体 */
typedef struct threadpool
{
   condition_t ready ; // 条件变量结构体
   task_t* first ; // TCB 链表的头指针
   task_t* last ; // TCB 链表的尾指针
   int counter ; // TCB 的总数量
   int idle ; // 空闲 TCB 的个数
   int max_threads ; // 最大线程数量
   int quit ; // 线程池销毁标志
} threadpool_t ;

/* 初始化线程池 */
void threadpool_init( threadpool_t* pool, int threads ) ;

/* 向线程池中添加任务 */
void threadpool_add_task( threadpool_t* pool, void* ( *run )( void* arg ), void* arg ) ;

/* 销毁线程池 */
void threadpool_destroy( threadpool_t* pool ) ;

#endif

Threadpool.c,共四个函数,对线程池的操作。

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  [email protected]
******************************************************************/

#include "threadpool.h"
#include <errno.h>
#include <time.h>

/* 线程入口函数 */
void* thread_runtime( void* arg )
{
   struct timespec abstime ;
   int timeout ;

   // 拿到线程池对象
   threadpool_t* pool = ( threadpool_t* ) arg ;

   while ( 1 )
   {
      timeout = 0 ;

      /************************** 进入临界区 ***********************/

      condition_lock( &pool->ready ) ;

      // 空闲线程数加 1
      ++ pool->idle ;

      // 如果线程链表为空,而且线程池处于运行状态,那么线程就该等待任务的到来
      while ( pool->first == NULL && pool->quit == 0 )
      {
         printf( "thread 0x%x is waiting\n", (int)pthread_self() ) ;
         clock_gettime( CLOCK_REALTIME, &abstime ) ;
         abstime.tv_sec += 2 ;
         int status = condition_timedwait( &pool->ready, &abstime ) ;
         if ( status == ETIMEDOUT )
         {
            printf( "thread 0x%x is wait timed out\n", (int)pthread_self() ) ;
            timeout = 1 ;
            break ;
         }
      }

      // 如果线程等待超时
      if ( timeout && pool->first == NULL )
      {
         -- pool->counter ; // 那么线程数量减 1
         condition_unlock( &pool->ready ) ; // 释放互斥锁
         break ; // 跳出 while,注意,break 之后,线程入口函数执行完毕,线程将不复存在
      }

      // 线程获得任务
      -- pool->idle ; // 线程池空闲数减 1
      if ( pool->first != NULL )
      {
         task_t* t = pool->first ; // 从链表头部取出 TCB
         pool->first = t->next ; // 指向下一个 TCB

         // 执行任务需要一定时间,所以要先解锁
         // 以便生产者可以往链表中加入任务
         // 以及其他消费者可以等待任务
         condition_unlock( &pool->ready ) ;

         t->run( t->arg ) ; // 执行任务的回调函数

         free( t ) ; // 任务执行完毕,销毁 TCB

         condition_lock( &pool->ready ) ;
      } 

      // quit == 1 说明要销毁线程池
      if ( pool->quit && pool->first == NULL )
      {
         -- pool->counter ;
         if ( pool->counter == 0 )
         {
            condition_signal( &pool->ready ) ; // 唤醒等待在条件变量上的主线程
         }
         condition_unlock( &pool->ready ) ;
         break ;
      }

      condition_unlock( &pool->ready ) ;

      /************************** 退出临界区 ***********************/
   }

   return NULL ;
}

/* 初始化线程池 */
void threadpool_init( threadpool_t* pool, int threads )
{
   condition_init( &pool->ready ) ; // 初始化条件变量结构体
   pool->first = NULL ; // 设置线程链表头指针
   pool->last = NULL ; // 设置线程链表尾指针
   pool->counter = 0 ; // 设置线程池当前线程数
   pool->idle = 0 ; // 设置线程池当前空闲线程数
   pool->max_threads = threads ; // 设置线程池最大线程数
   pool->quit = 0 ; // quit = 0,线程池运行状态;quit = 1,线程池销毁状态
}

/* 向线程池中添加线程 */
void threadpool_add_task( threadpool_t* pool, void* (*run)( void* arg ), void* arg )
{
   task_t* newtask = ( task_t* ) malloc ( sizeof( task_t ) ) ; // 创建线程控制块
   newtask->run = run ; // 设置线程的回调函数
   newtask->arg = arg ; // 设置回调函数的参数
   newtask->next = NULL ; // 新加入的线程会被添加到链表尾部

   /************************** 进入临界区 ***********************/

   condition_lock( &pool->ready ) ; // 拿到互斥锁

   // 把新创建的 TCB 添加到线程链表中
   if ( pool->first == NULL )
   {
   	  // 如果线程链表为空,则 TCB 作为链表头部
      pool->first = newtask ;
   }
   else
   {
   	  // 如果线程链表不为空,加入到链表尾部
      pool->last->next = newtask ;
   }
   pool->last = newtask ; // 修改链表尾指针

   // 如果有空闲线程,那么就唤醒空闲线程
   if ( pool->idle > 0 )
   {
      condition_signal( &pool->ready ) ; // 通知等待在条件变量上的空闲线程
   }
   else if ( pool->counter < pool->max_threads )
   {
   	  // 如果没有空闲线程可用,而且当前线程数量小于线程池的容量,我们就创建一个线程
      pthread_t tid ;
      pthread_create( &tid, NULL, thread_runtime, pool ) ; // 指定新线程的起始函数为 thread_runtime,把线程池传递给 thread_runtime
      ++ pool->counter ;
   }

   condition_unlock( &pool->ready ) ; // 释放互斥锁

   /************************** 退出临界区 ***********************/
}

/* 销毁线程池 */
void threadpool_destroy( threadpool_t* pool )
{
   if ( pool->quit )
   {
      return ;
   } 

   /************************** 进入临界区 ***********************/

   condition_lock( &pool->ready ) ;

   // 设置退出标志为真
   pool->quit = 1 ;

   // 如果线程池中正在运行着线程,那么我们需要等待线程执行完毕再销毁
   if ( pool->counter > 0 )
   {
      if ( pool->idle > 0 )
      {
         condition_broadcast( &pool->ready ) ;
      }
      while ( pool->counter > 0 )
      {
         condition_wait( &pool->ready ) ; // 主线程(main 函数所在线程)将等待在条件变量上
      }
   }

   condition_unlock( &pool->ready ) ;

   /************************** 退出临界区 ***********************/

   // 销毁条件变量
   condition_destroy( &pool->ready ) ;
}

最后是主函数啦,main.c

/*******************************************************************
*  Copyright(c) 2016 Chen Gonghao
*  All rights reserved.
*
*  [email protected]
******************************************************************/

#include "threadpool.h"

/* 定义线程池最大线程数 */
#define MAX_POOL_SIZE	3

/* 每个任务的回调函数 */
void* mytask( void* arg )
{
   printf( " thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg ) ;
   sleep( 1 ) ;
   free( arg ) ;
   return NULL ;
}

int main( void )
{
   threadpool_t pool ; // 定义一个线程池变量
   threadpool_init( &pool, MAX_POOL_SIZE ) ; // 初始化线程池

   // 向线程池中添加 10 个任务,每个任务的处理函数都是 mytask
   for ( int i = 0; i < 10; ++ i )
   {
      int* arg = (int*)malloc( sizeof( int ) ) ;
      *arg = i ;
      threadpool_add_task( &pool, mytask, arg ) ;
   }

   threadpool_destroy( &pool ) ; // 销毁线程池

   return 0 ;
}

画个图来理解线程池的原理:

可以发现生产者的代码主要位于 thread_add_task()函数,消费者的代码主要位于 thread_runtime() 函数。

细心的童鞋也许发现了:thread_runtime()中只有while循环,三个消费者同时执行while 循环,不可能保证循环有序的从生产线上取任务,任务来临时如何保证三个消费者不发生争抢行为呢?这就是条件变量的作用了,当消费者发现生产线为空时,就依次睡在条件变量上,睡在条件变量上的关键代码如下:

int status = condition_timedwait( &pool->ready, &abstime ) ;

如果睡超时了就要自动销毁喔~

当任务来临时,生产者有义务通知睡眠的消费者,唤醒后者,起来嗨~,关键代码如下:

condition_signal(&pool->ready ) ; // 唤醒等待在条件变量上的主线程

执行结果如下:

我设置的线程池容量为 2,任务数是 10,因此有 2 个线程处理 10 个任务。

从上图可以发现,两个线程分别是0xaa1de700 和 0xaa9df700,这两个线程依次处理 0 ~ 9 号任务,没有任务处理时,便进入等待状态(is waiting),等待超时的结果就是自动销毁,然后主线程退出,程序结束。

3.扩展线程池

以上就是一个最最小的线程池实现了,这只是一个玩具,仅供了解线程池的实现。但是我们可以把小玩具扩展成一个有模有样的线程池:C/S架构、socket 通信,以下是扩展思路,所有扩展的代码都在:simple_thread_pool/extension/。

客户端:

1.     把 socket 封个包,也就是把原生 socket 封装成一个类,我已经做了这一步,代码在:simple_thread_pool/extension/ossSocket/;

2.     做一个命令工厂类(CommandFactory),根据客户端的输入,生产出一系列的命令对象;

3.     客户端类(Client),其中包含 socket 类和 CommandFactory;

服务器端:

服务器端基本上就是对本文介绍的小线程池的扩展。

1.在main函数中添加监听函数:TCPListener();

2.TCPListener()的代码主要由 threadpool_add_task() 和 threadpool_runtime()的代码构成;

很多基础软件,比如数据库、服务器,底层都跑着一个线程池,要是有时间的话,我会扩展这个小线程池,然后写博客介绍技术实现。

时间: 2024-10-20 15:42:50

简单线程池的实现的相关文章

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

LINUX下的简单线程池

前言 任何一种设计方式的引入都会带来额外的开支,是否使用,取决于能带来多大的好处和能带来多大的坏处,好处与坏处包括程序的性能.代码的可读性.代码的可维护性.程序的开发效率等. 线程池适用场合:任务比较多,需要拉起大量线程来处理:任务的处理时间相对比较短,按照线程的周期T1(创建阶段).T2(执行阶段).T3(销毁阶段)来算,执行阶段仅占用较少时间. 简单的线程池通常有以下功能:预创建一定数量的线程:管理线程任务,当工作线程没有事情可做时休眠自己:销毁线程池. 复杂一些的线程池有额外的调节功能:管

C++版简单线程池

需求 之前写过一个C#版本的简单线程池http://blog.csdn.net/ylbs110/article/details/51224979 由于刚刚学习了C++11新特性中的future,于是想到用它来实现一个线程池. 实现 思路基本和C#版本的一样,主要区别是委托的实现,线程句柄的不同和线程锁: 本来C++有function模板,但是实现起来比较麻烦,这里主要是实现线程池,所以动态参数的委托就不实现了,直接使用typedef void(*Func)();来实现一个无参数无返回值的函数指针

自实现简单线程池

线程池在现在的系统和框架中十分常见.明白线程池的思想原理,不仅对学习线程只是有很大的帮助.对理解一些系统的线程池实现也有很大的帮助.下面是我自己简单实现的一个线程池.用以对线程的简单理解. 线程的实现原理很简单: 线程池对象包含以下组件:工作者队列,Job队列: 用户通过线程池对象添加删除工作者,线程池对象维持工作者对象这个池和工作者的实际工作: 工作者池中的线程在用户没用明确关闭前不断的从Job队列拿取job执行job. 好了,一切看代码: 1.以接口编程,首先创建ThreadPool接口:

简单线程池原理和代码

线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务 本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理 代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了. 线程池代码见Github[点击] 由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明) std::

自己实现一个简单线程池

先上原理图: 上代码之前,要先补充一下线程池构造的核心几个点 线程池里的核心线程数与最大线程数 线程池里真正工作的线程worker 线程池里用来存取任务的队列BlockingQueue 线程中的任务task 本例实现简化了一些,只实现了BlockingQueue存放任务,然后每个worker取任务并执行,下面看代码首先定义一个线程池ThreadExcutor class ThreadExcutor{ //创建 private volatile boolean RUNNING = true; //

简单线程池实现 (C版本)

1. 概述 在谈到线程池之前,我们看看并发还有哪几种方式. 多进程,包括一次启动多个进程,然后在进程间传递描述符,或者连接来了以后fork子进程处理. 多线程,同理也可以像多进程一样,要么建立一堆放在那里,要么就有连接以后"即时建立,即时销毁" 我们看上面的方法并没有什么不妥,特殊的场景总是能够派上用场的. 比如一个并发量只有两位数甚至个位数,那么上面的这些方式应付起来也很轻松.但是如果要举一个极端的例子,十万的并发,首先不可能建立十万个进程等着这个并发,也不可能即时创建即时销毁.这些

c++简单线程池实现

线程池,简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行,下面是线程池的工作原理图: 我们为什么要使用线程池呢? 简单来说就是线程本身存在开销,我们利用多线程来进行任务处理,单线程也不能滥用,无止禁的开新线程会给系统产生大量