Linux C++线程池实例

想做一个多线程服务器测试程序,因此参考了github的一些实例,然后自己动手写了类似来加深理解。

目前了解的线程池实现有2种思路:

第一种:

主进程创建一定数量的线程,并将其全部挂起,此时线程状态为idle,并将running态计数为0,等到任务可以执行了,就唤醒线程,此时线程状态为running,计数增加,如果计数达到最大线程数,就再创建一组空闲线程,等待新任务,上一组线程执行完退出,如此交替。

第二种:

采用生成者-消费者模式,主进程作为生成者,创建FIFO队列,在任务队列尾部添加任务,线程池作为消费者在队列头部取走任务执行,这之间有人会提到无锁环形队列,在单生成者单消费者的模式下是有效的,但是线程池肯定是多消费者同时去队列取任务,环形队列会造成死锁。

我的实例采用第二种方式实现,在某些应用场景下,允许一定时间内,任务排队的情况,重复利用已有线程会比较合适。

代码比较占篇幅,因此折叠在下面。

task.h:

 1 #ifndef TASK_H
 2 #define TASK_H
 3
 4 #include <list>
 5 #include <pthread.h>
 6
 7 using std::list;
 8
 9 struct task {
10    void (*function) (void *);
11    void *arguments;
12    int id;
13 };
14
15 struct work_queue {
16    work_queue(){
17         pthread_mutex_init(&queue_lock, NULL);
18         pthread_mutex_init(&queue_read_lock, NULL);
19         pthread_cond_init(&queue_read_cond, NULL);
20         qlen = 0;
21    }
22
23    ~work_queue() {
24        queue.clear();
25        pthread_mutex_destroy(&queue_read_lock);
26        pthread_mutex_destroy(&queue_lock);
27        pthread_cond_destroy(&queue_read_cond);
28    }
29
30    void push(task *tsk);
31    task *pull();
32    void post();
33    void wait();
34
35 private:
36    int qlen;
37    list< task * > queue;
38    pthread_mutex_t queue_lock;
39    pthread_mutex_t queue_read_lock;
40    pthread_cond_t  queue_read_cond;
41 };
42
43 #endif

task.cpp

#include "task.h"

void work_queue::push(task *tsk) {
   pthread_mutex_lock(&queue_lock);
   queue.push_back(tsk);
   qlen++;

   pthread_cond_signal(&queue_read_cond);
   pthread_mutex_unlock(&queue_lock);
}

task* work_queue::pull() {
    wait();

    pthread_mutex_lock(&queue_lock);
    task* tsk = NULL;
    if (qlen > 0) {
        tsk = *(queue.begin());
        queue.pop_front();
        qlen--;

        if (qlen > 0)
            pthread_cond_signal(&queue_read_cond);
    }
    pthread_mutex_unlock(&queue_lock);
    return tsk;
}

void work_queue::post() {
    pthread_mutex_lock(&queue_read_lock);
    pthread_cond_broadcast(&queue_read_cond);
    pthread_mutex_unlock(&queue_read_lock);
}

void work_queue::wait() {
    pthread_mutex_lock(&queue_read_lock);
    pthread_cond_wait(&queue_read_cond, &queue_read_lock);
    pthread_mutex_unlock(&queue_read_lock);
}

threadpool.h

 1 #ifndef THREAD_POOL_H
 2 #define THREAD_POOL_H
 3
 4 #include "task.h"
 5 #include <vector>
 6
 7 using std::vector;
 8
 9 #define safe_delete(p)  if (p) { delete p;  p = NULL; }
10
11 struct threadpool {
12     threadpool(int size) : pool_size(size)
13                          , thread_list(size, pthread_t(0))
14                          , queue(NULL)
15                          , finish(false)
16                          , ready(0) {
17
18         pthread_mutex_init(&pool_lock, NULL);
19     }
20
21     ~threadpool() {
22         thread_list.clear();
23         safe_delete(queue);
24         pthread_mutex_destroy(&pool_lock);
25     }
26
27     void init();
28     void destroy();
29     static void* thread_run(void *tp);
30
31     void incr_ready();
32     void decr_ready();
33     bool close() const;
34
35     work_queue *queue;
36
37 private:
38     int pool_size;
39     int ready;
40     bool finish;
41     pthread_mutex_t pool_lock;
42     vector <pthread_t> thread_list;
43 };
44
45
46 #endif

threadpool.cpp

 1 /*
 2  * threadpool.cpp
 3  *
 4  *  Created on: 2017年3月27日
 5  *      Author: Administrator
 6  */
 7
 8 #include "threadpool.h"
 9
