简单线程池实现 (C版本)

1. 概述

在谈到线程池之前,我们看看并发还有哪几种方式.

多进程,包括一次启动多个进程,然后在进程间传递描述符,或者连接来了以后fork子进程处理.

多线程,同理也可以像多进程一样,要么建立一堆放在那里,要么就有连接以后"即时建立,即时销毁"

我们看上面的方法并没有什么不妥,特殊的场景总是能够派上用场的.

比如一个并发量只有两位数甚至个位数,那么上面的这些方式应付起来也很轻松.但是如果要举一个极端的例子,十万的并发,首先不可能建立十万个进程等着这个并发,也不可能即时创建即时销毁.这些都无形中加重了系统的负担.

考虑一个问题,可能10W个并发,每个连接请求的服务仅仅1秒甚至更短就结束了.以上的方式显然面对这个场景而言,有点粗放了.这个时候,我们需要一个可以应付高并发,并且没有那么吃资源的并发方式.那么线程池也就脱颖而出了.

2. 设计

线程池,顾名思义,就是一个池子,里面放一堆线程.没事的时候待着,个个心怀鬼胎,摩拳擦掌,有事了谁抢到算谁的...

那么线程池该怎么实现呢.

1. 入口

正如上面说的,线程池就是一堆线程在一个池子里面,那么问题来了,该怎么保证这些线程没事的时候安静的待着.简单的mutex是做不到这一点的,需要结合cond才能达到这个效果.

        pthread_mutex_lock(pthread_pool->pmutex);
        // while避免虚假唤醒
        while (pthread_pool->pthread_job->job_num == 0 && 0 == pthread_pool->need_destroy)
        {
            pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex);
        }

这样就可以保证线程没事的时候不会折腾了.这里解释一下while的作用,就如注释所说,避免虚假唤醒,就是可能没有pthread_cond_signal或者pthread_cond_broadcast别调用,但是线程还是从pthread_cond_wait返回了,如果没有while这个判断,线程将会一直错下去,这显然不是我们想看到的.

2. 任务分配

毫无疑问,我们得借助回调来实现任务的分配.这个回调函数完全可以模拟线程自身的回调.

// argv指定的地址由线程池释放
typedef struct tag_job
{
    thread_cb cb;
    void *argv;
    struct tag_job *next;
}job_t, *job_pt;

typedef struct
{
    int32_t job_num;
    job_pt head;
    job_pt tail;
}thread_job_t, *thread_job_pt;

对于线程池而言,其任务必然得是一个队列,也可以理解为任务池,任何一个被唤醒的线程就去这个池子里面取一个任务.然后执行.执行完就继续睡大觉.

3. 销毁

先说下我遇到的问题,我今天使用了几种销毁的方式,事实证明都是错的.最后借鉴网络上的一些例子,不过我现在找不到那个例子了...惭愧.

错误a. 直接调用 pthread_cancel,如果一个线程阻塞在cond那里,你无法用这种方式销毁它,如果恰巧它正在工作,然后你成功把它销毁了,还不如没有销毁,因为死锁了.

错误b. 使用pthread_kill发送SIGKILL.然后整个程序就跟着一块挂掉了.最后发现,信号是进程内共享的.如果不想被异常退出,要在主线程中捕获这个信号.然后在线程池也要有相应的信号处理函数.当然了,这个信号绝对不是SIGSTOP和SIGKILL.

错误c. pthread_cond_broadcast后直接释放mutex和cond.要知道,线程池可能并没有主线程退出的快,也就是说,主线程这边已经把cond和mutex释放了,线程池里面还有没成功退出的线程,此时它既可能调用unlock,也可能调用lock,无论那个都会导致程序异常崩溃.

好吧,说下正确的做法,当broad以后,要等待确认所有线程退出以后,再释放cond和mutex.这样才能使程序正常.

在我的线程池里面,我加了一个master,也就是说,如果创建一个10个线程的池子,我的线程池会创建11个线程,第11个线程就是用来保活的,如果线程池中的某个线程以为某个逻辑异常return了.就需要这个线程来重新启动它...

3. 实现

thread.h

