想做一个多线程服务器测试程序,因此参考了github的一些实例,然后自己动手写了类似来加深理解。
目前了解的线程池实现有2种思路:
第一种:
主进程创建一定数量的线程,并将其全部挂起,此时线程状态为idle,并将running态计数为0,等到任务可以执行了,就唤醒线程,此时线程状态为running,计数增加,如果计数达到最大线程数,就再创建一组空闲线程,等待新任务,上一组线程执行完退出,如此交替。
第二种:
采用生成者-消费者模式,主进程作为生成者,创建FIFO队列,在任务队列尾部添加任务,线程池作为消费者在队列头部取走任务执行,这之间有人会提到无锁环形队列,在单生成者单消费者的模式下是有效的,但是线程池肯定是多消费者同时去队列取任务,环形队列会造成死锁。
我的实例采用第二种方式实现,在某些应用场景下,允许一定时间内,任务排队的情况,重复利用已有线程会比较合适。
代码比较占篇幅,因此折叠在下面。
task.h:
1 #ifndef TASK_H 2 #define TASK_H 3 4 #include <list> 5 #include <pthread.h> 6 7 using std::list; 8 9 struct task { 10 void (*function) (void *); 11 void *arguments; 12 int id; 13 }; 14 15 struct work_queue { 16 work_queue(){ 17 pthread_mutex_init(&queue_lock, NULL); 18 pthread_mutex_init(&queue_read_lock, NULL); 19 pthread_cond_init(&queue_read_cond, NULL); 20 qlen = 0; 21 } 22 23 ~work_queue() { 24 queue.clear(); 25 pthread_mutex_destroy(&queue_read_lock); 26 pthread_mutex_destroy(&queue_lock); 27 pthread_cond_destroy(&queue_read_cond); 28 } 29 30 void push(task *tsk); 31 task *pull(); 32 void post(); 33 void wait(); 34 35 private: 36 int qlen; 37 list< task * > queue; 38 pthread_mutex_t queue_lock; 39 pthread_mutex_t queue_read_lock; 40 pthread_cond_t queue_read_cond; 41 }; 42 43 #endif
task.cpp
#include "task.h" void work_queue::push(task *tsk) { pthread_mutex_lock(&queue_lock); queue.push_back(tsk); qlen++; pthread_cond_signal(&queue_read_cond); pthread_mutex_unlock(&queue_lock); } task* work_queue::pull() { wait(); pthread_mutex_lock(&queue_lock); task* tsk = NULL; if (qlen > 0) { tsk = *(queue.begin()); queue.pop_front(); qlen--; if (qlen > 0) pthread_cond_signal(&queue_read_cond); } pthread_mutex_unlock(&queue_lock); return tsk; } void work_queue::post() { pthread_mutex_lock(&queue_read_lock); pthread_cond_broadcast(&queue_read_cond); pthread_mutex_unlock(&queue_read_lock); } void work_queue::wait() { pthread_mutex_lock(&queue_read_lock); pthread_cond_wait(&queue_read_cond, &queue_read_lock); pthread_mutex_unlock(&queue_read_lock); }
threadpool.h
1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3 4 #include "task.h" 5 #include <vector> 6 7 using std::vector; 8 9 #define safe_delete(p) if (p) { delete p; p = NULL; } 10 11 struct threadpool { 12 threadpool(int size) : pool_size(size) 13 , thread_list(size, pthread_t(0)) 14 , queue(NULL) 15 , finish(false) 16 , ready(0) { 17 18 pthread_mutex_init(&pool_lock, NULL); 19 } 20 21 ~threadpool() { 22 thread_list.clear(); 23 safe_delete(queue); 24 pthread_mutex_destroy(&pool_lock); 25 } 26 27 void init(); 28 void destroy(); 29 static void* thread_run(void *tp); 30 31 void incr_ready(); 32 void decr_ready(); 33 bool close() const; 34 35 work_queue *queue; 36 37 private: 38 int pool_size; 39 int ready; 40 bool finish; 41 pthread_mutex_t pool_lock; 42 vector <pthread_t> thread_list; 43 }; 44 45 46 #endif
threadpool.cpp
1 /* 2 * threadpool.cpp 3 * 4 * Created on: 2017年3月27日 5 * Author: Administrator 6 */ 7 8 #include "threadpool.h" 9 10 11 void* threadpool::thread_run(void *tp) { 12 threadpool *pool = (threadpool *) tp; 13 pool->incr_ready(); 14 15 while(1) { 16 task* tsk = pool->queue->pull(); 17 if (tsk) { 18 (tsk->function)(tsk->arguments); 19 delete tsk; 20 tsk = NULL; 21 } 22 23 if (pool->close()) 24 break; 25 } 26 27 pool->decr_ready(); 28 29 return NULL; 30 } 31 32 void threadpool::incr_ready() { 33 pthread_mutex_lock(&pool_lock); 34 ready++; 35 pthread_mutex_unlock(&pool_lock); 36 } 37 38 void threadpool::decr_ready() { 39 pthread_mutex_lock(&pool_lock); 40 ready--; 41 pthread_mutex_unlock(&pool_lock); 42 } 43 44 bool threadpool::close() const { 45 return finish; 46 } 47 48 void threadpool::init() { 49 queue = new work_queue; 50 if (!queue) { 51 return; 52 } 53 54 for(int i; i<pool_size; i++) { 55 pthread_create(&thread_list[i], NULL, threadpool::thread_run, (void *)this); 56 } 57 58 while(ready != pool_size) {} 59 } 60 61 void threadpool::destroy() { 62 finish = true; 63 64 while(ready) { 65 if(queue) { 66 queue->post(); 67 } 68 } 69 }
main.cpp
1 //============================================================================ 2 // Name : thread_pool.cpp 3 // Author : dancy 4 // Version : 5 // Copyright : Your copyright notice 6 // Description : Hello World in C++, Ansi-style 7 //============================================================================ 8 9 #include <iostream> 10 #include "threadpool.h" 11 #include <pthread.h> 12 #include <stdio.h> 13 #include <stdlib.h> 14 15 using namespace std; 16 17 void job(void *tsk){ 18 printf("job %-2d working on Thread #%u\n", ((task *)tsk)->id, (int)pthread_self()); 19 } 20 21 task *make_task(void (*func) (void *), int id) { 22 task *tsk = new task; 23 if (!tsk) 24 return NULL; 25 26 tsk->function = func; 27 tsk->arguments = (void *)tsk; 28 tsk->id = id; 29 30 return tsk; 31 } 32 33 int main() { 34 threadpool tp(4); 35 tp.init(); 36 37 for(int i=0; i<40; i++) 38 tp.queue->push(make_task(&job, i+1)); 39 40 tp.destroy(); 41 printf("all task has completed\n"); 42 return 0; 43 }
时间: 2024-10-20 16:26:47