/*
 * Minimal fixed-size thread pool demo built on POSIX threads.
 *
 * A single global pool owns a FIFO linked list of tasks protected by one
 * mutex and one condition variable.  Worker threads sleep on the condition
 * variable until a task is queued or the pool is shut down.
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>

/* One queued task: the callback, its argument, and the next task in line. */
typedef struct CThread_worker
{
    void *(*process)(void *arg);    /* task entry point */
    void *arg;                      /* passed to process(); owned by the caller */
    struct CThread_worker *next;    /* next queued task, NULL at the tail */
} CThread_worker;

/* Pool state.  Every field except pthread_id/max_task_num is guarded by queue_head. */
typedef struct
{
    pthread_mutex_t queue_head;     /* protects the queue, cur_task_num and shutdown */
    pthread_cond_t queue_ready;     /* signalled on new task, broadcast on shutdown */

    struct CThread_worker *queue_worker;    /* head of the FIFO task list */
    pthread_t *pthread_id;                  /* ids of the worker threads */

    int max_task_num;   /* number of worker threads that were actually started */
    int cur_task_num;   /* number of tasks currently waiting in the queue */
    int shutdown;       /* non-zero once pool_destroy() has been requested */
} CThread_pool;

static CThread_pool *pool = NULL;

void pool_init(int num);
void add_task_to_pool(void *(*my_process)(void *), void *arg);
void *pthread_fun(void *arg);
void pool_destroy(void);
void *my_process(void *task_id);

/*
 * Demo driver: start the pool, queue a batch of numbered tasks, give the
 * workers time to run them, then tear everything down.
 */
int main(int argc, char *argv[])
{
    enum { THREAD_COUNT = 5, TASK_COUNT = 10, DRAIN_SECONDS = 12 };
    int i = 0;
    int *worker_num = NULL;

    (void)argc;
    (void)argv;

    pool_init(THREAD_COUNT);

    worker_num = malloc(TASK_COUNT * sizeof *worker_num);
    if (worker_num == NULL)
    {
        fprintf(stderr, "main: out of memory\n");
        pool_destroy();
        return EXIT_FAILURE;
    }

    for (i = 0; i < TASK_COUNT; i++)
    {
        worker_num[i] = i;
        add_task_to_pool(my_process, &worker_num[i]);
    }

    /*
     * pool_destroy() discards tasks that are still queued, so wait long
     * enough for the workers to finish before shutting down.
     */
    sleep(DRAIN_SECONDS);
    pool_destroy();

    /* Safe to release only now: tasks hold pointers into this array. */
    free(worker_num);
    worker_num = NULL;

    return 0;
}

/*
 * Create the global pool and start `num` worker threads.
 *
 * Does nothing (besides reporting to stderr) if the pool already exists or
 * `num` is not positive.  Terminates the process on allocation failure.
 * If some threads cannot be created the pool keeps the ones that started.
 */
void pool_init(int num)
{
    int i = 0;

    if (pool != NULL || num <= 0)
    {
        fprintf(stderr, "pool_init: pool already initialised or invalid thread count %d\n", num);
        return;
    }

    pool = malloc(sizeof *pool);
    if (pool == NULL)
    {
        fprintf(stderr, "pool_init: out of memory\n");
        exit(EXIT_FAILURE);
    }

    pool->pthread_id = malloc((size_t)num * sizeof *pool->pthread_id);
    if (pool->pthread_id == NULL)
    {
        fprintf(stderr, "pool_init: out of memory\n");
        free(pool);
        pool = NULL;
        exit(EXIT_FAILURE);
    }

    pthread_mutex_init(&(pool->queue_head), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);

    pool->queue_worker = NULL;
    pool->max_task_num = num;
    pool->cur_task_num = 0;
    pool->shutdown = 0;

    for (i = 0; i < num; ++i)
    {
        int rc = pthread_create(&(pool->pthread_id[i]), NULL, pthread_fun, NULL);
        if (rc != 0)
        {
            fprintf(stderr, "pool_init: pthread_create failed (error %d)\n", rc);
            /* Remember how many threads exist so pool_destroy() joins only those. */
            pool->max_task_num = i;
            break;
        }
    }
}

/*
 * Worker thread body: repeatedly take the task at the head of the queue and
 * run it, until the pool is shut down.  `arg` is unused.  Always returns NULL.
 */
