C语言实现线程池

以前写过一篇关于如何使用多线程推升推送速度(http://www.cnblogs.com/bai-jimmy/p/5177433.html),能够到达5000qps,其实已经可以满足现在的业务,不过在看nginx的说明文档时,又提到nginx支持线程池来提升响应速度, 一直对如何实现线程池很感兴趣,利用周末的时间参考别人的代码,自己写了一个初级版,并且调通了,还没在实际开发中应用,不知道效果如何

代码如下:

pd_log.h

#ifndef __pd_log_
#define __pd_log_

#define LOG_DEBUG_PATH "debug.log"
#define LOG_ERROR_PATH "error.log"

/**
 * define log level
 */
enum log_level {
        DEBUG = 0,
        ERROR = 1
};

#define error(...) \
        logger(ERROR, __LINE__, __VA_ARGS__)

#define debug(...) \
        logger(DEBUG, __LINE__, __VA_ARGS__)

#define assert(expr, rc)         if(!(expr)){                    error(#expr"is null or 0");                     return rc;              }
#endif

pd_log.c

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <time.h>

#include "pd_log.h"

/**
 * get now timestr
 */
static void get_time(char *time_str, size_t len) {
    time_t tt;
    struct tm local_time;
    time(&tt);
    localtime_r(&tt, &local_time);
    strftime(time_str, len, "%m-%d %H:%M:%S", &local_time);
}

/**
 * log
 */
static void logger(int flag, int line, const char *fmt, ...) {
    FILE *fp = NULL;
    char time_str[20 + 1];
    va_list args;
    get_time(time_str, sizeof(time_str));

    switch (flag) {
        case DEBUG:
            fp = fopen(LOG_DEBUG_PATH, "a");
            if (!fp) {
                return;
            }
            fprintf(fp, "%s DEBUG (%d:%d) ", time_str, getpid(), line);
            break;
        case ERROR:
            fp = fopen(LOG_ERROR_PATH, "a");
            if (!fp) {
                return;
            }
            fprintf(fp, "%s ERROR (%d:%d) ", time_str, getpid(), line);
            break;
        default:
            return;
    }

    va_start(args, fmt);
    vfprintf(fp, fmt, args);
    va_end(args);
    fprintf(fp, "\n");

    fclose(fp);
    return;
}

pd_pool.h

/**
 * 线程池头文件
 * @author jimmy
 * @date 2016-5-14
 */
#ifndef __PD_POOL_
#define __PD_POOL_

/*任务链表*/
typedef struct task_s{
        void (*routine)(void *);
        void *argv;
        struct task_s *next;
} pd_task_t;

/*任务队列*/
typedef struct queue_s{
        pd_task_t *head;
        pd_task_t **tail;
        size_t max_task_num;
        size_t cur_task_num;
}pd_queue_t;

/*线程池*/
typedef struct pool_s{
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        pd_queue_t queue;
        size_t thread_num;
        //size_t thread_stack_size;
}pd_pool_t;

/*初始化线程池*/
//pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_stack_size, size_t thread_max_num);
#endif

pd_poo.c

/**
 * 线程池
 * @author jimmy
 * @date 2016-5-14
 */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include <pthread.h>

#include "pd_log.h"
#include "pd_log.c"
#include "pd_pool.h"

/*tsd*/
pthread_key_t key;

void *pd_worker_dispatch(void *argv){
        ushort exit_flag = 0;
        pd_task_t *a_task;
        pd_pool_t *a_pool = (pd_pool_t *)argv;
        if(pthread_setspecific(key, (void *)&exit_flag) != 0){
                return NULL;
        }
        /*动态从任务列表中获取任务执行*/
        while(!exit_flag){
                pthread_mutex_lock(&a_pool->mutex);
                /*如果此时任务链表为空,则需要等待条件变量为真*/
                while(a_pool->queue.head == NULL){
                        pthread_cond_wait(&a_pool->cond, &a_pool->mutex);
                }
                /*从任务链表中任务开支执行*/
                a_task = a_pool->queue.head;
                a_pool->queue.head = a_task->next;
                a_pool->queue.cur_task_num--;
                if(a_pool->queue.head == NULL){
                        a_pool->queue.tail = &a_pool->queue.head;
                }
                /*解锁*/
                pthread_mutex_unlock(&a_pool->mutex);
                /*执行任务*/
                a_task->routine(a_task->argv);
//core
                free(a_task);
                a_task = NULL;
        }
        pthread_exit(0);
}

/**
 * 根据线程数创建所有的线程
 */
static int pd_pool_create(pd_pool_t *a_pool){
        int i;
        pthread_t tid;
        for(i = 0; i < a_pool->thread_num; i++){
                pthread_create(&tid, NULL, pd_worker_dispatch, a_pool);
        }
        return 0;
}

/**
 * 线程退出函数
 */
void pd_pool_exit_cb(void *argv){
        unsigned int *lock = argv;
        ushort *exit_flag_ptr = pthread_getspecific(key);
        *exit_flag_ptr = 1;
        pthread_setspecific(key, (void *)exit_flag_ptr);
        *lock = 0;
}

/**
 * 线程池初始化
 */
pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num){
        pd_pool_t *a_pool = NULL;
        a_pool = calloc(1, sizeof(pd_pool_t));
        if(!a_pool){
                error("pool_init calloc fail: %s", strerror(errno));
                return NULL;
        }
        a_pool->thread_num = thread_num;
        //初始化队列参数
        a_pool->queue.max_task_num = thread_max_num;
        a_pool->queue.cur_task_num = 0;
        a_pool->queue.head = NULL;
        a_pool->queue.tail = &a_pool->queue.head;
        //初始化tsd
        if(pthread_key_create(&key, NULL) != 0){
                error("pthread_key_create fail: %s", strerror(errno));
                goto err;
        }
        //初始化互斥锁
        if(pthread_mutex_init(&a_pool->mutex, NULL) != 0){
                error("pthread_mutex_init fail: %s", strerror(errno));
                pthread_key_delete(key);
                goto err;
        }
        //初始化条件变量
        if(pthread_cond_init(&a_pool->cond, NULL) != 0){
                error("pthread_cond_init fail: %s", strerror(errno));
                pthread_mutex_destroy(&a_pool->mutex);
                goto err;
        }
        //创建线程池
        if(pd_pool_create(a_pool) != 0){
                error("pd_pool_create fail: %s", strerror(errno));
                pthread_mutex_unlock(&a_pool->mutex);
                pthread_cond_destroy(&a_pool->cond);
                goto err;
        }
        return a_pool;
err:
        free(a_pool);
        return NULL;
}

