简介Linux C的线程池

  前言:前面有篇博客已经介绍了线程、线程的信号量和互斥锁,请参考博客:http://www.cnblogs.com/liudw-0215/p/8966645.html,接下来将介绍线程池。

  一、理解

  线程池能有效的处理多个线程的并发问题,避免大量的线程因为互相强占系统资源导致阻塞现象,能够有效的降低频繁创建和销毁线程对性能所带来的开销。

  大多数的网络服务器,包括Web服务器都具有一个特点,就是单位时间内必须处理数目巨大的连接请求,但是处理时间却是比较短的。在传统的多线程服务器模型中是这样实现的:一旦有个请求到达,就创建一个新的线程,由该线程执行任务,任务执行完毕之后,线程就退出。这就是"即时创建,即时销毁"的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数非常频繁,那么服务器就将处于一个不停的创建线程和销毁线程的状态。这笔开销是不可忽略的,尤其是线程执行的时间非常非常短的情况。

  线程池就是为了解决上述问题的,它的实现原理是这样的:在应用程序启动之后,就马上创建一定数量的线程,放入空闲的队列中。这些线程都是处于阻塞状态,这些线程只占一点内存,不占用CPU。当任务到来后,线程池将选择一个空闲的线程,将任务传入此线程中运行。当所有的线程都处在处理任务的时候,线程池将自动创建一定的数量的新线程,用于处理更多的任务。执行任务完成之后线程并不退出,而是继续在线程池中等待下一次任务。当大部分线程处于阻塞状态时,线程池将自动销毁一部分的线程,回收系统资源。

  下面是一个简单线程池的实现,这个线程池的代码是我参考网上的一个例子实现的,并进行了加工和修改。

  二、示例

  主要由三个文件组成:threadpool.h头文件、threadpool.c源文件和mainpool.c组成。源码中已有重要的注释,就不加以分析了。

  • threadpool.h头文件:
#include "my.h"
struct job
{
    void* (*callback_function)(void *arg);    //线程回调函数

void *arg;                                //回调函数参数
    struct job *next;
};

struct threadpool
{
    int thread_num;                   //线程池中开启线程的个数
    int queue_max_num;                //队列中最大job的个数
    struct job *head;                 //指向job的头指针
    struct job *tail;                 //指向job的尾指针
    pthread_t *pthreads;              //线程池中所有线程的pthread_t
    pthread_mutex_t mutex;            //互斥信号量
    pthread_cond_t queue_empty;       //队列为空的条件变量
    pthread_cond_t queue_not_empty;   //队列不为空的条件变量
    pthread_cond_t queue_not_full;    //队列不为满的条件变量
    int queue_cur_num;                //队列当前的job个数
    int queue_close;                  //队列是否已经关闭
    int pool_close;                   //线程池是否已经关闭
};

