linux 下c++线程池的简单实现(在老外代码上添加注释)

作为一个c++菜鸟,研究半天这个代码的实现原理,发现好多语法不太熟悉,因此加了一大堆注释,仅供参考。该段代码主要通过继承workthread类来实现自己的线程代码,通过thread_pool类来管理线程池,线程池不能够实现动态改变线程数目,存在一定局限性。目前可能还有缺陷,毕竟c++来封装这个东西,资源释放什么的必须想清楚,比如vector存储了基类指针实现多态,那么如何释放对象仍需要考虑,后续我可能会更进一步修改完善该代码,下面贡献一下自己的劳动成果。

#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>

using namespace std;
/*
WorkerThread class
This class needs to be sobclassed by the user.
*/
class WorkerThread{
public:
    int id;
    unsigned virtual executeThis()
	{
		return 0;
	}

    WorkerThread(int id) : id(id) {}
    virtual ~WorkerThread(){}
};

/*
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and synchronizations between all threads.
*/
class ThreadPool{
public:
    ThreadPool();
    ThreadPool(int maxThreadsTemp);
    virtual ~ThreadPool();

	void destroyPool(int maxPollSecs);

    bool assignWork(WorkerThread *worker);
    bool fetchWork(WorkerThread **worker);

	void initializeThreads();

    static void *threadExecute(void *param); // pthread_create()调用的函数必须为静态的
    static pthread_mutex_t mutexSync;
    static pthread_mutex_t mutexWorkCompletion;//工作完成个数互斥量

private:
    int maxThreads;

    pthread_cond_t  condCrit;
    sem_t availableWork;
    sem_t availableThreads;

    vector<WorkerThread *> workerQueue;

    int topIndex;
    int bottomIndex;
	int incompleteWork;
    int queueSize;
};
#include <stdlib.h>
#include "threadpool.h"
using namespace std;

//初始化类的静态成员必须加上类型和作用域,static数据成员必须在类定义体的外部定义,不像不同数据成员可以用构造函数初始化
//应该在定义时进行初始化,注意是定义,这个定义应该放在包含类的非内联成员函数定义的文件中。
//注:静态成员函数只能使用静态变量,非静态没有限制,静态变量必须在外部定义和初始化,没初始化就为默认数值
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;

ThreadPool::ThreadPool()
{
	ThreadPool(2);
}

ThreadPool::ThreadPool(int maxThreads)
{
   if (maxThreads < 1)
       maxThreads=1;

   pthread_mutex_lock(&mutexSync);
   this->maxThreads = maxThreads;
   this->queueSize = maxThreads;
   workerQueue.resize(maxThreads, NULL);//调整容器大小,然后用默认构造函数初始化新的空间
   topIndex = 0;
   bottomIndex = 0;
   incompleteWork = 0;
   sem_init(&availableWork, 0, 0); //工作队列信号量,表示已经加入队列的工作,初始时没有工作
   sem_init(&availableThreads, 0, queueSize);  //空闲线程信号量,一开始就有quisize个线程可以使用
   pthread_mutex_unlock(&mutexSync);
}

//调用pthread_create()让线程跑起来,threadExecute是类的静态函数,因为pthread_create()第三个参数必须为静态函数
void ThreadPool::initializeThreads()
{
   for(int i = 0; i<maxThreads; ++i)
	{
		pthread_t tempThread;
		pthread_create(&tempThread, NULL, ThreadPool::threadExecute, (void*)this );
   }
}

ThreadPool::~ThreadPool()
{
	//因为对于vector,clear并不真正释放内存(这是为优化效率所做的事),clear实际所做的是为vector中所保存的所有对象调用析构函数(如果有的话),
	//然后初始化size这些东西,让你觉得把所有的对象清除了。。。
    //真正释放内存是在vector的析构函数里进行的,所以一旦超出vector的作用域(如函数返回),首先它所保存的所有对象会被析构,
	//然后会调用allocator中的deallocate函数回收对象本身的内存。。。
    workerQueue.clear();
}

