使用c++11写个最简跨平台线程池(转载)

为什么需要多线程?

最简单的多线程长啥样?

为什么需要线程池,有什么问题?

实现的主要原理是什么?

带着这几个问题,我们依次展开。

1.为什么需要多线程?

大部分程序毕竟都不是计算密集型的,简单的说,正常情况下,以单线程的模式来写对程序员而言是最舒心的。因为所有的代码都是顺序执行,非常容易理解!函数一级一级往下调用,代码一行一行执行。但是,代码的世界里,虽然cpu还好,但是却经常需要用到io资源,或者是其他服务器的网络资源,比如像数据库,如果这个时候因此把进程卡住,不管是客户端还是客户端都对用户体验相当糟糕。当然了,计算密集型的运算就更需要多线程,防止主线程被卡住。

2.最简单的多线程长啥样?

举个最简单的例子,服务器采用阻塞式socket,有一个网络线程负责收发包(IO),然后有一个逻辑主线程负责相应的业务操作,主线程和网络线程之间通过最简单的消息队列进行交换,而这个消息队例明显是两个线程都要访问(轮询消息队列是否为空)到的,所以,我们需要给这个消息队列上锁(std::mutex),即可以解决问题。由于比较简单我们就不需要看这个怎么码了。这种模式虽然简单,但是在合适的岗位上,也是极好的!

3.那为什么需要线程池呢,有什么问题?

还以刚才的服务器举例,如果业务线程逻辑比较复杂,又或者他需要访问数据库或者是其他服务器的资源,读取文件等等呢?当然他可以采用异步的数据库接口,但是采用异步意味着业务代码被碎片化。异步是典型的讨厌他,但是又干不掉他的样子。离题了。回归。这个时候我们需要多个业务线程处理了。多个线程就意味着多一份处理能力!回到上个问题,我们的多线程采用轮询消息队列的方式来交换信息,那么这么多个线程,不断的上锁解锁,光这个成本就够了。这个时候,条件变量就上线了(std::condition_variable)就登场了

4.实现的主要原理是什么?

业务线程不要轮询消息队列了,而所有的业务线程处于等待状态,当有消息再来的时候,再由产生消息的人,在我们示例场景就是网络线程了,随便唤醒一个工人线程即可。看看最关键的代码


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

  //消费者

void consumer()

{

    //第一次上锁

    std::unique_lock < std::mutex > lck(mutex_);

    while (active_)

    {

        //如果是活动的,并且任务为空则一直等待

        while (active_ && task_.empty())

            cv_.wait(lck);

        //如果已经停止则退出

        if(!active_)

            break;

        T *quest = task_.front();

        task_.pop();

        //从任务队列取出后该解锁(任务队列锁)了

        lck.unlock();

        //执行任务后释放

        proc_(quest);

        //delete quest;   //在proc_已经释放该指针了

        //重新上锁

        lck.lock();

    }

}

  

算了,还是直接贴完整代码,看注释吧


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

#ifndef _WORKER_POOL_H_

#define _WORKER_POOL_H_

//file: worker_pool.h

//#define  _CRT_SECURE_NO_WARNINGS

// g++ -g -std=c++11 1.cc -D_GLIBCXX_USE_NANOSLEEP -lpthread */

#include <vector>

#include <queue>

#include <thread>

#include <mutex>

#include <condition_variable>

//#include <chrono>

template<typename T>

class WorkerPool

{

public:

    typedef WorkerPool<T> THIS_TYPE;

    typedef std::function<void(T*)> WorkerProc;

    typedef std::vector< std::thread* > ThreadVec;

    WorkerPool()

    {      

        active_ = false;

    }

    virtual ~WorkerPool()

    {

        for(ThreadVec::iterator it = all_thread_.begin();it != all_thread_.end();++it)

            delete *it;

        all_thread_.clear();

    }

    void Start(WorkerProc f,int worker_num=1)

    {

        active_ = true;    

        all_thread_.resize(worker_num);

        for (int i = 0; i < worker_num;i++ )

        {

            all_thread_[i] = new std::thread(std::bind(&THIS_TYPE::consumer,this));

        }

        proc_ = f;

    }

    //生产者

    void Push(T *t)

    {

        std::unique_lock < std::mutex > lck(mutex_);

        task_.push(t);

        cv_.notify_one();

    }