/*!****************************************************************************
 * Coypright(C) 2014-2024 () technology Co., Ltd
 *
 * 文件名 : thread.h
 * 版本号 : 1.0
 * 描  述 : 线程池实现
 * 作  者 : cp3alai
 * 日  期 : 2016.05.31
 *****************************************************************************/

#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>

typedef void (*thread_cb)(void *argv);

// argv指定的地址由线程池释放
typedef struct tag_job
{
    thread_cb cb;
    void *argv;
    struct tag_job *next;
}job_t, *job_pt;

typedef struct
{
    int32_t job_num;
    job_pt head;
    job_pt tail;
}thread_job_t, *thread_job_pt;

int thread_job_init(thread_job_pt *pthread_job);
int thread_job_push(thread_job_pt thread_job, thread_cb cb, void *arg);
job_pt thread_job_pop(thread_job_pt thread_job);
void thread_job_destroy(thread_job_pt thread_job);

typedef struct
{
    int32_t need_destroy;
    int32_t thread_num;
    pthread_t pthread_master;
    pthread_t *pthread_id;

    thread_job_pt pthread_job;

    pthread_cond_t *pcond;
    pthread_mutex_t *pmutex;
}thread_pool_t, *thread_pool_pt;

int pthread_create_detach(pthread_t *thread, void *(*start_routine)(void*), void *arg);
void *thread_entry(void *arg);
void *thread_master(void *arg);
int thread_pool_init(thread_pool_t **pthread_pool, int num);
int thread_pool_add(thread_pool_pt pthread_pool, thread_cb, void *arg);
int thread_pool_kill();
int thread_pool_keepalive();
void thread_pool_destroy(thread_pool_pt pthread_pool);

thread.c

/*!****************************************************************************
 * Coypright(C) 2014-2024 () technology Co., Ltd
 *
 * 文件名 : /media/alai/work/workspace/CC++/mrtpoll/thread.c
 * 版本号 : 1.0
 * 描  述 :
 * 作  者 : cp3alai
 * 日  期 : 2016.05.31
 *****************************************************************************/

#include "thread.h"

int thread_job_init(thread_job_pt *pthread_job)
{
    int32_t ret;
    if (NULL == pthread_job)
    {
        ret = -1;
        goto ERROR;
    }

    *pthread_job = (thread_job_pt)malloc(sizeof(thread_job_t));
    if (NULL == *pthread_job)
    {
        ret = -1;
        goto ERROR;
    }
    bzero(*pthread_job, sizeof(thread_job_t));

    (*pthread_job)->head = NULL;
    (*pthread_job)->tail= NULL;

    ret = 0;
ERROR:

    return ret;
}

int thread_job_push(thread_job_pt thread_job, thread_cb cb, void *arg)
{
    int32_t ret = 0;
    job_pt pjob = NULL;
    if (NULL == thread_job || NULL == cb)
    {
        ret = -1;
        goto ERROR;
    }

    pjob = (job_pt)malloc(sizeof(job_t));
    if (NULL == pjob)
    {
        ret = -1;
        goto ERROR;
    }

    pjob->argv = arg;
    pjob->cb = cb;
    pjob->next = NULL;

    if (NULL != thread_job->head)
    {
        thread_job->tail->next = pjob;
        thread_job->tail = pjob;
    }
    else
    {
        thread_job->head = pjob;
        thread_job->tail = pjob;
    }
    thread_job->job_num++;

ERROR:

    return ret;
}

// 这里返回的job需要使用它的线程释放
job_pt thread_job_pop(thread_job_pt thread_job)
{
    job_pt job = NULL;
    if (NULL == thread_job)
    {
        return NULL;
    }

    if (NULL == thread_job->head)
    {
        return NULL;
    }

    job = thread_job->head;
    thread_job->head = thread_job->head->next;
    thread_job->job_num--;

    return job;
}

void thread_job_destroy(thread_job_pt thread_job)
{
    if (NULL == thread_job)
    {
        return;
    }

    while (NULL != thread_job->head)
    {
        job_pt pjob = thread_job->head;
        thread_job->head = pjob->next;
        free(pjob);
        pjob = NULL;
    }

    free(thread_job);
    thread_job = NULL;

    return;
}

int pthread_create_detach(pthread_t *thread, void *(*start_routine)(void*), void *arg)
{
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, 1);

    return pthread_create(thread, &attr, start_routine, arg);
}