void *pthread_fun(void *arg)
{
    CThread_worker *worker = NULL;

    (void)arg;

    /* The cast is for display only; pthread_t is an opaque type. */
    printf("pthread %u is starting\n", (unsigned int)pthread_self());
    for (;;)
    {
        pthread_mutex_lock(&(pool->queue_head));
        /* Loop rather than `if`: condition waits may wake spuriously. */
        while (pool->cur_task_num == 0 && !pool->shutdown)
        {
            printf("pthread %u is waiting task...\n\n", (unsigned int)(pthread_self()));
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_head));
        }
        if (pool->shutdown)
        {
            /* Unlock before exiting so the other threads can still take the mutex. */
            pthread_mutex_unlock(&(pool->queue_head));
            printf("pthread %u is exiting\n", (unsigned int)pthread_self());
            return NULL;
        }

        pool->cur_task_num--;

        /* Detach the head node while still holding the lock. */
        worker = pool->queue_worker;
        pool->queue_worker = worker->next;
        pthread_mutex_unlock(&(pool->queue_head));

        /* Run the task outside the lock so other workers are not blocked. */
        (*(worker->process))(worker->arg);

        /* The node was allocated by add_task_to_pool(); this thread now owns it. */
        free(worker);
        worker = NULL;
    }
}

/*
 * Append a task to the tail of the queue and wake one waiting worker.
 *
 * `arg` is passed to `my_process` unchanged and must stay valid until the
 * task has run.  The task is dropped (with a message on stderr) if the pool
 * is not initialised, the callback is NULL, or memory is exhausted.
 */
void add_task_to_pool(void *(*my_process)(void *), void *arg)
{
    CThread_worker *worker = NULL;
    CThread_worker *p = NULL;

    if (pool == NULL || my_process == NULL)
    {
        fprintf(stderr, "add_task_to_pool: pool not initialised or NULL task\n");
        return;
    }

    worker = malloc(sizeof *worker);
    if (worker == NULL)
    {
        fprintf(stderr, "add_task_to_pool: out of memory, task dropped\n");
        return;
    }
    worker->process = my_process;
    worker->arg = arg;
    worker->next = NULL;

    pthread_mutex_lock(&(pool->queue_head));

    p = pool->queue_worker;
    if (p == NULL)
    {
        pool->queue_worker = worker;
    }
    else
    {
        /* Walk to the tail so tasks run in submission order. */
        while (p->next != NULL)
        {
            p = p->next;
        }
        p->next = worker;
    }

    pool->cur_task_num++;

    pthread_mutex_unlock(&(pool->queue_head));

    pthread_cond_signal(&(pool->queue_ready));
}

/*
 * Shut the pool down: stop the workers, join them, discard any tasks that
 * are still queued, and release all pool resources.  Safe to call when the
 * pool does not exist.
 */
void pool_destroy(void)
{
    int i = 0;
    CThread_worker *p = NULL;

    if (pool == NULL)
    {
        return;
    }

    /*
     * Set the flag and broadcast while holding the mutex.  Without the lock
     * a worker could test `shutdown`, then miss the broadcast just before it
     * blocks in pthread_cond_wait(), and the join below would never return.
     */
    pthread_mutex_lock(&(pool->queue_head));
    pool->shutdown = 1;
    /* Wake every waiting worker; otherwise they would stay blocked forever. */
    pthread_cond_broadcast(&(pool->queue_ready));
    pthread_mutex_unlock(&(pool->queue_head));

    /* Wait for each worker to exit so its resources are reclaimed. */
    for (i = 0; i < pool->max_task_num; ++i)
    {
        pthread_join(pool->pthread_id[i], NULL);
    }
    free(pool->pthread_id);

    /* No workers remain, so the queue can be drained without locking. */
    while (pool->queue_worker != NULL)
    {
        p = pool->queue_worker;
        pool->queue_worker = p->next;
        free(p);
    }

    /* The mutex and the condition variable must be destroyed explicitly. */
    pthread_mutex_destroy(&(pool->queue_head));
    pthread_cond_destroy(&(pool->queue_ready));

    free(pool);
    /* Reset the global so it does not dangle. */
    pool = NULL;
}

/*
 * Sample task: print which thread handles which task id, then sleep for one
 * second to simulate work.  `task_id` must point to an int.  Returns NULL.
 */
void *my_process(void *task_id)
{
    printf("thread %u is doing task %d\n", (unsigned int)pthread_self(), *(int *)task_id);
    sleep(1);
    return NULL;
}
/* Time: 2024-10-06 11:18:23 */