c++线程池的实现

线程池编程简介:

在 我们的服务端的程序中运用了大量关于池的概念,线程池、连接池、内存池、对象池等等。使用池的概念后可以高效利用服务器端的资源,比如没有大量的线程在系 统中进行上下文的切换,一个数据库连接池,也只需要维护一定里的连接,而不是占用很多数据库连接资源。同时它们也避免了一些耗时的操作,比如创建一个线 程,申请一个数据库连接,而且可能就只使用那么一次,然后就立刻释放刚申请的资源,效率很低。

在我的上一篇blog中已经实现一个线程基类了,在这里我们只需要实现一个线程池类ThreadPool和该线程池调度的工作线程类WorkThread即可,而且WorkThread是继承自Thread类的。

实现思路:

一个简单的线程池的实现思路一般如下:

  1. 在ThreadPool中创建多个线程(WorkThreadk对象),每个线程均处于阻塞状态,等待任务的到来
  1. ThreadPool提供一个提交任务的接口,如post_job(ProcCallback func, void* data); post_job后会立即返回,不会阻塞
  2. ThreadPool维护一个空闲线程队列,当客户程序调用post_job()后,如果空闲队列中有空闲线程,则取出一个线程句柄,并设置任务再给出新任务通知事件即可,处理等待的线程捕捉到事件信号后便开始执行任务,执行完后将该线程句柄重新push到空闲线程队列中
  3. 该线程池采用回调函数方式

首先我们实现一个WorkThread类:

#include <pthread.h>
#include <vector>
#include <iostream>
using namespace std;

typedef void(*ProcCallBack)(void *);

class WorkThread;

class ThreadPool
{
    friend class WorkThread;
public:
    ThreadPool(){}
    ~ThreadPool();
    int    start_thread_pool(size_t thread_num = 5);        //启动thread_num个线程
    int    stop_thread_pool();                                      //线束线程池
    void    destroy();                                                //销毁线程池所申请的资源
    void    post_job(ProcCallBack func, void* data);        //提交任务接口,传入回调函数地址和参数

protected:
    WorkThread*    get_idle_thread();                              //从获得空闲队列中取得一个线程句柄
    void        append_idle_thread(WorkThread* pthread);    //加入到thread_vec_和idl_que_中
    void        move_to_idle_que(WorkThread* idlethread);    //将线程句柄加入到idle_que_中

private:
    size_t                thr_num_;                      //线程数目
    vector<WorkThread*>        thr_vec_;        //线程句柄集合
    vector<WorkThread*>    idle_que_;     //空闲线程队列

private:
    // not implement
    ThreadPool(const ThreadPool&);
    ThreadPool&    operator=(const ThreadPool&);
};

class WorkThread
{
    friend class ThreadPool;
public:
    WorkThread(ThreadPool* pthr_pool)
    {
        hr_pool_ = pthr_pool;
        cb_func_ = NULL;
        param_ = NULL;
    }
    ~WorkThread(){}
    void set_job(ProcCallBack func, void *param)
    {
        cb_func_ = func;
        param_ = param;
        //notify();
    }
    void run()
    {
        if (cb_func_)
        {
            cb_func_(param_);
        }
        cb_func_ = NULL;
        param_ = NULL;
        hr_pool_->move_to_idle_que(this);
    }
private:
    ThreadPool * hr_pool_;
    ProcCallBack cb_func_;
    void*        param_;
};

线程池实现的关键是如何创建多个线程,并且当任务来临时可以从线程池中取一个线程(也就是去得到其中一个线程的指针),然后提交任务并执行。还有一点就是 当任务执行完后,应该将该线程句柄重新加入到空闲线程队列,所以我们将ThreadPool的指针传入给了WorkThread,thr_pool_最后 可以调用move_to_idle_que(this)来将该线程句柄移到空闲队列中。

ThreadPool中一些关键代码的实现:

#include"threadpool.h"
#include<cstdio>

