一个简单的线程池实现

前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个。跟大家分享,同时也整理整理思路。

 

对线程池的要求:

1.用于处理大量短暂的任务。

2.动态增加线程,直到达到最大允许的线程数量。

3.动态销毁线程。

 

线程池的实现类似于”消费者--生产者”模型:

用一个队列存放任务(仓库,缓存)

主线程添加任务(生产者生产任务)

新建线程函数执行任务(消费者执行任务)

由于任务队列是全部线程共享的,就涉及到同步问题。这里采用条件变量和互斥锁来实现。

--------------------------------------------------------condition.h----------------------------------------------------------

/*

在此线程池中互斥锁和条件变量都是配套使用,编写此头文件使用比较方便

在此线程池中保护共享数据的都是用这个头文件中的函数

*/

#ifndef _CONDITION_H_

#define _CONDITION_H_

#include <pthread.h>

//有互斥锁和条件变量的结构体condition_t

typedef struct condition

{

         pthread_cond_t pcond;

         pthread_mutex_t pmutex;

} condition_t;

int condition_init(condition_t *cond)

{

         if(pthread_cond_init(&cond->pcond, NULL))

                   return 1;

         if(pthread_mutex_init(&cond->pmutex, NULL))

                   return 1;

         return 0; 

};

int condition_lock(condition_t *cond)

{

         return pthread_mutex_lock(&cond->pmutex);

}

int condition_unlock(condition_t *cond)

{

         return pthread_mutex_unlock(&cond->pmutex);

}

int condition_wait(condition_t *cond)

{

         return pthread_cond_wait(&cond->pcond, &cond->pmutex);

}

int condition_timedwait(condition_t *cond, const struct timespec *abstime)

{

        return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);

}

int condition_signal(condition_t *cond)

{

         return pthread_cond_signal(&cond->pcond);

}

int condition_broadcast(condition_t *cond)

{

         return pthread_cond_broadcast(&cond->pcond);

}

int condition_destroy(condition_t *cond)

{

         if(pthread_cond_destroy(&cond->pcond))

                   return 1;

         if(pthread_mutex_destroy(&cond->pmutex))

                   return 1;

         return 0;

}

#endif

-------------------------------------------------------threadpool.h--------------------------------------------------------

//线程池头文件

#ifndef _THREADPOOL_H_

#define _THREADPOOL_H_

#include "condition.h"

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <sys/time.h>

#include <time.h>

//线程任务结构体(队列形式存储)

typedef struct Task

{

         void* (*run) (void*arg);

         void *arg;

         struct Task *next;

} task_t;

//线程池结构体

typedef struct threadpool

{

         condition_t ready;                    //互斥锁与条件变量

         task_t *first;                      //任务队列头

         task_t *last;                                    //任务队列尾

         int counter;                                     //任务总数

         int maxthread;                      //最大线程数

         int idle;                         //正在等待的线程数量

         int quit;                         //销毁线程池标志

} threadpool_t;

//初始化线程池

void threadpool_init(threadpool_t *pool, int max_num)

{

         condition_init(&pool->ready);

         pool->first = NULL;

         pool->last = NULL;

         pool->counter = 0;

         pool->maxthread = max_num;

         pool->idle = 0;

         pool->quit = 0;

}

//新建线程函数

void *thread_routine(void *arg)