//================================================================================================
////函数名:                   threadpool_init
////函数描述:                 初始化线程池
////输入:                    [in] thread_num     线程池开启的线程个数
////                         [in] queue_max_num  队列的最大job个数
////输出:                    无
////返回:                    成功:线程池地址 失败:NULL
////================================================================================================
//struct threadpool* threadpool_init(int thread_num, int queue_max_num);
//
////================================================================================================
////函数名:                    threadpool_add_job
////函数描述:                  向线程池中添加任务
////输入:                     [in] pool                  线程池地址
////                          [in] callback_function     回调函数
////                          [in] arg                     回调函数参数
////输出:                     无
////返回:                     成功:0 失败:-1
////================================================================================================
int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);
//
////================================================================================================
////函数名:                    threadpool_destroy
////函数描述:                   销毁线程池
////输入:                      [in] pool                  线程池地址
////输出:                      无
////返回:                      成功:0 失败:-1
////================================================================================================
int threadpool_destroy(struct threadpool *pool);
//
////================================================================================================
////函数名:                    threadpool_function
////函数描述:                  线程池中线程函数
////输入:                     [in] arg                  线程池地址
////输出:                     无
////返回:                     无
////================================================================================================
void* threadpool_function(void* arg);
  •   threadpool.c源文件:

    #include "threadpool.h"
    #include "my.h"
    
    struct threadpool* threadpool_init(int thread_num, int queue_max_num)
    {
        struct threadpool *pool = NULL;
        do
        {
        pool = malloc(sizeof(struct threadpool));
        if (NULL == pool)
        {
            printf("failed to malloc threadpool!\n");
            break;
        }
        pool->thread_num = thread_num;
        pool->queue_max_num = queue_max_num;
        pool->queue_cur_num = 0;
        pool->head = NULL;
        pool->tail = NULL;
        if (pthread_mutex_init(&(pool->mutex), NULL))
        {
            printf("failed to init mutex!\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_empty), NULL))
        {
            printf("failed to init queue_empty!\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_empty), NULL))
        {
            printf("failed to init queue_not_empty!\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_full), NULL))
        {
            printf("failed to init queue_not_full!\n");
            break;
        }
        pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
        if (NULL == pool->pthreads)
        {
            printf("failed to malloc pthreads!\n");
            break;
        }
        pool->queue_close = 0;
        pool->pool_close = 0;
        int i;
        for (i = 0; i < pool->thread_num; ++i)
        {
            pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool);
        }
    
        return pool;
        } while (0);
    
        return NULL;
    }
    
    int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg)
    {
        assert(pool != NULL);
        assert(callback_function != NULL);
        assert(arg != NULL);
    
        pthread_mutex_lock(&(pool->mutex));
        while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
        {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex));   //队列满的时候就等待
        }
        if (pool->queue_close || pool->pool_close)    //队列关闭或者线程池关闭就退出
        {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
        }
        struct job *pjob =(struct job*) malloc(sizeof(struct job));
        if (NULL == pjob)
        {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
        }
        pjob->callback_function = callback_function;
        pjob->arg = arg;
        pjob->next = NULL;
        if (pool->head == NULL)
        {
        pool->head = pool->tail = pjob;
        pthread_cond_broadcast(&(pool->queue_not_empty));  //队列空的时候,有任务来时就通知线程池中的线程:队列非空
        }
        else
        {
        pool->tail->next = pjob;
        pool->tail = pjob;
        }
        pool->queue_cur_num++;
        pthread_mutex_unlock(&(pool->mutex));
        return 0;
    }
    
    void* threadpool_function(void* arg)
    {
        struct threadpool *pool = (struct threadpool*)arg;
        struct job *pjob = NULL;
        while (1)  //死循环
        {
        pthread_mutex_lock(&(pool->mutex));
        while ((pool->queue_cur_num == 0) && !pool->pool_close)   //队列为空时,就等待队列非空
        {
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
        }
        if (pool->pool_close)   //线程池关闭,线程就退出
        {
            pthread_mutex_unlock(&(pool->mutex));
            pthread_exit(NULL);
        }
        pool->queue_cur_num--;
        pjob = pool->head;
        if (pool->queue_cur_num == 0)
        {
            pool->head = pool->tail = NULL;
        }
        else
        {
            pool->head = pjob->next;
        }
        if (pool->queue_cur_num == 0)
        {
            pthread_cond_signal(&(pool->queue_empty));        //队列为空,就可以通知threadpool_destroy函数,销毁线程函数
        }
        if (pool->queue_cur_num == pool->queue_max_num - 1)
        {
            pthread_cond_broadcast(&(pool->queue_not_full));  //队列非满,就可以通知threadpool_add_job函数,添加新任务
        }
        pthread_mutex_unlock(&(pool->mutex));
    
        (*(pjob->callback_function))(pjob->arg);   //线程真正要做的工作,回调函数的调用
        free(pjob);
        pjob = NULL;
        }
    }
    int threadpool_destroy(struct threadpool *pool)
    {
        assert(pool != NULL);
        pthread_mutex_lock(&(pool->mutex));
        if (pool->queue_close || pool->pool_close)   //线程池已经退出了,就直接返回
        {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
        }
    
        pool->queue_close = 1;        //置队列关闭标志
        while (pool->queue_cur_num != 0)
        {
        pthread_cond_wait(&(pool->queue_empty), &(pool->mutex));  //等待队列为空
        }    
    
        pool->pool_close = 1;      //置线程池关闭标志
        pthread_mutex_unlock(&(pool->mutex));
        pthread_cond_broadcast(&(pool->queue_not_empty));  //唤醒线程池中正在阻塞的线程
        pthread_cond_broadcast(&(pool->queue_not_full));   //唤醒添加任务的threadpool_add_job函数
        int i;
        for (i = 0; i < pool->thread_num; ++i)
        {
        pthread_join(pool->pthreads[i], NULL);    //等待线程池的所有线程执行完毕
        }
    
        pthread_mutex_destroy(&(pool->mutex));          //清理资源
        pthread_cond_destroy(&(pool->queue_empty));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
        free(pool->pthreads);
        struct job *p;
        while (pool->head != NULL)
        {
        p = pool->head;
        pool->head = p->next;
        free(p);
        }
        free(pool);
        return 0;
    }

    mainpool.c文件:

  • #include "threadpool.h"
    
    void* work(void* arg)
    {
        char *p = (char*) arg;
        printf("threadpool callback fuction : %s.\n", p);
        sleep(1);
    }
    
    int main(void)
    {
        struct threadpool *pool = (struct threadpool *)threadpool_init(10, 20);
        threadpool_add_job(pool, work, "1");
        threadpool_add_job(pool, work, "2");
        threadpool_add_job(pool, work, "3");
        threadpool_add_job(pool, work, "4");
        threadpool_add_job(pool, work, "5");
        threadpool_add_job(pool, work, "6");
        threadpool_add_job(pool, work, "7");
        threadpool_add_job(pool, work, "8");
        threadpool_add_job(pool, work, "9");
        threadpool_add_job(pool, work, "10");
        threadpool_add_job(pool, work, "11");
        threadpool_add_job(pool, work, "12");
        threadpool_add_job(pool, work, "13");
        threadpool_add_job(pool, work, "14");
        threadpool_add_job(pool, work, "15");
        threadpool_add_job(pool, work, "16");
        threadpool_add_job(pool, work, "17");
        threadpool_add_job(pool, work, "18");
        threadpool_add_job(pool, work, "19");
        threadpool_add_job(pool, work, "20");
        threadpool_add_job(pool, work, "21");
        threadpool_add_job(pool, work, "22");
        threadpool_add_job(pool, work, "23");
        threadpool_add_job(pool, work, "24");
        threadpool_add_job(pool, work, "25");
        threadpool_add_job(pool, work, "26");
        threadpool_add_job(pool, work, "27");
        threadpool_add_job(pool, work, "28");
        threadpool_add_job(pool, work, "29");
        threadpool_add_job(pool, work, "30");
        threadpool_add_job(pool, work, "31");
        threadpool_add_job(pool, work, "32");
        threadpool_add_job(pool, work, "33");
        threadpool_add_job(pool, work, "34");
        threadpool_add_job(pool, work, "35");
        threadpool_add_job(pool, work, "36");
        threadpool_add_job(pool, work, "37");
        threadpool_add_job(pool, work, "38");
        threadpool_add_job(pool, work, "39");
        threadpool_add_job(pool, work, "40");
    
        sleep(5);
        threadpool_destroy(pool);
        return 0;
    }
    

原文地址:https://www.cnblogs.com/liudw-0215/p/8968873.html

时间: 2024-08-10 02:00:06

简介Linux C的线程池的相关文章

linux下的线程池

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了.如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了. 下面是Linux系统下用C语言创建的一个线程池.线程池会维护一个任务链表(每个CThread_worker结构就是一个任务).   pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函

Linux下简易线程池

线程池简介 简易线程池实现 线程池头文件threadpool.h如下: 1 #ifndef THREADPOOL_H 2 #define THREADPOOL_H 3 4 #include <stdio.h> 5 #include <stdlib.h> 6 #include <unistd.h> 7 #include <pthread.h> 8 9 /** 10 * 线程体数据结构 11 */ 12 typedef struct runner 13 { 14

Linux平台下线程池的原理及实现

转自:http://blog.csdn.net/lmh12506/article/details/7753952 前段时间在github上开了个库,准备实现自己的线程池的,因为换工作的事,一直也没有实现,参考这篇文章准备着手实现一下. 什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了.如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了. 下面是Linu

一个Linux下C线程池的实现

在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包.这个线程只服务于这个用户 , 当 用户与服务器端关闭连接以后, 服务器端销毁这个线程.然而频繁地开辟与销毁线程极大地占用了系统的资源.而且在大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源.线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一个线程的处理方法不同, 它的基本思想就是在程序

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

Linux系统编程——线程池

线程池基本原理 在传统服务器结构中,常是有一个总的监听线程监听有没有新的用户连接服务器,每当有一个新的用户进入,服务器就开启一个新的线程用户处理这 个用户的数据包.这个线程只服务于这个用户,当用户与服务器端关闭连接以后,服务器端销毁这个线程.(关于并发服务器更多详情,请看<并发服务器>). 然而频繁地开辟与销毁线程极大地占用了系统的资源,而且在大量用户的情况下,系统为了开辟和销毁线程将浪费大量的时间和资源.线程池提供了一个解决外部大量用户与服务器有限资源的矛盾. 线程池和传统的一个用户对应一个

linux 下c++线程池的简单实现(在老外代码上添加注释)

作为一个c++菜鸟,研究半天这个代码的实现原理,发现好多语法不太熟悉,因此加了一大堆注释,仅供参考.该段代码主要通过继承workthread类来实现自己的线程代码,通过thread_pool类来管理线程池,线程池不能够实现动态改变线程数目,存在一定局限性.目前可能还有缺陷,毕竟c++来封装这个东西,资源释放什么的必须想清楚,比如vector存储了基类指针实现多态,那么如何释放对象仍需要考虑,后续我可能会更进一步修改完善该代码,下面贡献一下自己的劳动成果. #include <pthread.h>

Linux中epoll+线程池实现高并发

服务器并发模型通常可分为单线程和多线程模型,这里的线程通常是指"I/O线程",即负责I/O操作,协调分配任务的"管理线程",而实际的请求和任务通常交由所谓"工作者线程"处理.通常多线程模型下,每个线程既是I/O线程又是工作者线程.所以这里讨论的是,单I/O线程+多工作者线程的模型,这也是最常用的一种服务器并发模型.我所在的项目中的server代码中,这种模型随处可见.它还有个名字,叫"半同步/半异步"模型,同时,这种模型也是生

Linux下简单的多线程编程--线程池的实现

/* 写在前面的话: 今天刚“开原”,选择了一篇关于线程池的文件与大家分享,希望能对您学习有所帮助,也希望能与大家共同学习! 选择在这个特殊的时候注册并发文章也是有一些我个人特殊的意义的,看我的id(西游小学生.45)就知道了,哈哈.在这里也很感谢博客园的员工,刚发申请两分钟就同意了. */ 最近由于要写一个类似于QQ的程序,所以想到要用到多线程.既然要用多线程,那何不写一个线程池?于是上网搜了搜多线程的代码,发现大多都不是很完善,或者有些小bug.所以,在这里贴出一个完整的,经过我多重测试的,