一个简单的线程池程序设计(消费者和生产者)

最近在学习linux下的编程,刚开始接触感觉有点复杂,今天把线程里比较重要的线程池程序重新理解梳理一下。

实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务。如果线程池中的线程都在忙,那么任务队列中的任务则等待。本程序较为简单,把任务定义为了两个数相加,输出它们的和。

采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全。

1.在linux环境下构建三个文件夹(include,src,bin)

include:包含该程序所需要的头文件。

src:包括程序的main函数,以及其他函数。

bin:程序编译,链接生成的可执行函数。

2.编写include下的头文件函数

vim pool_common.h

 1 #ifndef _COMM
 2 #define _COMM
 3 #include<stdio.h>
 4 #include<stdlib.h>
 5 #include<pthread.h>
 6 #include<string.h>
 7 #include<sys/stat.h>
 8 #include<sys/select.h>
 9 #include<sys/time.h>
10 #include<fcntl.h>
11 #include<unistd.h>
12 #include<sys/types.h>
13  struct tag
14 {
15     int left;
16     int right;
17 };
18 typedef struct tag  elem_t;  //定义数据成员,即要输入的两个相加的数
19 #endif

vim pool_que.h

 1 #ifndef _QUE
 2 #define _QUE
 3 #include "pool_common.h"//有关任务队列的函数
 4 typedef enum
 5 {
 6     EMPTY,FULL,NEITHER
 7 }STATUS;                           //枚举类型,判断队列是否为空返回的状态
 8 typedef struct que
 9 {
10     elem_t *que_arr;   //任务队列中的任务数组,储存数据
11     int que_front;      //任务队列的队头
12     int que_tail;       //任务队列的队尾
13     int que_capacity;  //任务队列所容纳任务的数量
14     pthread_mutex_t que_lock;  //任务队列的互斥锁
15     pthread_cond_t que_pro;   //任务产生的信号
16     pthread_cond_t que_con;   //任务被执行的信号
17 }que_t,*pque_t;
18 void que_init(pque_t pq,int sizenum);//任务队列初始化
19
20 STATUS que_full(pque_t pq);    //判断队列是否满了
21 STATUS que_empty(pque_t pq); //判断队列是否为空
22 void que_push(pque_t pq,elem_t val);  //将新的任务加入队列
23 void que_pop(pque_t pq);   //将任务从队列中删除
24 elem_t que_top(pque_t pq);  //获得任务队首的任务,和上个函数结合使用,即将任务交于线程执行
25 void que_destroy(pque_t pq); //销毁队列
26 #endif

vim pool_thread.h  //关于多线程的头文件

 1 #ifndef _POOL_THREAD
 2 #define _POOL_THREAD
 3 #include "pool_que.h"
 4 typedef void* (*consumer_handle)(void*); //定义线程所以执行的任务函数名称
 5 typedef struct threads
 6 {
 7     que_t pool_que;  //线程池中的任务队列
 8     int pool_cnt;   //线程池中线程个数
 9     pthread_t *pool_arr;  //线程数组
10     consumer_handle pool_handle; //线程索要处理的任务函数
11 }threadspool_t,*pthreadspool_t;
12 void pool_init(pthreadspool_t ppool,int quecapacity,int threadsnum,consumer_handle hd);//初始化多线程
13 void pool_on(pthreadspool_t ppool);//启动多线程
14 void pool_off(pthreadspool_t ppool);//关闭多线程
15 void pool_put(pthreadspool_t ppool,elem_t val);//将新的任务加入到线程池中
16 void pool_get(pthreadspool_t ppool,elem_t *val);//取出线程池中的任务
17 void pool_destroy(pthreadspool_t ppool);//销毁线程池
18 #endif

3.编写main函数

cd src