10
11 void* threadpool::thread_run(void *tp) {
12     threadpool *pool = (threadpool *) tp;
13     pool->incr_ready();
14
15     while(1) {
16         task* tsk = pool->queue->pull();
17         if (tsk) {
18             (tsk->function)(tsk->arguments);
19             delete tsk;
20             tsk = NULL;
21         }
22
23         if (pool->close())
24             break;
25     }
26
27     pool->decr_ready();
28
29     return NULL;
30 }
31
32 void threadpool::incr_ready() {
33     pthread_mutex_lock(&pool_lock);
34     ready++;
35     pthread_mutex_unlock(&pool_lock);
36 }
37
38 void threadpool::decr_ready() {
39     pthread_mutex_lock(&pool_lock);
40     ready--;
41     pthread_mutex_unlock(&pool_lock);
42 }
43
44 bool threadpool::close() const {
45     return finish;
46 }
47
48 void threadpool::init() {
49     queue = new work_queue;
50     if (!queue) {
51         return;
52     }
53
54     for(int i; i<pool_size; i++) {
55         pthread_create(&thread_list[i], NULL, threadpool::thread_run, (void *)this);
56     }
57
58     while(ready != pool_size) {}
59 }
60
61 void threadpool::destroy() {
62     finish = true;
63
64     while(ready) {
65         if(queue) {
66             queue->post();
67         }
68     }
69 }

main.cpp

 1 //============================================================================
 2 // Name        : thread_pool.cpp
 3 // Author      : dancy
 4 // Version     :
 5 // Copyright   : Your copyright notice
 6 // Description : Hello World in C++, Ansi-style
 7 //============================================================================
 8
 9 #include <iostream>
10 #include "threadpool.h"
11 #include <pthread.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14
15 using namespace std;
16
17 void job(void *tsk){
18     printf("job %-2d working on Thread #%u\n", ((task *)tsk)->id, (int)pthread_self());
19 }
20
21 task *make_task(void (*func) (void *), int id) {
22     task *tsk = new task;
23     if (!tsk)
24         return NULL;
25
26     tsk->function = func;
27     tsk->arguments = (void *)tsk;
28     tsk->id = id;
29
30     return tsk;
31 }
32
33 int main() {
34     threadpool tp(4);
35     tp.init();
36
37     for(int i=0; i<40; i++)
38         tp.queue->push(make_task(&job, i+1));
39
40     tp.destroy();
41     printf("all task has completed\n");
42     return 0;
43 }

时间: 2024-10-20 16:26:47

Linux C++线程池实例的相关文章

java线程池实例

目的         了解线程池的知识后,写个线程池实例,熟悉多线程开发,建议看jdk线程池源码,跟大师比,才知道差距啊O(∩_∩)O 线程池类 1 package thread.pool2; 2 3 import java.util.LinkedList; 4 5 public class ThreadPool { 6 //最大线程数 7 private int maxCapacity; 8 //初始线程数 9 private int initCapacity; 10 //当前线程数 11 p

LINUX c++线程池框架

版权声明:原文地址及作者不详,如有侵权,请联系: 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单. 为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,

线程池实例:使用Executors和ThreadPoolExecutor

线程池负责管理工作线程,包含一个等待执行的任务队列.线程池的任务队列是一个Runnable集合,工作线程负责从任务队列中取出并执行Runnable对象. java.util.concurrent.executors 提供了 java.util.concurrent.executor 接口的一个Java实现,可以创建线程池.下面是一个简单示例: 首先创建一个Runable 类: WorkerThread.java package com.journaldev.threadpool; public

Linux C++线程池

BaseTask.h 任务基类 1 #ifndef MYTASK_H 2 #define MYTASK_H 3 #include "BaseTask.h" 4 5 class MyTask : public BaseTask 6 { 7 public: 8 virtual void run(void); 9 }; 10 11 #endif MyTask,h 其中一个实现的任务类 1 #ifndef MYTASK_H 2 #define MYTASK_H 3 #include "

Java newFixedThreadPool线程池实例及讲解

闲话不多说,直接上代码. <span style="font-size:18px;">import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MyThreadPool { private ExecutorService exe; private static final int POOL_SIZE = 4; public MyThread

Linux下线程池的理解与简单实现

首先,线程池是什么?顾名思义,就是把一堆开辟好的线程放在一个池子里统一管理,就是一个线程池. 其次,为什么要用线程池,难道来一个请求给它申请一个线程,请求处理完了释放线程不行么?也行,但是如果创建线程和销毁线程的时间比线程处理请求的时间长,而且请求很多的情况下,我们的CPU资源都浪费在了创建和销毁线程上了,所以这种方法的效率比较低,于是,我们可以将若干已经创建完成的线程放在一起统一管理,如果来了一个请求,我们从线程池中取出一个线程来处理,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的

Linux简单线程池实现(带源码)

这里给个线程池的实现代码,里面带有个应用小例子,方便学习使用,代码 GCC 编译可用.参照代码看下面介绍的线程池原理跟容易接受,百度云下载链接: http://pan.baidu.com/s/1i3zMHDV 一.线程池简介 为什么使用线程池? 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任

JAVA四种线程池实例

1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java 1 2 3 4 5 6 7 new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及

[转]简单Linux C线程池

转自:http://www.cnblogs.com/venow/archive/2012/11/22/2779667.html 贴原文章过来,提示有敏感词..那就不贴了. 以下为本博客作者注: 在threadpool_function函数中有这段代码, while ((pool->queue_cur_num == 0) && !pool->pool_close)   //队列为空时,就等待队列非空         {      pthread_cond_wait(&(p