void *thread_master(void *arg)
{
    if (NULL == arg)
    {
        return NULL;
    }

    thread_pool_pt pthread_pool = (thread_pool_pt)arg;

    int i;
    while (1)
    {
        for (i = 0; i < pthread_pool->thread_num; i++)
        {
            if (0 == pthread_pool->pthread_id[i])
            {
                pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void*)pthread_pool);
            }

            if ((pthread_kill(pthread_pool->pthread_id[i], 0)) < 0)
            {
                if (errno == ESRCH)
                {
                    pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void*)pthread_pool);
                }
            }

            sleep(1);
        }
    }

    return NULL;
}

void *thread_entry(void *arg)
{
    if (NULL == arg)
    {
        return NULL;
    }

    thread_pool_pt pthread_pool = (thread_pool_pt)arg;

    while (1)
    {
        pthread_mutex_lock(pthread_pool->pmutex);
        // 貌似这样写也可以避免虚假唤醒
        // while (pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex))
        while (pthread_pool->pthread_job->job_num == 0 && 0 == pthread_pool->need_destroy)
        {
            pthread_cond_wait(pthread_pool->pcond, pthread_pool->pmutex);
        }

        // 如果销毁线程,则这里直接退出
        if (0 != pthread_pool->need_destroy)
        {
            pthread_pool->thread_num--;
            pthread_mutex_unlock(pthread_pool->pmutex);
            break;
        }

        job_pt pjob = thread_job_pop(pthread_pool->pthread_job);
        if (NULL == pjob)
        {
            continue;
        }
        pthread_mutex_unlock(pthread_pool->pmutex);

        pjob->cb(pjob->argv);

        // 用完了要释放...
        free(pjob);
        pjob = NULL;
    }

    return NULL;
}

int thread_pool_init(thread_pool_t **pthread_pool, int num)
{
    int ret = 0;
    int i;

    if (NULL == pthread_pool || 0 == num)
    {
        ret = -1;
        goto ERROR;
    }

    *pthread_pool = (thread_pool_pt)malloc(sizeof(thread_pool_t));
    if (NULL == *pthread_pool)
    {
        ret = -1;
        goto ERROR;
    }
    bzero((*pthread_pool), sizeof(thread_pool_t));

    (*pthread_pool)->pmutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
    if (NULL == (*pthread_pool)->pmutex)
    {
        ret = -1;
        goto ERROR;
    }
    (*pthread_pool)->pcond = (pthread_cond_t *)malloc(sizeof(pthread_cond_t));
    if (NULL == (*pthread_pool)->pcond)
    {
        ret = -1;
        goto ERROR;
    }
    pthread_mutex_init((*pthread_pool)->pmutex, NULL);
    pthread_cond_init((*pthread_pool)->pcond, NULL);
    if ((ret = thread_job_init(&(*pthread_pool)->pthread_job)) < 0)
    {
        ret = -1;
        goto ERROR;
    }

    (*pthread_pool)->pthread_id = (pthread_t*)malloc(num * sizeof(pthread_t));
    if (NULL == (*pthread_pool)->pthread_id)
    {
        ret = -1;
        goto ERROR;
    }
    bzero((*pthread_pool)->pthread_id, sizeof(pthread_t) * num);

    (*pthread_pool)->thread_num = num;
    for (i = 0; i < num; i++)
    {
        ret = pthread_create_detach(&(*pthread_pool)->pthread_id[i], thread_entry, (void *)(*pthread_pool));
        if (ret < 0)
        {
            (*pthread_pool)->pthread_id[i] = 0;
        }
    }

    ret = pthread_create_detach(&(*pthread_pool)->pthread_master, thread_master, (void *)(*pthread_pool));
    if (ret < 0)
    {
        ret = -1;
        goto ERROR;
    }

    ret = 0;
ERROR:
    if (ret < 0)
    {
        if (NULL != (*pthread_pool))
        {
            thread_pool_destroy((*pthread_pool));
        }
    }

    return ret;
}