vim pool_main.c

 1 #include "pool_pthread.h"
 2 void *handle(void * arg)//任务处理函数
 3 {
 4     pthreadspool_t ppool=(pthreadspool_t)arg;
 5     elem_t val;
 6     while(1)
 7     {
 8         pool_get(ppool,&val);    //从线程池中获得任务数据
 9         printf("%u:excute task!,%d +%d=%d\n",pthread_self(),val.left,val.right,val.left+val.right);//执行输出
10     }
11
12 }
13 int main(int argc,char *argv[])
14 {
15     if(argc!=3)
16     {
17         printf("EXE QUE_CAPACITY THREADS_NUM!\n");
18         exit(1);
19     }//生成的可执行函数后面必须加上2个变量,否则出错
20     threadspool_t mypool;
21     pool_init(&mypool,atoi(argv[1]),atoi(argv[2]),handle);//线程池初始化
22     pool_on(&mypool);//启动线程池
23     char buf[1024];
24 //下面是一个select函数,其主要作用是每隔一秒去监听键盘上是否有数字输入,如果有,那么把数据流中的数据转化为任务数据添加到任务队列中去
25 /*    while(1)
26     {
27         fd_set rds;
28         FD_ZERO(&rds); //初始化
29         FD_SET(0,&rds);//rds捆绑输入,监听
30
31         struct timeval tm;
32         tm.tv_sec=1;
33         tm.tv_usec=0;//设置时间
34         if(0==select(1024,&rds,NULL,NULL,&tm))
35         {
36             continue;
37         }//如果没输入,继续监听
38         if(FD_ISSET(0,&rds))
39         {
40         memset(buf,0,sizeof(buf));
41         if(read(0,buf,127)==0)//将键盘上输入的字符流储存到数组中去
42         {
43             break;
44         }
45         elem_t val;
46         sscanf(buf,"%d%d",&val.left,&val.right);//将数组中的字符赋值两个数字到数据项
47         pool_put(&mypool,val);//加入任务队列
48         }
49     }*/
50 //第二种输入方法,不停的输入数字,并将这些数据添加到任务队列中去,不用select函数
51    int i,x,y;
52    elem_t val;
53    while(1)
54    {
55     scanf("%d%d",&x,&y);
56         val.left=x;
57         val.right=y;
58     pool_put(&mypool,val);
59    }
60     pool_off(&mypool);//关闭多线程
61     pool_destroy(&mypool);//销毁线程池
62     return 0;
63 }

4.实现多线程的具体功能函数pool_thread.c和队列操作的具体功能函数pool_que.c

vim pool_thread.c

 1 #include<pool_pthread.h>
 2 void pool_init(pthreadspool_t ppool,int quecapacity,int threadsnum,consumer_handle hd)
 3 {
 4     que_init(&ppool->pool_que,quecapacity);
 5     ppool->pool_arr=(pthread_t *)calloc(threadsnum,sizeof(pthread_t));
 6     ppool->pool_cnt=threadsnum;//线程数
 7     ppool->pool_handle=hd;
 8 }
 9 void pool_on(pthreadspool_t ppool)
10 {
11     int i;
12     for(i=0;i<ppool->pool_cnt;i++)
13     {
14         if(0!=pthread_create(ppool->pool_arr+i,NULL,ppool->pool_handle,(void*)ppool))//创建线程,并且执行
15         {
16             printf("thread create fail!\n");
17             exit(1);
18         }
19     }
20 }
21 void pool_off(pthreadspool_t ppool)
22 {
23     int i;
24     for(i=0;i<ppool->pool_cnt;i++)
25     {
26     pthread_join(ppool->pool_arr[i],NULL);//关闭线程
27     }
28 }
29 void pool_put(pthreadspool_t ppool,elem_t val)
30 {
31     que_push(&ppool->pool_que,val);
32 }
33 void pool_get(pthreadspool_t ppool,elem_t* val)
34 {
35     *val=que_top(&ppool->pool_que);
36     que_pop(&ppool->pool_que);
37 }
38 void pool_destroy(pthreadspool_t ppool)
39 {
40 que_destroy(&ppool->pool_que);
41 free(ppool->pool_arr);
42 }

