/* 写在前面的话:
今天刚“开原”,选择了一篇关于线程池的文件与大家分享,希望能对您学习有所帮助,也希望能与大家共同学习!
选择在这个特殊的时候注册并发文章也是有一些我个人特殊的意义的,看我的id(西游小学生.45)就知道了,哈哈。在这里也很感谢博客园的员工,刚发申请两分钟就同意了。
*/
最近由于要写一个类似于QQ的程序,所以想到要用到多线程。既然要用多线程,那何不写一个线程池?于是上网搜了搜多线程的代码,发现大多都不是很完善,或者有些小bug。所以,在这里贴出一个完整的,经过我多重测试的,先贴上一个线程池的简单实现过程(固定大小的线程池),稍后和网络编程的结合过些天会再贴出来。这里引用Linux的一句话:“
talk is cheap show me the code
”。所以我的博客大部分都会给大家贴代码的,谢谢。
首先,不得不提的就是线程池的好处:
简单来说,如果调用一个线程分为:
t1创建线程
t2完成作业
t3销毁线程
那么如果我们有100个作业要完成,总时间T=100*(t1+t2+t3)。但如果我们提前申请一个10个线程的固定大小线程池,那么完成作业的总时间为申请10个线程和销毁10个线程以及100个作业的时间,及T=10*t1+100*t2+10*t3。所以,线程池最大的优势就是节省了线程申请以及销毁的时间!
下面是具体的代码实现:
/*************************************************************************
> File Name: tpool.h
> Author:
> Mail:
> Created Time: 2015年04月01日 星期三 17时34分00秒
************************************************************************/
#ifndef THREAD_POOL_H__
#define THREAD_POOL_H__
#include <pthread.h>
/* 要执行的任务链表 */
typedef struct tpool_work {
void* (*routine)(int); /* 任务函数 */
int arg; /* 传入任务函数的参数 */
struct tpool_work *next;
}tpool_work_t;
typedef struct tpool {
int shutdown; /* 线程池是否销毁 */
int max_thr_num; /* 最大线程数 */
pthread_t *thr_id; /* 线程ID数组 */
tpool_work_t *queue_head; /* 线程链表 */
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;
}tpool_t;
/*
* @brief 创建线程池
* @param max_thr_num 最大线程数
* @return 0: 成功 其他: 失败
*/
int
tpool_create(int max_thr_num);
/*
* @brief 销毁线程池
*/
void
tpool_destroy();
/*
* @brief 向线程池中添加任务
* @param routine 任务函数指针
* @param arg 任务函数参数
* @return 0: 成功 其他:失败
*/
int
tpool_add_work(void*(*routine)(int), int arg);
#endif
/*************************************************************************
> File Name: func.c
> Author:
> Mail:
> Created Time: 2015年04月01日 星期三 17时35分56秒
************************************************************************/
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include "tpool.h"
static tpool_t *tpool = NULL;
void
print_work(tpool_work_t *head) //输出任务列队当前工作
{
tpool_work_t *work= head;
printf("\n当前任务列队里还有的任务是:(任务的参数是)\n");
while(work){
printf("%d ",work->arg);
work = work->next;
}
printf("\n");
}
/* 工作者线程函数, 从任务链表中取出任务并执行 */
static void*
thread_routine(void *arg)
{
tpool_work_t *work; //任务链表
while(1) {
/* 如果线程池没有被销毁且没有任务要执行,则等待 */
pthread_mutex_lock(&tpool->queue_lock); //锁住线程池
while(!tpool->queue_head && !tpool->shutdown) {
pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);
}
if (tpool->shutdown) { //0没有注销
pthread_mutex_unlock(&tpool->queue_lock);
pthread_exit(NULL);
}
sleep(3);
// print_work(tpool->queue_head);
//取出任务链表头的一个工作
work = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
pthread_mutex_unlock(&tpool->queue_lock);
work->routine(work->arg);
free(work);
}
return NULL;
}
/*
* 创建线程池
*/
int
tpool_create(int max_thr_num)
{
int i;
tpool = calloc(1, sizeof(tpool_t)); //线程池tpool
if (!tpool) {
printf("%s: calloc failed\n", __FUNCTION__);
exit(1);
}
/* 初始化 */
tpool->max_thr_num = max_thr_num; //初始化最大线程池数
tpool->shutdown = 0; //线程池注销设为0未注
tpool->queue_head = NULL; //线程池链表
if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) { //初始化线程锁
printf("%s: pthread_mutex_init failed, errno:%d, error:%s\n",
__FUNCTION__, errno, strerror(errno));
exit(1);
}
if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) { //初始化条件变量
printf("%s: pthread_cond_init failed, errno:%d, error:%s\n",
__FUNCTION__, errno, strerror(errno));
exit(1);
}
/* 创建工作者线程 */
tpool->thr_id = calloc(max_thr_num, sizeof(pthread_t));//申请线程ID数组
if (!tpool->thr_id) {
printf("%s: calloc failed\n", __FUNCTION__);
exit(1);
}
for (i = 0; i < max_thr_num; ++i) {
if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){
printf("%s:pthread_create failed, errno:%d, error:%s\n", __FUNCTION__,
errno, strerror(errno));
exit(1);
}
}
return 0;
}
/* 销毁线程池 */
void
tpool_destroy()
{
int i;
tpool_work_t *member;
if (tpool->shutdown) {
return;
}
tpool->shutdown = 1;
/* 通知所有正在等待的线程 */
pthread_mutex_lock(&tpool->queue_lock);
pthread_cond_broadcast(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);
for (i = 0; i < tpool->max_thr_num; ++i) {
pthread_join(tpool->thr_id[i], NULL);
}
free(tpool->thr_id);
while(tpool->queue_head) {
member = tpool->queue_head;
tpool->queue_head = tpool->queue_head->next;
free(member);
}
pthread_mutex_destroy(&tpool->queue_lock);
pthread_cond_destroy(&tpool->queue_ready);
free(tpool);
}
/* 向线程池添加任务 */
int
tpool_add_work(void*(*routine)(int), int arg)
{
tpool_work_t *work, *member;
if (!routine){
printf("%s:Invalid argument\n", __FUNCTION__);
return -1;
}
work = malloc(sizeof(tpool_work_t)); //申请一个任务结点
if (!work) { //申请失败
printf("%s:malloc failed\n", __FUNCTION__);
return -1;
}
work->routine = routine; //线程执行函数设为main中传进来的参数
work->arg = arg; //线程执行函数的参数设置为main中传进来的参数
work->next = NULL;
pthread_mutex_lock(&tpool->queue_lock); //锁住线程池锁
member = tpool->queue_head;
if (!member) { //找到线程链表中最后一个结点,并该工作放置于线程链表尾
tpool->queue_head = work;
} else {
while(member->next) {
member = member->next;
}
member->next = work;
}
/* 通知工作者线程,有新任务添加 */
pthread_cond_signal(&tpool->queue_ready);
pthread_mutex_unlock(&tpool->queue_lock);
return 0;
}
void *func(int arg)
{
printf("thread %d threadID is: %d\n",arg,(int)pthread_self());
print_work(tpool->queue_head);
sleep(3);
return NULL;
}
int
main(int arg, char **argv)
{
if (tpool_create(5) != 0) {
printf("tpool_create failed\n");
exit(1);
}
int i;
for (i = 0; i < 10; ++i) {
tpool_add_work(func, i);
// usleep(1);
}
sleep(10); //如果sleep无或时间太短,注意主线程可能先于其他线程完成导致部分作业未完成
tpool_destroy();
return 0;
}