简单线程池原理和代码

线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务

本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理

代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了。

线程池代码见Github【点击】

由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明)

  • std::thread
  • std::mutex
  • std::condition_variable
  • std::move
  • std::lock_guard
  • std::unique_lock
  • lambda表达式

下面开始代码讲解:

先从入口说起:构造函数

template <unsigned _TCount>
FixedThreadPool<_TCount>::FixedThreadPool()
: m_jobsleft(0), m_isDone(false), m_isFinished(false) {
    for (int i = 0; i < _TCount; ++i) {
        m_threads[i] = std::move(std::thread([this, i]() {
            this->DoTask();
        }));
    }
}

在构造函数中,根据模板参数_TCount创建一定数量的线程,将所有线程存在了数组(std::array)中。

然后你会注意到,每个线程都会运行DoTask方法,注意:DoTask是运行于子线程中的

template <unsigned _TCount>
void FixedThreadPool<_TCount>::DoTask() {
    // Run in subthreads.
    // Take the next job in the queue and run it. Notify the main thread that a job has completed.
    while (!m_isDone) {
        this->NextJob()();
        -- m_jobsleft;
        // Notify the main thread that a job has completed.
        m_conditionWait.notify_one();
    }
}

不去看那些烦人的标记变量,先从大的方面理解其原理:

在循环中每次去一个任务(我猜是在队列里取,若队列为空则会block),取到任务后执行任务(即执行lambda表达式),jobsleft减少,然后通知给主线程“我又执行完一个任务”

这里有两个关注点:NextJob如何取任务?m_conditionWait都有谁在阻塞?
先看NextJob如何取任务?

template <unsigned _TCount>
typename FixedThreadPool<_TCount>::JobHandler FixedThreadPool<_TCount>::NextJob() {
    // Run in subthreads.
    // Get the next job; pop the first item in the queue, otherwise wait for a signal from the main thread.
    JobHandler handler;
    std::unique_lock<std::mutex> qlock(m_mutexQueue);

    // Wait for a job if we don‘t have any.
    m_conditionJob.wait(qlock, [this]()->bool {
        return m_queue.size() || m_isDone;
    });

    // Get job from the queue
    if (!m_isDone) {
        handler = m_queue.front();
        m_queue.pop_front();
    }
    else { // If we‘re bailing out, ‘inject‘ a job into the queue to keep jobsleft accurate.
        handler = []{};
        ++m_jobsleft;
    }
    return handler;
}

注意:这个函数也是运行在子线程中

希望你已经学会使用std::condition_variable了,简单来说m_conditionJob.wait就是在判断是否队列为空(先不要关心烦人的m_isDone)。

如果队列为空则会阻塞,然后就会一直等待,等待到啥时候呢?(我猜测当有新任务时一定会有通知notify_one()),通知来了检测满足条件就继续向下执行。

会看到从队列中取出一个任务,然后返回。

这里有个关注点:啥时候会有m_conditionJob的notify_xxx()?

在这里:

template <unsigned _TCount>
void FixedThreadPool<_TCount>::AddJob(JobHandler job) {
    // Add a new job to the pool. If there are no jobs in the queue, a thread is woken up to take the job. If all threads are busy, the job is added to the end of the queue.
    std::lock_guard<std::mutex> guard(m_mutexQueue);
    m_queue.emplace_back(job);
    ++ m_jobsleft;
    m_conditionJob.notify_one();
}

注意:这是主线程中由用户调用的方法

当然还有一处在JoinAll中,不过这对理解线程池运行流程关系不大。下面讨论另一个问题时在看。

现在你脑子中是否有线程池的运行流程了。

主线程:【创建子线程】->【AddJob】

子线程:【DoTask】->【NextJob】->【NextJob】...->【NextJob】

描述:子线程在DoTask中循环通过【NextJob】取任务,当没有任务时,会block在NextJob中,一直等待到主线程的【AddJob】调用后,会wakeup一个(只会唤醒一个线程)已经阻塞的NextJob,然后NextJob返回队列中的一个任务,交给DoTask执行,DoTask执行完成后通知又执行完一个任务(可用于判断所有任务是否都执行完成)。

到这里还比较简单一些,下面考虑退出的问题:

退出的问题在于让所有可能被阻塞住的子线程全部唤醒,然后顺利的走向销毁。

先看析构函数:

template <unsigned _TCount>
FixedThreadPool<_TCount>::~FixedThreadPool() {
    this->JoinAll();
}

JoinAll,听着就像thread的join嘛,看看:

template <unsigned _TCount>
void FixedThreadPool<_TCount>::JoinAll(bool wait) {
    if (m_isFinished) {
        return;
    }

    if (wait) {
        this->WaitAll();
    }

    // note that we‘re done, and wake up any thread that‘s
    // waiting for a new job
    m_isDone = true;
    m_conditionJob.notify_all();

    for(auto &x : m_threads) {
        if(x.joinable()) {
            x.join();
        }
    }
    m_isFinished = true;
}