{

         struct timespec abstime;

         int timeout;                                  //是否超过等待任务时间

         threadpool_t *pool = (threadpool_t*)arg;

         printf("0x%0x thread is starting\n", (int)pthread_self());

     //让线程不会因为结束任务立刻销毁
         for(;;)                                            

         {

                   timeout = 0;

                   condition_lock(&pool->ready);                               //对全局变量任务队列进行操作,要加锁

                   pool->idle ++;

           //等待任务队列中有任务

                   while(pool->first == NULL && pool->quit == 0)

                   {

                            printf("0x%0x thread is waiting\n", (int)pthread_self());

                            clock_gettime(CLOCK_REALTIME, &abstime);

                            abstime.tv_sec += 2;                                   //设置等待超时时间

                            int status = condition_timedwait(&pool->ready, &abstime);

                            if(status == 110)                                      //timewait函数超时返回110(TIMEDOUT)

                            {

                                     printf("0x%0x thread is timeout\n", (int)pthread_self());

                                     timeout = 1;

                                     break;

                            }

                   }

                   //正在等待的线程数量减一

                   pool->idle --;    

           //若任务队列中有任务

                   if(pool->first != NULL)

                   {

                            task_t *t = pool->first;

                            pool->first = pool->first->next;

                //先解锁再执行任务函数,让其它线程可以对任务队列进行操作,提高效率

                            condition_unlock(&pool->ready);            

                            t->run(t->arg);

                            free(t);

                //任务执行完毕,重新上锁

                            condition_lock(&pool->ready);

                   }

                   //有销毁线程池命令 并且 线程队列为空

                   if(pool->first == NULL && pool->quit == 1)

                   {

                            pool->counter--;                                            //减掉一个任务

                            if(pool->counter == 0)                      //任务全部完成,向销毁函数发起通知

                                      condition_signal(&pool->ready);

                            condition_unlock(&pool->ready);                 //不要忘记解锁

                            break;

                   }

                   if(pool->first == NULL && timeout == 1)                  //等待任务超时

                   {

                            pool->counter--;                          //减掉一个任务

                            condition_unlock(&pool->ready);                 //同样不要忘记解锁

                            break;

                   }

                   condition_unlock(&pool->ready);

         }

         printf("0x%0x threadpool is exiting\n", (int)pthread_self());

         return NULL;

}

//向任务队列中添加任务

void threadpool_add_task(threadpool_t *pool, void* (*run) (void*arg), void*arg)

{

         //动态分配空间给新任务

         task_t *newtask = (task_t*)malloc(sizeof(task_t));

         newtask->run = run;

         newtask->arg = arg;

         newtask->next = NULL;

     //要对任务队列进行操作,上锁

         condition_lock(&pool->ready);

         //添加新任务到任务队列

         if(pool->first == NULL)

                   pool->first = newtask;

         else

                   pool->last->next = newtask;

         pool->last = newtask;

         //如果有线程在等待,则不用创建新线程,直接发起通知处理任务

         if(0 < pool->idle)

                   condition_signal(&pool->ready);

     //当前线程数不能超过线程数,用<不用<=,因为counter初始为0

         else if(pool->counter < pool->maxthread)                

         {

                   pthread_t thread;

                   pthread_create(&thread, NULL, thread_routine, (void*)pool);

                   pool->counter++;

         }

         condition_unlock(&pool->ready);

}

//销毁线程池

void threadpool_destroy(threadpool_t *pool)

{

         if(pool->quit)                                                      //若已经销毁,直接返回,避免销毁两次

                   return;

         condition_lock(&pool->ready);

         pool->quit = 1;

         if(pool->counter > 0)

         {

                   condition_broadcast(&pool->ready);                //广播关闭正在等待的线程

                   if(pool->counter > 0)                                    //若有正在进行的线程等待线程结束的通知

                            condition_wait(&pool->ready);

         }

         condition_unlock(&pool->ready);

         condition_destroy(&pool->ready);                            //销毁互斥锁和条件变量

}

#endif
//测试代码(main函数):

#include "threadpool.h"

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <errno.h>

void* mytask(void *arg)

{

         printf("0x%0x thread run the task %d\n", (int)pthread_self(), *(int*)arg);

         free(arg);

         sleep(1);

         return NULL;

}

int main(int argc, char **argv)

{

         threadpool_t pool;

         int i;

         threadpool_init(&pool, 4);

         for (i = 0; i < 10; ++i)

         {

                   int *arg = (int *)malloc(sizeof(int));

                   *arg = i;

                   threadpool_add_task(&pool, mytask, (void *)arg);

         }

//      sleep(15);

         threadpool_destroy(&pool);

         return 0;

}

代码执行结果如下:

[[email protected] 线程池]$ ./main

0x335fe700 thread is starting

0x31dfb700 thread is starting

0x31dfb700 thread run the task 1

0x335fe700 thread run the task 0

0x32dfd700 thread is starting

0x32dfd700 thread run the task 2