/**
 * 向线程池中添加任务..
 */
int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv){
        pd_task_t *a_task = NULL;
        a_task = (pd_task_t *)calloc(1, sizeof(pd_task_t));
        if(!a_task){
                error("add task calloc faile: %s", strerror(errno));
                return -1;
        }
        a_task->routine = routine;
        a_task->argv = argv;
        a_task->next = NULL;
        /*加锁*/
        pthread_mutex_lock(&a_pool->mutex);
        if(a_pool->queue.cur_task_num >= a_pool->queue.max_task_num){
                error("cur_task_num >= max_task_num");
                goto err;
        }
        /*将任务放到末尾*/
        *(a_pool->queue.tail) = a_task;
        a_pool->queue.tail = &a_task->next;
        a_pool->queue.cur_task_num++;
        /*通知堵塞的线程*/
        pthread_cond_signal(&a_pool->cond);
        /*解锁*/
        pthread_mutex_unlock(&a_pool->mutex);
        return 0;
err:
        pthread_mutex_unlock(&a_pool->mutex);
        free(a_task);
        return -1;
}

void pd_pool_destroy(pd_pool_t *a_pool){
        unsigned int n;
        unsigned int lock;

        for(n = 0; n < a_pool->thread_num; n++){
                lock = 1;
                if(pd_pool_add_task(a_pool, pd_pool_exit_cb, &lock) != 0){
                        error("pd_pool_destroy fail: add_task fail");
                        return;
                }
                while(lock){
                        usleep(1);
                }
        }
        pthread_mutex_destroy(&a_pool->mutex);
        pthread_cond_destroy(&a_pool->cond);
        pthread_key_delete(key);
        free(a_pool);
}
/******************************************************************************************/

void testfun(void *argv){
        printf("testfun\n");
        sleep(1);
}

int main(){
        pd_pool_t *a_pool = pd_pool_init(9, 5);

        pd_pool_add_task(a_pool, testfun, NULL);
        pd_pool_add_task(a_pool, testfun, NULL);
        pd_pool_add_task(a_pool, testfun, NULL);

        pd_pool_destroy(a_pool);
}
时间: 2024-08-06 12:41:25