    void Stop()

    {

        //等待所有的任务执行完毕

        mutex_.lock();

        while (!task_.empty())

        {  

            mutex_.unlock();

            std::this_thread::sleep_for(std::chrono::milliseconds(1000));

            cv_.notify_one();

            mutex_.lock();

        }

        mutex_.unlock();

        //关闭连接后,等待线程自动退出

        active_ = false;

        cv_.notify_all();

        for(ThreadVec::iterator it = all_thread_.begin();

            it != all_thread_.end();++it)

            (*it)->join();

    }

private:

    //消费者

    void consumer()

    {

        //第一次上锁

        std::unique_lock < std::mutex > lck(mutex_);

        while (active_)

        {

            //如果是活动的,并且任务为空则一直等待

            while (active_ && task_.empty())

                cv_.wait(lck);

            //如果已经停止则退出

            if(!active_)

                break;

            T *quest = task_.front();

            task_.pop();

            //从任务队列取出后该解锁(任务队列锁)了

            lck.unlock();

            //执行任务后释放

            proc_(quest);

            //delete quest;   //在proc_已经释放该指针了

            //重新上锁

            lck.lock();

        }

    }

    std::mutex mutex_;

    std::queue<T*> task_;

    std::condition_variable cv_;

    bool active_;

    std::vector< std::thread* > all_thread_;

    WorkerProc proc_;

};

#endif

  写一个类继承一下,并写一个工作函数和回调函数处理


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

#include "worker_pool.h"

#include <iostream>

//为了多耗点cpu,计算斐波那契数列吧

static int fibonacci(int a)

{

    //ASSERT(a > 0);

    if (a == 1 || a == 2)

        return 1;

    return fibonacci(a-1) + fibonacci(a-2);

}

//异步计算任务

struct AsyncCalcQuest

{

    AsyncCalcQuest():num(0),result(0)

    {}

    //计算需要用到的变量

    int num;

    int result;

};

//为了测试方便,引入全局变量用于标识线程池已将所有计算完成

const int TOTAL_COUNT = 1000000;

int now_count = 0;

//继承一下线程池类,在子类处理计算完成的业务,在我们这里,只是打印一下计算结果

class CalcWorkerPool:public WorkerPool<AsyncCalcQuest>

{

public:

    CalcWorkerPool(){}

    virtual ~CalcWorkerPool()

    {

    }

    //在工人线程中执行

    void DoWork(AsyncCalcQuest *quest)

    {

        //算了,不算这个了,根本算不出来

        quest->result = fibonacci(quest->num);       

        //quest->result = quest->num*0.618;

        //并将已完成任务返回到准备回调的列表

        std::unique_lock<std::mutex > lck(mutex_callbacks_);

        callbacks_.push_back(quest);

    }

    //在主线程执行

    void DoCallback()

    {

        //组回调任务上锁

        std::unique_lock<std::mutex > lck(mutex_callbacks_);

        while (!callbacks_.empty())

        {

            auto *quest = callbacks_.back();           

            {//此处为业务代码打印一下吧

                std::cout << quest->num << " " << quest->result << std::endl;

                now_count ++;

            }

            delete quest;       //TODO:这里如果采用内存池就更好了

            callbacks_.pop_back();

        }

    }

private:

    //这里是准备给回调的任务列表

    std::vector<AsyncCalcQuest*> callbacks_;

    std::mutex mutex_callbacks_;

};

int main()

{

    CalcWorkerPool workers;

    //工厂开工了 8个工人喔

    workers.Start(std::bind(&CalcWorkerPool::DoWork,&workers,std::placeholders::_1),8);

    

    //开始产生任务了

    for (int i=0; i<TOTAL_COUNT; i++)

    {

        AsyncCalcQuest *quest = new AsyncCalcQuest;

        quest->num = i%40+1;

        workers.Push(quest);

    }

    while (now_count != TOTAL_COUNT)

    {

        workers.DoCallback();

    }

    workers.Stop();

    return 0;

}

时间: 2024-10-24 23:28:33

使用c++11写个最简跨平台线程池(转载)的相关文章

c++11 实现半同步半异步线程池