int thread_pool_add(thread_pool_pt pthread_pool, thread_cb cb, void *arg)
{
    int32_t ret = 0;
    if (NULL == pthread_pool || NULL == cb)
    {
        ret = -1;
        goto ERROR;
    }

    if (NULL == pthread_pool->pthread_job)
    {
        ret = -1;
        goto ERROR;
    }

    pthread_mutex_lock(pthread_pool->pmutex);
    ret = thread_job_push(pthread_pool->pthread_job, cb, arg);
    if (ret < 0)
    {
        ret = -1;
        goto ERROR;
    }

    pthread_cond_signal(pthread_pool->pcond);
    pthread_mutex_unlock(pthread_pool->pmutex);

    ret = 0;
ERROR:

    return ret;
}

int thread_pool_kill(thread_pool_pt pthread_pool, int32_t thread_no, int32_t signo)
{
    int32_t ret;
    int32_t i;
    if (NULL == pthread_pool)
    {
        ret = -1;
        goto ERROR;
    }

    if (0 == thread_no)
    {
        for (i = 0; i < pthread_pool->thread_num; i++)
        {
            pthread_kill(pthread_pool->pthread_id[i], signo);
        }
    }
    else
    {
        ret = pthread_kill(pthread_pool->pthread_id[thread_no], signo);
        if (ret < 0)
        {
            ret = -1;
            goto ERROR;
        }
    }

    ret = 0;
ERROR:
    return ret;
}

int thread_pool_keepalive(thread_pool_pt pthread_pool)
{
    int32_t ret;
    int32_t i;
    if (NULL == pthread_pool)
    {
        return -1;
    }

    for (i = 0; i < pthread_pool->thread_num; i++)
    {
        ret = pthread_kill(pthread_pool->pthread_id[i], 0);
        if (ret < 0)
        {
            if (errno == ESRCH)
            {
                pthread_create_detach(&pthread_pool->pthread_id[i], thread_entry, (void *)pthread_pool);
            }
        }
    }

    return 0;
}

void thread_pool_destroy(thread_pool_pt pthread_pool)
{
    int32_t i;
    if (NULL == pthread_pool)
    {
        return;
    }

    if (0 != pthread_pool->pthread_master)
    {
        pthread_cancel(pthread_pool->pthread_master);
    }

    if (NULL != pthread_pool->pthread_id)
    {
        pthread_pool->need_destroy = 1;
        pthread_cond_broadcast(pthread_pool->pcond);

        free(pthread_pool->pthread_id);
        pthread_pool->pthread_id = NULL;
    }

    if (NULL != pthread_pool->pthread_job)
    {
        thread_job_destroy(pthread_pool->pthread_job);
    }

    // 等待线程自销毁完毕
    while (pthread_pool->thread_num > 0)
    {
        usleep(0);
    }

    if (NULL != pthread_pool->pmutex)
    {
        pthread_mutex_destroy(pthread_pool->pmutex);
        free(pthread_pool->pmutex);
        pthread_pool->pmutex = NULL;
    }

    if (NULL != pthread_pool->pcond)
    {
        pthread_cond_destroy(pthread_pool->pcond);
        free(pthread_pool->pcond);
        pthread_pool->pcond = NULL;
    }
}

main.c

#include <stdio.h>
#include "thread.h"

void thread_routine1(void *argv)
{
    fprintf(stderr, "this is thread 1\n");
    return;
}

void thread_routine2(void *argv)
{
    fprintf(stderr, "this is thread 2\n");
    return;
}

int main(int argc, char **argv)
{
    int32_t ret;
    thread_pool_pt pthread_pool;

    ret = thread_pool_init(&pthread_pool, 30);
    if (ret < 0)
    {
        return -1;
    }

    thread_pool_add(pthread_pool, thread_routine1, NULL);
    thread_pool_add(pthread_pool, thread_routine2, NULL);
    thread_pool_add(pthread_pool, thread_routine2, NULL);
    thread_pool_add(pthread_pool, thread_routine2, NULL);
    thread_pool_add(pthread_pool, thread_routine2, NULL);
    thread_pool_add(pthread_pool, thread_routine2, NULL);
    thread_pool_add(pthread_pool, thread_routine1, NULL);
    thread_pool_add(pthread_pool, thread_routine1, NULL);
    thread_pool_add(pthread_pool, thread_routine1, NULL);
    thread_pool_add(pthread_pool, thread_routine1, NULL);

    sleep(5);
    thread_pool_destroy(pthread_pool);

    sleep(5);

    return 0;
}