注意:JoinAll会在主线程执行

奥,m_isFinished用来保证JoinAll只执行一次的。

wait嘛,WaitAll看名字就像等待所有任务执行完毕嘛,而且必须要阻塞住调用WaitAll的线程,否则怎么能叫Wait呢!

下面看看m_isDone=true,然后通知所有(notify_all())的m_conditionJob.wait,那就是通知所有线程中的m_conditionJob.wait呀,先不管继续往下看。

下面就是遍历所有的子线程,然后全部join掉,这可是会阻塞主线程的!主线程会等待所有join的子线程执行完才能回到主线程,不过若所有任务执行完了,join之后子线程不就over了嘛

    // Wait for a job if we don‘t have any.
    m_conditionJob.wait(qlock, [this]()->bool {
        return m_queue.size() || m_isDone;
    });

还记得这里吧,NextJob方法,运行于子线程中。

当JoinAll中notify_all时,这里就会被唤醒,由于m_isDone为true,不管你队列是否为空都会继续执行下去。子线程要退出,那么就不能被阻塞住,所以这里就是用来唤醒子线程,让子线程顺利退出的。

    // Get job from the queue
    if (!m_isDone) {
        handler = m_queue.front();
        m_queue.pop_front();
    }
    else { // If we‘re bailing out, ‘inject‘ a job into the queue to keep jobsleft accurate.
        handler = []{};
        ++m_jobsleft;
    }

所以就到了下面这个语句块,返回一个空的handler。 都要退出了,为了处理一致,返回空的也无可厚非。

下面再看看WaitAll是什么鬼:

template <unsigned _TCount>
void FixedThreadPool<_TCount>::WaitAll() {
    // Waits until all jobs have finshed executing.
    if (m_jobsleft > 0) {
        std::unique_lock<std::mutex> lock(m_mutexWait);
        m_conditionWait.wait(lock, [this]()->bool {
            return this->m_jobsleft == 0;
        });
        lock.unlock();
    }
}

奥,原来如此,WaitAll果然就是阻塞住你,然后等待剩余的任务数为0时,才会被唤醒(结合DoTask中的notify_one)。

这么看来,在JoinAll中:

如果wait=true,那么就会等待所有任务自然的执行完成后join所有线程退出。

如果wait=false,那么就会让所有阻塞在等待任务上的线程直接执行一个空任务,然后退出。或者让正在执行任务的线程执行完任务后退出。

到这里你明白了吗?

好好看看代码,碰到了不会的地方在来找找灵感吧。

时间: 2024-10-12 05:52:43

简单线程池原理和代码的相关文章

线程池原理和简单实现

1.线程池原理 :伪代码 在线程池中假设最多开3个线程,当小于三个,进行创建,添加到集合中,然后不停的轮训线程集合进行执行,直到为空时,进入等待状态 public class ThreadPool { int maxCount = 3;//假设最多开只能开三个线程 AtomicInteger count =new AtomicInteger(0);// 当前开的线程数 count=0,为了线程同步,使用此api LinkedList<Runnable> runnables = new Link

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

简单线程池的实现

1. 什么是线程池 线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务.如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可. 线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销.一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为 1.     计算密集型任务: 2.     I/O密集型任务. 计算密集型任务

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

线程池原理

在面向对象编程中,对象创建和销毁是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是对一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池就是遵循这一思想而产生的,下面将介绍的线程池技术同样符合这一思想. 多线程技术主要解决处

线程池;java实现线程池原理

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动. 组成部分 1.线程池

C++版简单线程池

需求 之前写过一个C#版本的简单线程池http://blog.csdn.net/ylbs110/article/details/51224979 由于刚刚学习了C++11新特性中的future,于是想到用它来实现一个线程池. 实现 思路基本和C#版本的一样,主要区别是委托的实现,线程句柄的不同和线程锁: 本来C++有function模板,但是实现起来比较麻烦,这里主要是实现线程池,所以动态参数的委托就不实现了,直接使用typedef void(*Func)();来实现一个无参数无返回值的函数指针

Java线程池原理

转自:https://www.jianshu.com/p/a166944f1e73 本篇文章主要介绍Java线程池的原理以及源码的分析 线程池的介绍 Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池. 线程池的优点 第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行. 第三:提高线程的可管理性.线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会

含源码解析,深入Java 线程池原理

从池化技术到底层实现,一篇文章带你贯通线程池技术. 1.池化技术简介 在系统开发过程中,我们经常会用到池化技术来减少系统消耗,提升系统性能. 在编程领域,比较典型的池化技术有: 线程池.连接池.内存池.对象池等. 对象池通过复用对象来减少创建对象.垃圾回收的开销:连接池(数据库连接池.Redis连接池和HTTP连接池等)通过复用TCP连接来减少创建和释放连接的时间.线程池通过复用线程提升性能.简单来说,池化技术就是通过复用来提升性能. 线程.内存.数据库的连接对象都是资源,在程序中,当你创建一个