线程池简介
简易线程池实现
线程池头文件threadpool.h如下:
1 #ifndef THREADPOOL_H 2 #define THREADPOOL_H 3 4 #include <stdio.h> 5 #include <stdlib.h> 6 #include <unistd.h> 7 #include <pthread.h> 8 9 /** 10 * 线程体数据结构 11 */ 12 typedef struct runner 13 { 14 void(*callback)(void* arg); // 回调函数指针 15 void* arg; // 回调函数的参数 16 struct runner* next; 17 } thread_runner; 18 19 /** 20 * 线程池数据结构 21 */ 22 typedef struct 23 { 24 pthread_mutex_t mutex; // 互斥量 25 pthread_cond_t cond; // 条件变量 26 thread_runner* runner_head; // 线程池中所有等待任务的头指针 27 thread_runner* runner_tail; // 线程池所有等待任务的尾指针 28 int shutdown; // 线程池是否销毁 29 pthread_t* threads; // 所有线程 30 int max_thread_size; // 线程池中允许的活动线程数目 31 } thread_pool; 32 33 /** 34 * 线程体 35 */ 36 void run(void *arg); 37 38 /** 39 * 初始化线程池 40 * 参数: 41 * pool:指向线程池结构有效地址的动态指针 42 * max_thread_size:最大的线程数 43 */ 44 void threadpool_init(thread_pool* pool, int max_thread_size); 45 46 /** 47 * 向线程池加入任务 48 * 参数: 49 * pool:指向线程池结构有效地址的动态指针 50 * callback:线程回调函数 51 * arg:回调函数参数 52 */ 53 void threadpool_add_runner(thread_pool* pool, void(*callback)(void *arg), void *arg); 54 55 /** 56 * 销毁线程池 57 * 参数: 58 * ppool:指向线程池结构有效地址的动态指针地址(二级指针),销毁后释放内存,该指针为NULL 59 */ 60 void threadpool_destroy(thread_pool** ppool); 61 62 #endif
线程池实现文件threadpool.c如下:
1 #include "threadpool.h" 2 3 #define DEBUG 1 4 5 /** 6 * 初始化线程池 7 * 参数: 8 * pool:指向线程池结构有效地址的动态指针 9 * max_thread_size:最大的线程数 10 */ 11 void threadpool_init(thread_pool* pool, int max_thread_size) 12 { 13 // 初始化互斥量 14 pthread_mutex_init(&(pool->mutex), NULL); 15 // 初始化条件变量 16 pthread_cond_init(&(pool->cond), NULL); 17 pool->runner_head = NULL; 18 pool->runner_tail = NULL; 19 pool->max_thread_size = max_thread_size; 20 pool->shutdown = 0; 21 22 // 创建所有分离态线程(即创建线程池) 23 pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t)); 24 int i = 0; 25 for (i = 0; i < max_thread_size; i++) 26 { 27 pthread_attr_t attr; 28 pthread_attr_init(&attr); 29 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 30 pthread_create(&(pool->threads[i]), &attr, (void*)run, (void*)pool); 31 } 32 #ifdef DEBUG 33 printf("threadpool_init-> create %d detached thread\n", max_thread_size); 34 #endif 35 } 36 37 /** 38 * 线程体 39 */ 40 void run(void *arg) 41 { 42 thread_pool* pool = (thread_pool*)arg; 43 while (1) 44 { 45 // 加锁 46 pthread_mutex_lock(&(pool->mutex)); 47 #ifdef DEBUG 48 printf("run-> locked\n"); 49 #endif 50 // 如果等待队列为0并且线程池未销毁,则处于阻塞状态(即等待任务的唤醒) 51 while (pool->runner_head == NULL && !pool->shutdown) 52 { 53 pthread_cond_wait(&(pool->cond), &(pool->mutex)); 54 } 55 //如果线程池已经销毁 56 if (pool->shutdown) 57 { 58 // 解锁 59 pthread_mutex_unlock(&(pool->mutex)); 60 #ifdef DEBUG 61 printf("run-> unlocked and thread exit\n"); 62 #endif 63 pthread_exit(NULL); 64 } 65 // 取出链表中的头元素 66 thread_runner *runner = pool->runner_head; 67 pool->runner_head = runner->next; 68 // 解锁 69 pthread_mutex_unlock(&(pool->mutex)); 70 #ifdef DEBUG 71 printf("run-> unlocked\n"); 72 #endif 73 // 调用回调函数,执行任务 74 (runner->callback)(runner->arg); 75 free(runner); 76 runner = NULL; 77 #ifdef DEBUG 78 printf("run-> runned and free runner\n"); 79 #endif 80 } 81 pthread_exit(NULL); 82 } 83 84 /** 85 * 向线程池加入任务 86 * 参数: 87 * pool:指向线程池结构有效地址的动态指针 88 * callback:线程回调函数 89 * arg:回调函数参数 90 */ 91 void threadpool_add_runner(thread_pool* pool, void(*callback)(void *arg), void *arg) 92 { 93 // 构造一个新任务 94 thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner)); 95 newrunner->callback = callback; 96 newrunner->arg = arg; 97 newrunner->next = NULL; 98 // 加锁 99 pthread_mutex_lock(&(pool->mutex)); 100 #ifdef DEBUG 101 printf("threadpool_add_runner-> locked\n"); 102 #endif 103 // 将任务加入到等待队列中 104 if (pool->runner_head != NULL) 105 { 106 pool->runner_tail->next = newrunner; 107 // 尾指针指到最后一个任务 108 pool->runner_tail = newrunner; 109 } 110 else 111 { 112 pool->runner_head = newrunner; 113 pool->runner_tail = newrunner; 114 } 115 // 解锁 116 pthread_mutex_unlock(&(pool->mutex)); 117 #ifdef DEBUG 118 printf("threadpool_add_runner-> unlocked\n"); 119 #endif 120 // 唤醒一个等待线程 121 pthread_cond_signal(&(pool->cond)); 122 #ifdef DEBUG 123 printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n"); 124 #endif 125 } 126 127 /** 128 * 销毁线程池 129 * 参数: 130 * ppool:指向线程池结构有效地址的动态指针地址(二级指针) 131 */ 132 void threadpool_destroy(thread_pool** ppool) 133 { 134 thread_pool *pool = *ppool; 135 // 防止2次销毁 136 if (!pool->shutdown) 137 { 138 pool->shutdown = 1; 139 // 唤醒所有等待线程,线程池要销毁了 140 pthread_cond_broadcast(&(pool->cond)); 141 // 等待所有线程中止 142 sleep(1); 143 #ifdef DEBUG 144 printf("threadpool_destroy-> wakeup all waiting threads\n"); 145 #endif 146 // 回收空间 147 free(pool->threads); 148 // 销毁等待队列 149 thread_runner *head = NULL; 150 while (pool->runner_head != NULL) 151 { 152 head = pool->runner_head; 153 pool->runner_head = pool->runner_head->next; 154 free(head); 155 } 156 157 #ifdef DEBUG 158 printf("threadpool_destroy-> all runners freed\n"); 159 #endif 160 // 条件变量和互斥量也别忘了销毁 161 pthread_mutex_destroy(&(pool->mutex)); 162 pthread_cond_destroy(&(pool->cond)); 163 164 #ifdef DEBUG 165 printf("threadpool_destroy-> mutex and cond destoryed\n"); 166 #endif 167 free(pool); 168 (*ppool) = NULL; 169 170 #ifdef DEBUG 171 printf("threadpool_destroy-> pool freed\n"); 172 #endif 173 } 174 }
测试文件如下:
1 #include "threadpool.h" 2 3 void threadrun(void* arg) 4 { 5 int *i = (int *)arg; 6 printf("%d\n", *i); 7 } 8 9 int main(void) 10 { 11 thread_pool *pool = malloc(sizeof(thread_pool)); 12 threadpool_init(pool, 2); 13 14 int i; 15 int tmp[3]; 16 for (i = 0; i < 3; i++) 17 { 18 tmp[i] = i; 19 threadpool_add_runner(pool, threadrun, &tmp[i]); 20 } 21 22 sleep(1); 23 threadpool_destroy(&pool); 24 printf("main-> %p\n", pool); 25 printf("main-> test over\n"); 26 27 return 0; 28 }
程序运行结果如下:
参考资料
时间: 2024-11-05 19:04:53