前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个。跟大家分享,同时也整理整理思路。
对线程池的要求:
1.用于处理大量短暂的任务。
2.动态增加线程,直到达到最大允许的线程数量。
3.动态销毁线程。
线程池的实现类似于”消费者--生产者”模型:
用一个队列存放任务(仓库,缓存)
主线程添加任务(生产者生产任务)
新建线程函数执行任务(消费者执行任务)
由于任务队列是全部线程共享的,就涉及到同步问题。这里采用条件变量和互斥锁来实现。
--------------------------------------------------------condition.h----------------------------------------------------------
/* 在此线程池中互斥锁和条件变量都是配套使用,编写此头文件使用比较方便 在此线程池中保护共享数据的都是用这个头文件中的函数 */ #ifndef _CONDITION_H_ #define _CONDITION_H_ #include <pthread.h> //有互斥锁和条件变量的结构体condition_t typedef struct condition { pthread_cond_t pcond; pthread_mutex_t pmutex; } condition_t; int condition_init(condition_t *cond) { if(pthread_cond_init(&cond->pcond, NULL)) return 1; if(pthread_mutex_init(&cond->pmutex, NULL)) return 1; return 0; }; int condition_lock(condition_t *cond) { return pthread_mutex_lock(&cond->pmutex); } int condition_unlock(condition_t *cond) { return pthread_mutex_unlock(&cond->pmutex); } int condition_wait(condition_t *cond) { return pthread_cond_wait(&cond->pcond, &cond->pmutex); } int condition_timedwait(condition_t *cond, const struct timespec *abstime) { return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime); } int condition_signal(condition_t *cond) { return pthread_cond_signal(&cond->pcond); } int condition_broadcast(condition_t *cond) { return pthread_cond_broadcast(&cond->pcond); } int condition_destroy(condition_t *cond) { if(pthread_cond_destroy(&cond->pcond)) return 1; if(pthread_mutex_destroy(&cond->pmutex)) return 1; return 0; } #endif
-------------------------------------------------------threadpool.h--------------------------------------------------------
//线程池头文件 #ifndef _THREADPOOL_H_ #define _THREADPOOL_H_ #include "condition.h" #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/time.h> #include <time.h> //线程任务结构体(队列形式存储) typedef struct Task { void* (*run) (void*arg); void *arg; struct Task *next; } task_t; //线程池结构体 typedef struct threadpool { condition_t ready; //互斥锁与条件变量 task_t *first; //任务队列头 task_t *last; //任务队列尾 int counter; //任务总数 int maxthread; //最大线程数 int idle; //正在等待的线程数量 int quit; //销毁线程池标志 } threadpool_t; //初始化线程池 void threadpool_init(threadpool_t *pool, int max_num) { condition_init(&pool->ready); pool->first = NULL; pool->last = NULL; pool->counter = 0; pool->maxthread = max_num; pool->idle = 0; pool->quit = 0; } //新建线程函数 void *thread_routine(void *arg) { struct timespec abstime; int timeout; //是否超过等待任务时间 threadpool_t *pool = (threadpool_t*)arg; printf("0x%0x thread is starting\n", (int)pthread_self()); //让线程不会因为结束任务立刻销毁 for(;;) { timeout = 0; condition_lock(&pool->ready); //对全局变量任务队列进行操作,要加锁 pool->idle ++; //等待任务队列中有任务 while(pool->first == NULL && pool->quit == 0) { printf("0x%0x thread is waiting\n", (int)pthread_self()); clock_gettime(CLOCK_REALTIME, &abstime); abstime.tv_sec += 2; //设置等待超时时间 int status = condition_timedwait(&pool->ready, &abstime); if(status == 110) //timewait函数超时返回110(TIMEDOUT) { printf("0x%0x thread is timeout\n", (int)pthread_self()); timeout = 1; break; } } //正在等待的线程数量减一 pool->idle --; //若任务队列中有任务 if(pool->first != NULL) { task_t *t = pool->first; pool->first = pool->first->next; //先解锁再执行任务函数,让其它线程可以对任务队列进行操作,提高效率 condition_unlock(&pool->ready); t->run(t->arg); free(t); //任务执行完毕,重新上锁 condition_lock(&pool->ready); } //有销毁线程池命令 并且 线程队列为空 if(pool->first == NULL && pool->quit == 1) { pool->counter--; //减掉一个任务 if(pool->counter == 0) //任务全部完成,向销毁函数发起通知 condition_signal(&pool->ready); condition_unlock(&pool->ready); //不要忘记解锁 break; } if(pool->first == NULL && timeout == 1) //等待任务超时 { pool->counter--; //减掉一个任务 condition_unlock(&pool->ready); //同样不要忘记解锁 break; } condition_unlock(&pool->ready); } printf("0x%0x threadpool is exiting\n", (int)pthread_self()); return NULL; } //向任务队列中添加任务 void threadpool_add_task(threadpool_t *pool, void* (*run) (void*arg), void*arg) { //动态分配空间给新任务 task_t *newtask = (task_t*)malloc(sizeof(task_t)); newtask->run = run; newtask->arg = arg; newtask->next = NULL; //要对任务队列进行操作,上锁 condition_lock(&pool->ready); //添加新任务到任务队列 if(pool->first == NULL) pool->first = newtask; else pool->last->next = newtask; pool->last = newtask; //如果有线程在等待,则不用创建新线程,直接发起通知处理任务 if(0 < pool->idle) condition_signal(&pool->ready); //当前线程数不能超过线程数,用<不用<=,因为counter初始为0 else if(pool->counter < pool->maxthread) { pthread_t thread; pthread_create(&thread, NULL, thread_routine, (void*)pool); pool->counter++; } condition_unlock(&pool->ready); } //销毁线程池 void threadpool_destroy(threadpool_t *pool) { if(pool->quit) //若已经销毁,直接返回,避免销毁两次 return; condition_lock(&pool->ready); pool->quit = 1; if(pool->counter > 0) { condition_broadcast(&pool->ready); //广播关闭正在等待的线程 if(pool->counter > 0) //若有正在进行的线程等待线程结束的通知 condition_wait(&pool->ready); } condition_unlock(&pool->ready); condition_destroy(&pool->ready); //销毁互斥锁和条件变量 } #endif
//测试代码(main函数): #include "threadpool.h" #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> void* mytask(void *arg) { printf("0x%0x thread run the task %d\n", (int)pthread_self(), *(int*)arg); free(arg); sleep(1); return NULL; } int main(int argc, char **argv) { threadpool_t pool; int i; threadpool_init(&pool, 4); for (i = 0; i < 10; ++i) { int *arg = (int *)malloc(sizeof(int)); *arg = i; threadpool_add_task(&pool, mytask, (void *)arg); } // sleep(15); threadpool_destroy(&pool); return 0; }
代码执行结果如下:
[[email protected] 线程池]$ ./main
0x335fe700 thread is starting
0x31dfb700 thread is starting
0x31dfb700 thread run the task 1
0x335fe700 thread run the task 0
0x32dfd700 thread is starting
0x32dfd700 thread run the task 2
0x325fc700 thread is starting
0x325fc700 thread run the task 3
0x31dfb700 thread run the task 4
0x335fe700 thread run the task 5
0x32dfd700 thread run the task 6
0x325fc700 thread run the task 7
0x335fe700 thread run the task 9
0x325fc700 threadpool is exiting
0x31dfb700 thread run the task 8
0x32dfd700 threadpool is exiting
0x335fe700 threadpool is exiting
0x31dfb700 threadpool is exiting
可以看到一共处理了0~9十个任务;
最大线程数量为4(0x335fe700,0x31dfb700, 0x32dfd700, 0x325fc700 );
等待每个线程都退出了才结束进程;
第一次写博客,希望能够在这里学到更多,也分享一些自己学习的心得体会,共勉互励。