1. 概述
在谈到线程池之前,我们看看并发还有哪几种方式.
多进程,包括一次启动多个进程,然后在进程间传递描述符,或者连接来了以后fork子进程处理.
多线程,同理也可以像多进程一样,要么建立一堆放在那里,要么就有连接以后"即时建立,即时销毁"
我们看上面的方法并没有什么不妥,特殊的场景总是能够派上用场的.
比如一个并发量只有两位数甚至个位数,那么上面的这些方式应付起来也很轻松.但是如果要举一个极端的例子,十万的并发,首先不可能建立十万个进程等着这个并发,也不可能即时创建即时销毁.这些都无形中加重了系统的负担.
考虑一个问题,可能10W个并发,每个连接请求的服务仅仅1秒甚至更短就结束了.以上的方式显然面对这个场景而言,有点粗放了.这个时候,我们需要一个可以应付高并发,并且没有那么吃资源的并发方式.那么线程池也就脱颖而出了.
2. 设计
线程池,顾名思义,就是一个池子,里面放一堆线程.没事的时候待着,个个心怀鬼胎,摩拳擦掌,有事了谁抢到算谁的...
那么线程池该怎么实现呢.
1. 入口
正如上面说的,线程池就是一堆线程在一个池子里面,那么问题来了,该怎么保证这些线程没事的时候安静的待着.简单的mutex是做不到这一点的,需要结合cond才能达到这个效果.
pthread_mutex_lock(pthread_pool->pmutex); // while避免虚假唤醒 while (pthread_pool->pthread_job->job_num == 0 && 0 == pthread_pool->need_destroy) { pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex); }
这样就可以保证线程没事的时候不会折腾了.这里解释一下while的作用,就如注释所说,避免虚假唤醒,就是可能没有pthread_cond_signal或者pthread_cond_broadcast别调用,但是线程还是从pthread_cond_wait返回了,如果没有while这个判断,线程将会一直错下去,这显然不是我们想看到的.
2. 任务分配
毫无疑问,我们得借助回调来实现任务的分配.这个回调函数完全可以模拟线程自身的回调.
// argv指定的地址由线程池释放 typedef struct tag_job { thread_cb cb; void *argv; struct tag_job *next; }job_t, *job_pt; typedef struct { int32_t job_num; job_pt head; job_pt tail; }thread_job_t, *thread_job_pt;
对于线程池而言,其任务必然得是一个队列,也可以理解为任务池,任何一个被唤醒的线程就去这个池子里面取一个任务.然后执行.执行完就继续睡大觉.
3. 销毁
先说下我遇到的问题,我今天使用了几种销毁的方式,事实证明都是错的.最后借鉴网络上的一些例子,不过我现在找不到那个例子了...惭愧.
错误a. 直接调用 pthread_cancel,如果一个线程阻塞在cond那里,你无法用这种方式销毁它,如果恰巧它正在工作,然后你成功把它销毁了,还不如没有销毁,因为死锁了.
错误b. 使用pthread_kill发送SIGKILL.然后整个程序就跟着一块挂掉了.最后发现,信号是进程内共享的.如果不想被异常退出,要在主线程中捕获这个信号.然后在线程池也要有相应的信号处理函数.当然了,这个信号绝对不是SIGSTOP和SIGKILL.
错误c. pthread_cond_broadcast后直接释放mutex和cond.要知道,线程池可能并没有主线程退出的快,也就是说,主线程这边已经把cond和mutex释放了,线程池里面还有没成功退出的线程,此时它既可能调用unlock,也可能调用lock,无论那个都会导致程序异常崩溃.
好吧,说下正确的做法,当broad以后,要等待确认所有线程退出以后,再释放cond和mutex.这样才能使程序正常.
在我的线程池里面,我加了一个master,也就是说,如果创建一个10个线程的池子,我的线程池会创建11个线程,第11个线程就是用来保活的,如果线程池中的某个线程以为某个逻辑异常return了.就需要这个线程来重新启动它...
3. 实现
thread.h
/*!**************************************************************************** * Coypright(C) 2014-2024 () technology Co., Ltd * * 文件名 : thread.h * 版本号 : 1.0 * 描 述 : 线程池实现 * 作 者 : cp3alai * 日 期 : 2016.05.31 *****************************************************************************/ #include <unistd.h> #include <stdlib.h> #include <stdint.h> #include <string.h> #include <pthread.h> #include <signal.h> #include <errno.h> typedef void (*thread_cb)(void *argv); // argv指定的地址由线程池释放 typedef struct tag_job { thread_cb cb; void *argv; struct tag_job *next; }job_t, *job_pt; typedef struct { int32_t job_num; job_pt head; job_pt tail; }thread_job_t, *thread_job_pt; int thread_job_init(thread_job_pt *pthread_job); int thread_job_push(thread_job_pt thread_job, thread_cb cb, void *arg); job_pt thread_job_pop(thread_job_pt thread_job); void thread_job_destroy(thread_job_pt thread_job); typedef struct { int32_t need_destroy; int32_t thread_num; pthread_t pthread_master; pthread_t *pthread_id; thread_job_pt pthread_job; pthread_cond_t *pcond; pthread_mutex_t *pmutex; }thread_pool_t, *thread_pool_pt; int pthread_create_detach(pthread_t *thread, void *(*start_routine)(void*), void *arg); void *thread_entry(void *arg); void *thread_master(void *arg); int thread_pool_init(thread_pool_t **pthread_pool, int num); int thread_pool_add(thread_pool_pt pthread_pool, thread_cb, void *arg); int thread_pool_kill(); int thread_pool_keepalive(); void thread_pool_destroy(thread_pool_pt pthread_pool);
thread.c
/*!**************************************************************************** * Coypright(C) 2014-2024 () technology Co., Ltd * * 文件名 : /media/alai/work/workspace/CC++/mrtpoll/thread.c * 版本号 : 1.0 * 描 述 : * 作 者 : cp3alai * 日 期 : 2016.05.31 *****************************************************************************/ #include "thread.h" int thread_job_init(thread_job_pt *pthread_job) { int32_t ret; if (NULL == pthread_job) { ret = -1; goto ERROR; } *pthread_job = (thread_job_pt)malloc(sizeof(thread_job_t)); if (NULL == *pthread_job) { ret = -1; goto ERROR; } bzero(*pthread_job, sizeof(thread_job_t)); (*pthread_job)->head = NULL; (*pthread_job)->tail= NULL; ret = 0; ERROR: return ret; } int thread_job_push(thread_job_pt thread_job, thread_cb cb, void *arg) { int32_t ret = 0; job_pt pjob = NULL; if (NULL == thread_job || NULL == cb) { ret = -1; goto ERROR; } pjob = (job_pt)malloc(sizeof(job_t)); if (NULL == pjob) { ret = -1; goto ERROR; } pjob->argv = arg; pjob->cb = cb; pjob->next = NULL; if (NULL != thread_job->head) { thread_job->tail->next = pjob; thread_job->tail = pjob; } else { thread_job->head = pjob; thread_job->tail = pjob; } thread_job->job_num++; ERROR: return ret; } // 这里返回的job需要使用它的线程释放 job_pt thread_job_pop(thread_job_pt thread_job) { job_pt job = NULL; if (NULL == thread_job) { return NULL; } if (NULL == thread_job->head) { return NULL; } job = thread_job->head; thread_job->head = thread_job->head->next; thread_job->job_num--; return job; } void thread_job_destroy(thread_job_pt thread_job) { if (NULL == thread_job) { return; } while (NULL != thread_job->head) { job_pt pjob = thread_job->head; thread_job->head = pjob->next; free(pjob); pjob = NULL; } free(thread_job); thread_job = NULL; return; } int pthread_create_detach(pthread_t *thread, void *(*start_routine)(void*), void *arg) { pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, 1); return pthread_create(thread, &attr, start_routine, arg); } void *thread_master(void *arg) { if (NULL == arg) { return NULL; } thread_pool_pt pthread_pool = (thread_pool_pt)arg; int i; while (1) { for (i = 0; i < pthread_pool->thread_num; i++) { if (0 == pthread_pool->pthread_id[i]) { pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void*)pthread_pool); } if ((pthread_kill(pthread_pool->pthread_id[i], 0)) < 0) { if (errno == ESRCH) { pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void*)pthread_pool); } } sleep(1); } } return NULL; } void *thread_entry(void *arg) { if (NULL == arg) { return NULL; } thread_pool_pt pthread_pool = (thread_pool_pt)arg; while (1) { pthread_mutex_lock(pthread_pool->pmutex); // 貌似这样写也可以避免虚假唤醒 // while (pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex)) while (pthread_pool->pthread_job->job_num == 0 && 0 == pthread_pool->need_destroy) { pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex); } // 如果销毁线程,则这里直接退出 if (0 != pthread_pool->need_destroy) { pthread_pool->thread_num--; pthread_mutex_unlock(pthread_pool->pmutex); break; } job_pt pjob = thread_job_pop(pthread_pool->pthread_job); if (NULL == pjob) { continue; } pthread_mutex_unlock(pthread_pool->pmutex); pjob->cb(pjob->argv); // 用完了要释放... free(pjob); pjob = NULL; } return NULL; } int thread_pool_init(thread_pool_t **pthread_pool, int num) { int ret = 0; int i; if (NULL == pthread_pool || 0 == num) { ret = -1; goto ERROR; } *pthread_pool = (thread_pool_pt)malloc(sizeof(thread_pool_t)); if (NULL == *pthread_pool) { ret = -1; goto ERROR; } bzero((*pthread_pool), sizeof(thread_pool_t)); (*pthread_pool)->pmutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t)); if (NULL == (*pthread_pool)->pmutex) { ret = -1; goto ERROR; } (*pthread_pool)->pcond = (pthread_cond_t *)malloc(sizeof(pthread_cond_t)); if (NULL == (*pthread_pool)->pcond) { ret = -1; goto ERROR; } pthread_mutex_init((*pthread_pool)->pmutex, NULL); pthread_cond_init((*pthread_pool)->pcond, NULL); if ((ret = thread_job_init(&(*pthread_pool)->pthread_job)) < 0) { ret = -1; goto ERROR; } (*pthread_pool)->pthread_id = (pthread_t*)malloc(num * sizeof(pthread_t)); if (NULL == (*pthread_pool)->pthread_id) { ret = -1; goto ERROR; } bzero((*pthread_pool)->pthread_id, sizeof(pthread_t) * num); (*pthread_pool)->thread_num = num; for (i = 0; i < num; i++) { ret = pthread_create_detach(&(*pthread_pool)->pthread_id[i], thread_entry, (void *)(*pthread_pool)); if (ret < 0) { (*pthread_pool)->pthread_id[i] = 0; } } ret = pthread_create_detach(&(*pthread_pool)->pthread_master, thread_master, (void *)(*pthread_pool)); if (ret < 0) { ret = -1; goto ERROR; } ret = 0; ERROR: if (ret < 0) { if (NULL != (*pthread_pool)) { thread_pool_destroy((*pthread_pool)); } } return ret; } int thread_pool_add(thread_pool_pt pthread_pool, thread_cb cb, void *arg) { int32_t ret = 0; if (NULL == pthread_pool || NULL == cb) { ret = -1; goto ERROR; } if (NULL == pthread_pool->pthread_job) { ret = -1; goto ERROR; } pthread_mutex_lock(pthread_pool->pmutex); ret = thread_job_push(pthread_pool->pthread_job, cb, arg); if (ret < 0) { ret = -1; goto ERROR; } pthread_cond_signal(pthread_pool->pcond); pthread_mutex_unlock(pthread_pool->pmutex); ret = 0; ERROR: return ret; } int thread_pool_kill(thread_pool_pt pthread_pool, int32_t thread_no, int32_t signo) { int32_t ret; int32_t i; if (NULL == pthread_pool) { ret = -1; goto ERROR; } if (0 == thread_no) { for (i = 0; i < pthread_pool->thread_num; i++) { pthread_kill(pthread_pool->pthread_id[i], signo); } } else { ret = pthread_kill(pthread_pool->pthread_id[thread_no], signo); if (ret < 0) { ret = -1; goto ERROR; } } ret = 0; ERROR: return ret; } int thread_pool_keepalive(thread_pool_pt pthread_pool) { int32_t ret; int32_t i; if (NULL == pthread_pool) { return -1; } for (i = 0; i < pthread_pool->thread_num; i++) { ret = pthread_kill(pthread_pool->pthread_id[i], 0); if (ret < 0) { if (errno == ESRCH) { pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void *)pthread_pool); } } } return 0; } void thread_pool_destroy(thread_pool_pt pthread_pool) { int32_t i; if (NULL == pthread_pool) { return; } if (0 != pthread_pool->pthread_master) { pthread_cancel(pthread_pool->pthread_master); } if (NULL != pthread_pool->pthread_id) { pthread_pool->need_destroy = 1; pthread_cond_broadcast(pthread_pool->pcond); free(pthread_pool->pthread_id); pthread_pool->pthread_id = NULL; } if (NULL != pthread_pool->pthread_job) { thread_job_destroy(pthread_pool->pthread_job); } // 等待线程自销毁完毕 while (pthread_pool->thread_num > 0) { usleep(0); } if (NULL != pthread_pool->pmutex) { pthread_mutex_destroy(pthread_pool->pmutex); free(pthread_pool->pmutex); pthread_pool->pmutex = NULL; } if (NULL != pthread_pool->pcond) { pthread_cond_destroy(pthread_pool->pcond); free(pthread_pool->pcond); pthread_pool->pcond = NULL; } }
main.c
#include <stdio.h> #include "thread.h" void thread_routine1(void *argv) { fprintf(stderr, "this is thread 1\n"); return; } void thread_routine2(void *argv) { fprintf(stderr, "this is thread 2\n"); return; } int main(int argc, char **argv) { int32_t ret; thread_pool_pt pthread_pool; ret = thread_pool_init(&pthread_pool, 30); if (ret < 0) { return -1; } thread_pool_add(pthread_pool, thread_routine1, NULL); thread_pool_add(pthread_pool, thread_routine2, NULL); thread_pool_add(pthread_pool, thread_routine2, NULL); thread_pool_add(pthread_pool, thread_routine2, NULL); thread_pool_add(pthread_pool, thread_routine2, NULL); thread_pool_add(pthread_pool, thread_routine2, NULL); thread_pool_add(pthread_pool, thread_routine1, NULL); thread_pool_add(pthread_pool, thread_routine1, NULL); thread_pool_add(pthread_pool, thread_routine1, NULL); thread_pool_add(pthread_pool, thread_routine1, NULL); sleep(5); thread_pool_destroy(pthread_pool); sleep(5); return 0; }
简单的Makefile
################################################### ### Make with me ################################################### CC := gcc TARGET := pool_test OBJ := main.o thread.o INCS := -I. LIBS := -lpthread CFLAGS := -ggdb3 -O0 $(TARGET) : $(OBJ) $(CC) -o $(TARGET) $(OBJ) $(CFLAGS) $(INCS) $(LIBS) .PHONY: clean clean: @echo "cleaned" @rm -rf $(TARGET) $(OBJ)
执行结果如图 :
4. 后记
写到最后,其实这篇博客还有另外一个目的,就是希望各位看到的同学们,如果代码中有问题,请在评论中帮忙指出来.真的是不胜感激了...