class ThreadPool;
int ThreadPool::start_thread_pool(size_t thread_num)
{
    thr_num_ = thread_num;
    int    ret = 0;
    for (size_t i = 0; i < thr_num_; ++i)
    {
        WorkThread*    pthr = new WorkThread(this);
        //pthr->set_thread_id(i);
        if ((ret = pthr->hr_pool_->start_thread_pool()) != 0)
        {
            printf("start_thread_pool: failed when create a work thread: %d\n", i);
            delete pthr;
            return i;
        }
        append_idle_thread(pthr);
    }
    return thr_num_;
}
int ThreadPool::stop_thread_pool()
{
    for (size_t i = 0; i < thr_vec_.size(); ++i)
    {
        WorkThread* pthr = thr_vec_[i];
        //pthr->join();
        delete pthr;
    }
    thr_vec_.clear();
    idle_que_.clear();
    return 0;
}
void ThreadPool::destroy()
{
    stop_thread_pool();
}
void ThreadPool::append_idle_thread(WorkThread* pthread)
{
    thr_vec_.push_back(pthread);
    idle_que_.push_back(pthread);
}
void ThreadPool::move_to_idle_que(WorkThread* idlethread)
{
    idle_que_.push_back(idlethread);
}
WorkThread* ThreadPool::get_idle_thread()
{
    WorkThread*    pthr = NULL;
    if (!idle_que_.empty())
    {
        vector<WorkThread*>::iterator it = idle_que_.end();
        pthr = *it;
        idle_que_.pop_back();
    }

    return pthr;
}
void ThreadPool::post_job(ProcCallBack func, void* data)
{
    WorkThread* pthr = get_idle_thread();
    while (pthr == NULL)
    {
        //Sleep(500000);
        pthr = get_idle_thread();
    }
    pthr->set_job(func, data);
}

void count(void* param)
{
    // do some your work, like:
    int* pi = static_cast<int*>(param);
    int val = *pi + 1;
    printf("val=%d\n", val);
    delete pi;
}
int main()
{
    //程序中使用如下:
    ThreadPool* ptp = new ThreadPool();
    ptp->start_thread_pool(3);        //启动3 个线程
    ptp->post_job(count, new int(1));        //提交任务
    ptp->post_job(count, new int(2));
    ptp->post_job(count, new int(3));
    //程序线束时
    ptp->stop_thread_pool();
    return 0;
}
时间: 2024-10-24 15:00:16

c++线程池的实现的相关文章

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

线程的控制和线程池

一.WaitHandle: ”.Net 中提供了一些线程间更自由通讯的工具,他们提供了通过"信号"进行通讯的机制 可以通过ManualResetEvent,AutoResetEvent(他是在开门并且一个 WaitOne 通过后自动关门)来进行线程间的通讯 waitOne:    等待开门 Set:           开门 Reset:       关门 static void Main(string[] args) { ManualResetEvent mre = new Manu

内存池、进程池、线程池

首先介绍一个概念"池化技术 ".池化技术 一言以蔽之就是:提前保存大量的资源,以备不时之需以及重复使用. 池化技术应用广泛,如内存池,线程池,连接池等等.内存池相关的内容,建议看看Apache.Nginx等开源web服务器的内存池实现. 起因:由于在实际应用当中,分配内存.创建进程.线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作.           因此,当程序中需要频繁的进行内存申请释放,进程.线程创建销毁等操作时,通常会使用内存池.进程池.

缓冲池,线程池,连接池

SSH:[email protected]:unbelievableme/object-pool.git   HTTPS:https://github.com/unbelievableme/object-pool.git 缓冲池 设计要点:包含三个队列:空缓冲队列(emq),装满输入数据的输入的队列(inq),装满输出数据的输出队列(outq),输入程序包括收容输入(hin),提取输入(sin),输出程序包括收容输出(hout)和提取输出(sout). 注意点:输入程序和输出程序会对缓冲区并发访

记5.28大促压测的性能优化&mdash;线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

线程池的创建

package com.newer.cn; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test1 { public static void main(String[] args) { // 创建线程池的方式 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程. E

Java底层技术系列文章-线程池框架

一.线程池结构图    二.示例 定义线程接口 public class MyThread extends Thread { @Override publicvoid run() { System.out.println(Thread.currentThread().getName() + "正在执行"); }}   1:newSingleThreadExecutor ExecutorService pool = Executors. newSingleThreadExecutor()

线程池中的线程的排序问题

1 package org.zln.thread.poolqueue; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 6 import java.util.Comparator; 7 import java.util.UUID; 8 import java.util.concurrent.*; 9 10 /** 11 * 线程池中的线程的排序问题 12 * Created by sherry on 16/11/4

多线程篇七:通过Callable和Future获取线程池中单个务完成后的结果

使用场景:如果需要拿到线程的结果,或者在线程完成后做其他操作,可以使用Callable 和 Futrue 1.定义一个线程池,向线程池中提交单个callable任务 ExecutorService threadPools=Executors.newSingleThreadExecutor(); Future<String> future=threadPools.submit(new Callable<String>() { @Override public String call(

多线程篇六:线程池

1.固定大小的线程池 ExecutorService threadPools1=Executors.newFixedThreadPool(3); for(int i=1;i<=10;i++){ final int task=i; //循环10次,一共往线程池里面放10个任务 threadPools1.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().ge