感受: 随着深入学习,现代c++给我带来越来越多的惊喜- c++真的变强大了. 半同步半异步线程池: 其实很好理解,分为三层 同步层:通过IO复用或者其他多线程多进程等不断的将待处理事件添加到队列中,这个过程是同步进行的. 队列层:所有待处理事件都会放到这里.上一层事件放到这里,下一层从这里获取事件 异步层:事先创建好线程,让瞎猜呢和嗯不断的去处理队列层的任务,上层不关心这些,它只负责把任务放到队列里,所以对上层来说这里是异步的. 看张图: 如果你不熟悉c++11的内容 以下文章仅供参考 c++

手写线程池 (一)

前言准备 1.jdk线程池的使用:https://www.cnblogs.com/jtfr/p/10187419.html 2.线程池核心:线程的复用. 运行的线程是线程池的核心,被添加的任务需要实现过Runnable接口,主要是保证有run方法.运行时候 对象.run() . 一.手写线程池注意要点 1.线程池需要添加任务,任务是放置在一个队列(FIFO)当中,具体只要保证FIFO,或优先级保证(Map集合)先执行.2.线程池运行,需要一个容器存放创建的线程,可数组或集合,可以自己设计思考.3

李小文院士:随性而为 科学应该追求简单性原则(转载)

我属于那种调皮的小孩 经济观察报:成为科学家跟你小时候的成长经历有关系吗? 李小文:基本上没什么关系.我家算是小知识分子家庭,父亲是工程师,母亲是会计,小时候,家教虽然严,但他们的工作都很忙,没时间管.四岁的时候,他们没地方放我,就把我放到小学里去了.初中的时候,我上的是一所很破烂的中学,我属于那种调皮的小孩,上学也没有动力,从来不想去考高分,也从来不在班里争什么名次,但我做题比较灵,也比较快,我交卷的最快纪录是老师刚在黑板上写完题,我就交卷出去玩儿了,好在每次考试我都能刚好及格,成绩能一直保持

基于C++11的线程池,简洁且可以带任意多的参数

咳咳.C++11 加入了线程库,从此告别了标准库不支持并发的历史.然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池.信号量等.线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:"管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复." 貌似没有问题吧.但是写起程序来的时候就出问题了. 废话不多说,先上实现,然后再啰嗦.(dont talk, show me ur code !) 代码实现 1

c++11线程池实现

咳咳.c++11 加入了线程库,从此告别了标准库不支持并发的历史.然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池.信号量等.线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:"管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复." 貌似没有问题吧.但是写起程序来的时候就出问题了. 废话不多说,先上实现,然后再啰嗦.(dont talk, show me ur code !) #ifnde

11 java 线程池 使用实例

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果. 1 线程池做什么 网络请求通常有两种形式: 第一种,请求不是很频繁,而且每次连接后会保持相当一段时间来读数据或

死磕 java线程系列之自己动手写一个线程池

欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池. 属性分析 线程池,顾名思义它首先是一个"池",这个池里面放的是线程,线程是用来执行任务的. 首先,线程池中的线程应该是有类别的,有的是核心线程,有

C++11 —— 使用 thread 实现线程池

1. 引言 在新的 C++11 标准中,引入并发编程的一些基础组件:线程(thread).互斥锁(mutex).条件变量(condition_variable) 等,凭借这些,就足够我设计一个平台无关的 线程池 组件了.下面就详细介绍一下这个线程池组件. 2. 结构设计图 需要特别说明的是,这个线程池组件,在增加了"存在关联性的任务对象顺序执行"的功能后,原本的任务队列就分成了两级任务队列,目的是为了降低 "任务提交" 与 "任务提取" 之间(属

C++11线程池的实现

什么是线程池 处理大量并发任务,一个请求一个线程来处理请求任务,大量的线程创建和销毁将过多的消耗系统资源,还增加了线程上下文切换开销. 线程池通过在系统中预先创建一定数量的线程,当任务请求到来时从线程池中分配一个预先创建的线程去处理任务,线程在处理任务之后还可以重用,不用销毁,从而节省系统资源.对于多核处理器,线程会被分配到多个CPU,提高并行处理效率.每个线程独立阻塞,防止主线程被阻塞而使主流程被阻塞 半同步半异步线程池 三层 第一层:同步服务层,处理上层任务请求 第二层:同步排队层,上层的任