C语言实现线程池的相关文章

【转】linux下C语言使用线程池(附带编码)

原文链接: linux下C语言使用线程池(附带编码) - china_sky - 博客频道 - CSDN.NEThttp://blog.csdn.net/feige2008/article/details/7827390 使用一个东西,我们要明白为什么使用它,如何使用它,使用它能达到什么效果 在写本文章时,我也借鉴了网上的部分资源,因为是之前很早搜索到的资料无法追踪源头,所以在此不再写来源,谨感谢各位大神. 1    使用线程池的原因 通常使用多线程都是在需要的时候创建一个新的线程,然后执行任务

阶段1 语言基础+高级_1-3-Java语言高级_05-异常与多线程_第5节 线程池_2_线程池的代码实现

JDK1.5之后提供的 Execituors生产线程池的工厂类 线程池的接口类:ExecutorService 1创建线程池的工厂类 创建类实现Runnable接口,重写里面的run方法 传递线程池任务,执行 线程池里一共就两个线程 销毁线程池 线程池销毁后 程序自动停止 线程池毁后再来执行线程任务就会报错. 原文地址:https://www.cnblogs.com/wangjunwei/p/11261573.html

JAVA线程池例子

用途及用法 网络请求通常有两种形式:第一种,请求不是很频繁,而且每次连接后会保持相当一段时间来读数据或者写数据,最后断开,如文件下载,网络流媒体等.另一种形式是请求频繁,但是连接上以后读/写很少量的数据就断开连接.考虑到服务的并发问题,如果每个请求来到以后服务都为它启动一个线程,那么这对服务的资源可能会造成很大的浪费,特别是第二种情况.因为通常情况下,创建线程是需要一定的耗时的,设这个时间为T1,而连接后读/写服务的时间为T2,当T1>>T2时,我们就应当考虑一种策略或者机制来控制,使得服务对

.NET线程池

摘要 深度探索 Microsoft .NET提供的线程池, 揭示什么情况下你需要用线程池以及 .NET框架下的线程池是如何实现的,并告诉你如何去使用线程池. 内容 介绍 .NET中的线程池 线程池中执行的函数 使用定时器 同步对象的执行 异步I/O操作 监视线程池 死锁 有关安全性 结束 介绍 如 果你有在任何编程语言下的多线程编程经验的话,你肯定已经非常熟悉一些典型的范例.通常,多线程编程与基于用户界面的应用联系在一起,它们需要在不影响终 端用户的情况下,执行一些耗时的操作.取出任何一本参考书

Java并发编程——Executor接口及线程池的使用

在如今的程序里,单线程的程序,应该已经比较少了,而Java语言是内置支持多线程并发的,大家都说Java语言内置支持多线程,非常非常的强大和方便,但一直没有深入研究jdk内concurrent包.今天就认真学习了一下java.util.concurrent包,发现jdk多线程编程果然是强大和方便.本文是学习java.util.concurrent包内线程池及相关接口的一些总结. 任务接口抽象 Runnable接口 在java.lang包内,为多线程提供了Runnable接口. public int

java 多线程和线程池

● 多线程 多线程的概念很好理解就是多条线程同时存在,但要用好多线程确不容易,涉及到多线程间通信,多线程共用一个资源等诸多问题. 使用多线程的优缺点: 优点: 1)适当的提高程序的执行效率(多个线程同时执行). 2)适当的提高了资源利用率(CPU.内存等). 缺点: 1)占用一定的内存空间. 2)线程越多CPU的调度开销越大. 3)程序的复杂度会上升. 对于多线程的示例代码感兴趣的可以自己写Demo啦,去运行体会,下面我主要列出一些多线程的技术点. synchronized 同步块大家都比较熟悉

linux下的线程池

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了.如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了. 下面是Linux系统下用C语言创建的一个线程池.线程池会维护一个任务链表(每个CThread_worker结构就是一个任务).   pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函

基于C++11的线程池,简洁且可以带任意多的参数

咳咳.C++11 加入了线程库,从此告别了标准库不支持并发的历史.然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池.信号量等.线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:"管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复." 貌似没有问题吧.但是写起程序来的时候就出问题了. 废话不多说,先上实现,然后再啰嗦.(dont talk, show me ur code !) 代码实现 1

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把