void ThreadPool::destroyPool(int maxPollSecs = 2)
{
	while(incompleteWork>0 )
	{
	    //cout << "Work is still incomplete=" << incompleteWork << endl;
		sleep(maxPollSecs);
	}
	cout << "All Done!! Wow! That was a lot of work!" << endl;
	sem_destroy(&availableWork);
	sem_destroy(&availableThreads);
    pthread_mutex_destroy(&mutexSync);
    pthread_mutex_destroy(&mutexWorkCompletion);

}

//分配人物到top,然后通知有任务需要执行。
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
    pthread_mutex_lock(&mutexWorkCompletion);
    incompleteWork++;
		//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
	pthread_mutex_unlock(&mutexWorkCompletion);
	sem_wait(&availableThreads);
	pthread_mutex_lock(&mutexSync);
    //workerVec[topIndex] = workerThread;
    workerQueue[topIndex] = workerThread;
    //cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
	if(queueSize !=1 )
		topIndex = (topIndex+1) % (queueSize-1);
    sem_post(&availableWork);
	pthread_mutex_unlock(&mutexSync);
	return true;
}

//当已经有人物放到队列里面后,就会受到通知,然后从底部拿走工作,在workerArg中返回
bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
	sem_wait(&availableWork);

	pthread_mutex_lock(&mutexSync);
	WorkerThread * workerThread = workerQueue[bottomIndex];
    workerQueue[bottomIndex] = NULL;
	*workerArg = workerThread;
	if(queueSize !=1 )
		bottomIndex = (bottomIndex+1) % (queueSize-1);
	sem_post(&availableThreads);
	pthread_mutex_unlock(&mutexSync);
    return true;
}

//每个线程运行的静态函数实体,executeThis 方法将会被继承累从写,之后实现具体线程的工作。
void *ThreadPool::threadExecute(void *param)
{
	WorkerThread *worker = NULL;
	while(((ThreadPool *)param)->fetchWork(&worker))
	{
		if(worker)
        {
			worker->executeThis();
            //cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
            delete worker;
            worker = NULL;
        }

		pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
        //cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
	 	((ThreadPool *)param)->incompleteWork--;
		pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
	}
	return 0;
}
#include <iostream>
#include "threadpool.h"

using namespace std;

#define ITERATIONS 20

class SampleWorkerThread : public WorkerThread
{
public:
    int id;
	unsigned virtual executeThis()
	{
	// Instead of sleep() we could do anytime consuming work here.
	// Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds)
		cout<<"This is SampleWorkerThread sleep 2s"<<endl;
		sleep(2);
		return(0);
	}

    SampleWorkerThread(int id) : WorkerThread(id), id(id)
    {
//       cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl;
    }

    ~SampleWorkerThread()
    {
//       cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl;
    }
};

int main(int argc, char **argv)
{

	cout<<"Thread pool"<<endl;
	ThreadPool* myPool = new ThreadPool(25);
	//pthread_create()执行,开始等待任务分配
	myPool->initializeThreads();

	//用来计算时间间隔。
    time_t t1=time(NULL);

	//分配具体工作到线程池
	for(unsigned int i=0;i<ITERATIONS;i++){
		SampleWorkerThread* myThreathreadExecuted = new SampleWorkerThread(i);
		myPool->assignWork(myThreathreadExecuted);
	}

	//销毁钱等待所有线程结束,等待间隔为2秒。
    myPool->destroyPool(2);

    time_t t2=time(NULL);
    cout << t2-t1 << " seconds elapsed\n" << endl;
	delete myPool;

    return 0;
}

ubuntu 12.04下运行成功,编译命令如下:g++ -g main.cpp thread_pool.cpp -o thread_pool -lpthread

时间: 2024-10-10 14:34:58