vim pool_que.c

 1 #include  "pool_que.h"
 2 void que_init(pque_t pq,int sizenum)
 3 {
 4     pq->que_arr=(elem_t *)calloc(sizenum,sizeof(elem_t));
 5     pq->que_capacity=sizenum;
 6     pq->que_front =0;
 7     pq->que_tail=0;
 8     pthread_mutex_init(&pq->que_lock,NULL);//初始化互斥锁
 9     pthread_cond_init(&pq->que_pro,NULL);//初始化信号量
10     pthread_cond_init(&pq->que_con,NULL);
11 }
12
13 STATUS que_empty(pque_t pq)
14 {
15     if(pq->que_front==pq->que_tail)//队首和队尾相等则为空,该队列是一个循环队列
16     {
17         return EMPTY;
18     }
19     else
20         return NEITHER;
21 }
22 STATUS que_full(pque_t pq)
23 {
24     if((pq->que_tail+1)%pq->que_capacity==pq->que_front)
25     {
26         return FULL;
27     }
28     else
29         return NEITHER;
30 }
31
32 void que_push(pque_t pq,elem_t val)
33 {
34     pthread_mutex_lock(&pq->que_lock);//关闭互斥锁
35     while(que_full(pq)==FULL)
36     {
37    pthread_cond_wait(&pq->que_pro,&pq->que_lock); //当任务队列为满时,等待执行任务的信号,并且将互斥锁打开
38     }
39     pq->que_arr[pq->que_tail]=val;
40     pq->que_tail=(pq->que_tail+1)%pq->que_capacity;
41     pthread_cond_signal(&pq->que_con);//通知线程有新任务出现
42     pthread_mutex_unlock(&pq->que_lock);//打开互斥锁
43 }
44 elem_t que_top(pque_t pq)
45 {
46     pthread_mutex_lock(&pq->que_lock);//上锁
47     while(EMPTY==que_empty(pq))
48     {
49         pthread_cond_wait(&pq->que_con,&pq->que_lock);
50     }
51     return pq->que_arr[pq->que_front];
52 }
53 void que_pop(pque_t pq)
54 {
55     pq->que_front=(pq->que_front+1)%pq->que_capacity;
56     pthread_cond_signal(&pq->que_pro);
57     pthread_mutex_unlock(&pq->que_lock);
58 }
59 void que_destroy(pque_t pq)
60 {
61     free(pq->que_arr);
62     pthread_mutex_destroy(&pq->que_lock);//销毁锁和信号
63     pthread_cond_destroy(&pq->que_pro);
64     pthread_cond_destroy(&pq->que_con);
65 }

5.Makefile函数

由于程序涉及的函数较多,为了编译执行方便,可以使用Makefile函数,用make命令来执行

1 EXE_DIR:=./bin   //生成可执行函数的地址
2 INC_DIR:=./include  //头文件
3 SRC_DIR:=./src  //源文件
4
5 OBJECTS:=./src/pool_main.c ./src/pool_thread.c ./src/pool_que.c
6
7 $(EXE_DIR)/main:$(OBJECTS) $(INCLUDES)
8     gcc -g -o [email protected] $(OBJECTS) -I$(INC_DIR) -lpthread//编译命令

6.执行

7.总结

这个程序只是实现了一个简单的加法运算,当然也可以把多线程执行的任务换成其他的任务。只是通过这一个实例来说明多线程的意义和构建过程。

线程池:创建线程池以后,首先创建若干个线程。当任务队列中有任务出现时,指定一个线程为这个任务服务,若每个线程都在执行,那么任务队列中的任务只能等待。反之,如果任务队列为空,线程也只能等待。在客户端向服务器请求过程中,虽然线程不像进程那样占用很多资源,但是线程本身的创建和销毁在线程数量多的情况下仍然是很大的工作量。线程池实现了线程的复用,每个线程执行完一个任务后,可以继续执行另一个任务,这大大提高了线程处理任务的效率。