简单的Makefile

###################################################
###			Make with me
###################################################

CC := gcc

TARGET := pool_test

OBJ := main.o 	thread.o

INCS := -I.

LIBS := -lpthread

CFLAGS := -ggdb3 -O0

$(TARGET) : $(OBJ)
	$(CC) -o $(TARGET) $(OBJ) $(CFLAGS) $(INCS) $(LIBS)

.PHONY: clean
clean:
	@echo "cleaned"
	@rm -rf $(TARGET) $(OBJ)

执行结果如图 :

4. 后记

写到最后,其实这篇博客还有另外一个目的,就是希望各位看到的同学们,如果代码中有问题,请在评论中帮忙指出来.真的是不胜感激了...

时间: 2024-10-10 16:10:35

简单线程池实现 (C版本)的相关文章

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

C++版简单线程池

需求 之前写过一个C#版本的简单线程池http://blog.csdn.net/ylbs110/article/details/51224979 由于刚刚学习了C++11新特性中的future,于是想到用它来实现一个线程池. 实现 思路基本和C#版本的一样,主要区别是委托的实现,线程句柄的不同和线程锁: 本来C++有function模板,但是实现起来比较麻烦,这里主要是实现线程池,所以动态参数的委托就不实现了,直接使用typedef void(*Func)();来实现一个无参数无返回值的函数指针

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

LINUX下的简单线程池

前言 任何一种设计方式的引入都会带来额外的开支,是否使用,取决于能带来多大的好处和能带来多大的坏处,好处与坏处包括程序的性能.代码的可读性.代码的可维护性.程序的开发效率等. 线程池适用场合:任务比较多,需要拉起大量线程来处理:任务的处理时间相对比较短,按照线程的周期T1(创建阶段).T2(执行阶段).T3(销毁阶段)来算,执行阶段仅占用较少时间. 简单的线程池通常有以下功能:预创建一定数量的线程:管理线程任务,当工作线程没有事情可做时休眠自己:销毁线程池. 复杂一些的线程池有额外的调节功能:管

自实现简单线程池

线程池在现在的系统和框架中十分常见.明白线程池的思想原理,不仅对学习线程只是有很大的帮助.对理解一些系统的线程池实现也有很大的帮助.下面是我自己简单实现的一个线程池.用以对线程的简单理解. 线程的实现原理很简单: 线程池对象包含以下组件:工作者队列,Job队列: 用户通过线程池对象添加删除工作者,线程池对象维持工作者对象这个池和工作者的实际工作: 工作者池中的线程在用户没用明确关闭前不断的从Job队列拿取job执行job. 好了,一切看代码: 1.以接口编程,首先创建ThreadPool接口:

简单线程池原理和代码

线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务 本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理 代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了. 线程池代码见Github[点击] 由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明) std::

简单线程池的实现

1. 什么是线程池 线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务.如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可. 线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销.一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为 1.     计算密集型任务: 2.     I/O密集型任务. 计算密集型任务

自己实现一个简单线程池

先上原理图: 上代码之前,要先补充一下线程池构造的核心几个点 线程池里的核心线程数与最大线程数 线程池里真正工作的线程worker 线程池里用来存取任务的队列BlockingQueue 线程中的任务task 本例实现简化了一些,只实现了BlockingQueue存放任务,然后每个worker取任务并执行,下面看代码首先定义一个线程池ThreadExcutor class ThreadExcutor{ //创建 private volatile boolean RUNNING = true; //

Delphi ThreadPool 线程池(Delphi2009以上版本适用)

http://blog.sina.com.cn/s/blog_6250a9df0101kref.html 在网上查找Delphi线程池,结果发现寥寥无几. 看了半天源代码,弄得一头雾水,觉得不容易理解和使用,于是自己想写一个线程池. 什么样的线程池更好呢? 我觉得使用起来要可靠,并且一定要简单,这样才是更好的. 我写的线程池就是这样一个标准,使用非常简单,只传入自己要执行的方法就可以了, 其实大家最后就是关注自己要操作的方法,其余的交给线程池.全部源代码如下: { {单元:ThreadPoolU