linux 下c++线程池的简单实现(在老外代码上添加注释)的相关文章

linux下的线程池

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了.如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了. 下面是Linux系统下用C语言创建的一个线程池.线程池会维护一个任务链表(每个CThread_worker结构就是一个任务).   pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函

一个Linux下C线程池的实现

在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包.这个线程只服务于这个用户 , 当 用户与服务器端关闭连接以后, 服务器端销毁这个线程.然而频繁地开辟与销毁线程极大地占用了系统的资源.而且在大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源.线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一个线程的处理方法不同, 它的基本思想就是在程序

Linux下简易线程池

线程池简介 简易线程池实现 线程池头文件threadpool.h如下: 1 #ifndef THREADPOOL_H 2 #define THREADPOOL_H 3 4 #include <stdio.h> 5 #include <stdlib.h> 6 #include <unistd.h> 7 #include <pthread.h> 8 9 /** 10 * 线程体数据结构 11 */ 12 typedef struct runner 13 { 14

线程池的简单实现

几个基本的线程函数: //线程操纵函数//创建:   int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);//终止自身    void pthread_exit(void *retval);//终止其他:   int pthread_cancel(pthread_t tid); 发送终止信号后目标线程不一定终止,要调用join函数等待//阻塞

Linux下Java线程详细监控和其dump的分析使用----分析Java性能瓶颈

这里对linux下.sun(oracle) JDK的线程资源占用问题的查找步骤做一个小结: linux环境下,当发现java进程占用CPU资源很高,且又要想更进一步查出哪一个java线程占用了CPU资源时,按照以下步骤进行查找: (一):通过[top -p 12377 -H] 查看java进程的有哪些线程的运行情况:       和通过[jstack 12377 > stack.log]生成Java线程的dump详细信息: 先用top命令找出占用资源厉害的java进程id,如图:# top 如上

linux下minicom的配置和简单使用

安装配置minicom--------------------------------------------------# lsmod | grep usbserial (如果直接使用串口线,而没有用到USB转串口设备,此步可以跳过)   如果有usbserial,说明系统支持USB转串口. 安装minicom (Fedora自带有minicom,此步可以跳过)   apt-get install minicom   apt-get install lrzsz配置minicom   # min

Linux 下GDB的使用之简单入门

Linux 下程序崩溃.先要生成Core文件方可调试(这里Test为被调试程序) 1.查看Core文件(相当于Windows下的dump)大小,如果为0,则不会生成core文件 ulimit -c 查看core文件大小 ulimit -c filesize 设置大小为filesize ulimit -c unlimited 设置core大小为无限制 2.启动被调试程序 进入到被调试程序目录,输入gdb ./Test  回车 如果被调试程序有参数需设置,则 set args xxxx 回车 3.设

一个线程池的简单的实现

线程池实现: 用于执行大量相对短暂的任务 当任务增加的时候能够动态的增加线程池中线程的数量直到达到一个阈值. 当任务执行完毕的时候,能够动态的销毁线程池中的线程 该线程池的实现本质上也是生产者与消费模型的应用.生产者线程向任务队列中添加任务,一旦队列有任务到来,如果有等待线程就唤醒来执行任务,如果没有等待线程并且线程数没有达到阈值,就创建新线程来执行任务. contion.h #ifndef _CONDITION_H_ #define _CONDITION_H_ #include <pthrea

linux下bochs的安装及简单运行

http://bochs.sourceforge.net/    bochs官网.关于bochs不再累述. 我是在UBUNTU下用包管理软件apt-get装的bochs.命令 apt-get install bochs 这是一个没有调试环境的安装,先感受一下小成功的滋味..呵呵 这样的安装后,在命令行下执行$bochs会跳出一个配置界面,表害怕,这说安装成功啦.仔细阅读他的提示会发现是没有配置文件 (配置文件bochsrc,ROMIMAGE,VGAROMIMAGE.这些不解释.自己查).你可以配