时间: 2024-12-12 06:45:37

一个简单的线程池程序设计(消费者和生产者)的相关文章

Linux C 一个简单的线程池程序设计

实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在linux环境下构建三个文件夹(include,src,bin) include:包含该程序所需要的头文件.

一个简单的线程池实现

前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个.跟大家分享,同时也整理整理思路.   对线程池的要求: 1.用于处理大量短暂的任务. 2.动态增加线程,直到达到最大允许的线程数量. 3.动态销毁线程.   线程池的实现类似于"消费者--生产者"模型: 用一个队列存放任务(仓库,缓存) 主线程添加任务(生产者生产任务) 新建线程函数执行任务(消费者执行任务) 由于任务队列是全部线程共享的,就涉及到同步问题.这里采用条件变量和互斥锁来实现. ------------

Linux C 实现一个简单的线程池

线程池的定义 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动. 什么时

一个简单的线程池

/** * * @author hc * @version 1.0 * * @param <Job> */ public interface ThreadPool<Job extends Runnable>{ //执行一个job void execute(Job job); //关闭线程 void shutdown(); //增加工作者线程 void addWorkers(int num); //减少工作者线程 void removeWorkers(int num); //正在等待

一个最简单的线程池

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**  * 一个最简单的线程池,这个模型很简单,但是很有用  *  * @author leizhimin 2014/8/22 20:21  */ public class Test3 {     private static final ExecutorService threadPool = Executors.newFix

自己动手实现简单的线程池

为了节省系统在多线程并发情况下不断的创建新和销毁线程所带来的性能浪费,就需要引入线程池. 线程池的基本功能就是线程复用.每当系统提交一个任务时,会尝试从线程池内取出空闲线程来执行它.如果没有空闲线程,这时候再创建新的线程.任务执行完毕,线程也不会立即销毁,而是加入到线程池中以便下次复用. Java提供了多种线程池的实现,以满足不同业务的需求.为了理解它们,下面给出一个最简单的线程池的实现. 线程池主要分为两大部分,线程池和一些永不退出的线程 首先是线程池部分: package optimisti

一个简单的线程锁------pthread和win32的临界区(Critical Section)

临界区: 临界区是指一个小代码段,在代码能够执行前,它必须独占对某些资源的访问权.这是让若干代码能够"以原子操作方式"来使用资源的一种方法. 所谓原子(atomic)操作方式,是指这段代码知道没有别的线程要访问这个资源. 说明: 1.  MacOSX,Windows有自己的线程模型, pthread可以说是跨平台的线程编程模型解决方案,当然对pthread不熟悉的也可以使用本地线程模型, 其实pthread的win32版本也是基于本地线程模型的, pthread-win32的mutex

一个Windows下线程池的实现(C++)

前言 本文配套代码:https://github.com/TTGuoying/ThreadPool 先看看几个概念: 线程:进程中负责执行的执行单元.一个进程中至少有一个线程. 多线程:一个进程中有多个线程同时运行,根据cpu切换轮流工作,在多核cpu上可以几个线程同时在不同的核心上同时运行. 线程池:基本思想还是一种对象池思想,开辟一块内存空间,里面存放一些休眠(挂起Suspend)的线程.当有任务要执行时,从池中取一个空闲的线程执行任务,执行完成后线程休眠放回池中.这样可以避免反复创建线程对

Python简单的线程池

class ThreadPool(object): def __init__(self, max_num=20): # 创建一个队列,队列里最多只能有10个数据 self.queue = queue.Queue(max_num) # 在队列里填充线程类 # [线程类.线程类.线程类.线程类.线程类.线程类.线程类] for i in range(max_num): self.queue.put(threading.Thread) def get_thread(self): # 去队列里去数据,