0x325fc700 thread is starting

0x325fc700 thread run the task 3

0x31dfb700 thread run the task 4

0x335fe700 thread run the task 5

0x32dfd700 thread run the task 6

0x325fc700 thread run the task 7

0x335fe700 thread run the task 9

0x325fc700 threadpool is exiting

0x31dfb700 thread run the task 8

0x32dfd700 threadpool is exiting

0x335fe700 threadpool is exiting

0x31dfb700 threadpool is exiting

可以看到一共处理了0~9十个任务;

最大线程数量为4(0x335fe700,0x31dfb700, 0x32dfd700, 0x325fc700 );

等待每个线程都退出了才结束进程;

第一次写博客,希望能够在这里学到更多,也分享一些自己学习的心得体会,共勉互励。

时间: 2024-10-27 07:22:13

一个简单的线程池实现的相关文章

一个简单的线程池程序设计(消费者和生产者)

最近在学习linux下的编程,刚开始接触感觉有点复杂,今天把线程里比较重要的线程池程序重新理解梳理一下. 实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在l

Linux C 一个简单的线程池程序设计

实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在linux环境下构建三个文件夹(include,src,bin) include:包含该程序所需要的头文件.

Linux C 实现一个简单的线程池

线程池的定义 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动. 什么时

一个简单的线程池

/** * * @author hc * @version 1.0 * * @param <Job> */ public interface ThreadPool<Job extends Runnable>{ //执行一个job void execute(Job job); //关闭线程 void shutdown(); //增加工作者线程 void addWorkers(int num); //减少工作者线程 void removeWorkers(int num); //正在等待

一个最简单的线程池

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**  * 一个最简单的线程池,这个模型很简单,但是很有用  *  * @author leizhimin 2014/8/22 20:21  */ public class Test3 {     private static final ExecutorService threadPool = Executors.newFix

自己动手实现简单的线程池

为了节省系统在多线程并发情况下不断的创建新和销毁线程所带来的性能浪费,就需要引入线程池. 线程池的基本功能就是线程复用.每当系统提交一个任务时,会尝试从线程池内取出空闲线程来执行它.如果没有空闲线程,这时候再创建新的线程.任务执行完毕,线程也不会立即销毁,而是加入到线程池中以便下次复用. Java提供了多种线程池的实现,以满足不同业务的需求.为了理解它们,下面给出一个最简单的线程池的实现. 线程池主要分为两大部分,线程池和一些永不退出的线程 首先是线程池部分: package optimisti

一个简单的线程锁------pthread和win32的临界区(Critical Section)

临界区: 临界区是指一个小代码段,在代码能够执行前,它必须独占对某些资源的访问权.这是让若干代码能够"以原子操作方式"来使用资源的一种方法. 所谓原子(atomic)操作方式,是指这段代码知道没有别的线程要访问这个资源. 说明: 1.  MacOSX,Windows有自己的线程模型, pthread可以说是跨平台的线程编程模型解决方案,当然对pthread不熟悉的也可以使用本地线程模型, 其实pthread的win32版本也是基于本地线程模型的, pthread-win32的mutex

一个Windows下线程池的实现(C++)

前言 本文配套代码:https://github.com/TTGuoying/ThreadPool 先看看几个概念: 线程:进程中负责执行的执行单元.一个进程中至少有一个线程. 多线程:一个进程中有多个线程同时运行,根据cpu切换轮流工作,在多核cpu上可以几个线程同时在不同的核心上同时运行. 线程池:基本思想还是一种对象池思想,开辟一块内存空间,里面存放一些休眠(挂起Suspend)的线程.当有任务要执行时,从池中取一个空闲的线程执行任务,执行完成后线程休眠放回池中.这样可以避免反复创建线程对

Python简单的线程池

class ThreadPool(object): def __init__(self, max_num=20): # 创建一个队列,队列里最多只能有10个数据 self.queue = queue.Queue(max_num) # 在队列里填充线程类 # [线程类.线程类.线程类.线程类.线程类.线程类.线程类] for i in range(max_num): self.queue.put(threading.Thread) def get_thread(self): # 去队列里去数据,