线程池简单代码

condition.h

#ifndef _CONDITION_H_
#define _CONDITION_H_

#include <pthread.h>

//封装一个互斥量和条件变量作为状态
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
}condition_t;

//对状态的操作函数
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t* cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);

#endif

condition.c

#include "condition.h"

//初始化
int condition_init(condition_t *cond)
{
    int status;
    if((status = pthread_mutex_init(&cond->pmutex, NULL)))
        return status;

    if((status = pthread_cond_init(&cond->pcond, NULL)))
        return status;

    return 0;
}

//加锁
int condition_lock(condition_t *cond)
{
    return pthread_mutex_lock(&cond->pmutex);
}

//解锁
int condition_unlock(condition_t *cond)
{
    return pthread_mutex_unlock(&cond->pmutex);
}

//等待
int condition_wait(condition_t *cond)
{
    return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

//固定时间等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

//唤醒一个睡眠线程
int condition_signal(condition_t* cond)
{
    return pthread_cond_signal(&cond->pcond);
}

//唤醒所有睡眠线程
int condition_broadcast(condition_t *cond)
{
    return pthread_cond_broadcast(&cond->pcond);
}

//释放
int condition_destroy(condition_t *cond)
{
    int status;
    if((status = pthread_mutex_destroy(&cond->pmutex)))
        return status;

    if((status = pthread_cond_destroy(&cond->pcond)))
        return status;

    return 0;
}

然后是线程池对应的threadpool.h和threadpool.c
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

//线程池头文件

#include "condition.h"

//封装线程池中的对象需要执行的任务对象
typedef struct task
{
    void *(*run)(void *args);  //函数指针,需要执行的任务
    void *arg;              //参数
    struct task *next;      //任务队列中下一个任务
}task_t;

//下面是线程池结构体
typedef struct threadpool
{
    condition_t ready;    //状态量
    task_t *first;       //任务队列中第一个任务
    task_t *last;        //任务队列中最后一个任务
    int counter;         //线程池中已有线程数
    int idle;            //线程池中kongxi线程数  idel = 0 则该线程在忙碌;>0 则表示该线程在等待唤醒
    int max_threads;     //线程池最大线程数
    int quit;            //是否退出标志
}threadpool_t;

//线程池初始化
void threadpool_init(threadpool_t *pool, int threads);

//往线程池中加入任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);

//摧毁线程池
void threadpool_destroy(threadpool_t *pool);

#endif---------------------------------------------------------------------------
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>

//创建的线程执行
void *thread_routine(void *arg)
{
    struct timespec abstime;
    int timeout;
    printf("thread %d is starting\n", (int)pthread_self());
    threadpool_t *pool = (threadpool_t *)arg;
    while(1)
    {
        timeout = 0;
        //访问线程池之前需要加锁
        condition_lock(&pool->ready);
        //空闲
        pool->idle++;
        //等待队列有任务到来 或者 收到线程池销毁通知
        while(pool->first == NULL && !pool->quit)
        {
            //否则线程阻塞等待
            printf("thread %d is waiting\n", (int)pthread_self());
            //获取从当前时间,并加上等待时间, 设置进程的超时睡眠时间
            clock_gettime(CLOCK_REALTIME, &abstime);
            abstime.tv_sec += 2;
            int status;
            status = condition_timedwait(&pool->ready, &abstime);  //该函数会解锁,允许其他线程访问,当被唤醒时,加锁
            if(status == ETIMEDOUT)
            {
                printf("thread %d wait timed out\n", (int)pthread_self());
                timeout = 1;
                break;
            }
        }

        pool->idle--;
        if(pool->first != NULL)
        {
            //取出等待队列最前的任务,移除任务,并执行任务
            task_t *t = pool->first;
            pool->first = t->next;
            //由于任务执行需要消耗时间,先解锁让其他线程访问线程池
            condition_unlock(&pool->ready);
            //执行任务
            t->run(t->arg);
            //执行完任务释放内存
            free(t);
            //重新加锁
            condition_lock(&pool->ready);
        }

        //退出线程池
        if(pool->quit && pool->first == NULL)
        {
            pool->counter--;//当前工作的线程数-1
            //若线程池中没有线程,通知等待线程(主线程)全部任务已经完成
            if(pool->counter == 0)
            {
                condition_signal(&pool->ready);
            }
            condition_unlock(&pool->ready);
            break;
        }
        //超时,跳出销毁线程
        if(timeout == 1)
        {
            pool->counter--;//当前工作的线程数-1
            condition_unlock(&pool->ready);
            break;
        }

        condition_unlock(&pool->ready);
    }

    printf("thread %d is exiting\n", (int)pthread_self());
    return NULL;

}

//线程池初始化
void threadpool_init(threadpool_t *pool, int threads)
{

    condition_init(&pool->ready);
    pool->first = NULL;
    pool->last =NULL;
    pool->counter =0;
    pool->idle =0;
    pool->max_threads = threads;
    pool->quit =0;

}

//增加一个任务到线程池
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
    //产生一个新的任务
    task_t *newtask = (task_t *)malloc(sizeof(task_t));
    newtask->run = run;
    newtask->arg = arg;
    newtask->next=NULL;//新加的任务放在队列尾端

    //线程池的状态被多个线程共享,操作前需要加锁
    condition_lock(&pool->ready);

    if(pool->first == NULL)//第一个任务加入
    {
        pool->first = newtask;
    }
    else
    {
        pool->last->next = newtask;
    }
    pool->last = newtask;  //队列尾指向新加入的线程

    //线程池中有线程空闲,唤醒
    if(pool->idle > 0)
    {
        condition_signal(&pool->ready);
    }
    //当前线程池中线程个数没有达到设定的最大值,创建一个新的线性
    else if(pool->counter < pool->max_threads)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, thread_routine, pool);
        pool->counter++;
    }
    //结束,访问
    condition_unlock(&pool->ready);
}

//线程池销毁
void threadpool_destroy(threadpool_t *pool)
{
    //如果已经调用销毁,直接返回
    if(pool->quit)
    {
    return;
    }
    //加锁
    condition_lock(&pool->ready);
    //设置销毁标记为1
    pool->quit = 1;
    //线程池中线程个数大于0
    if(pool->counter > 0)
    {
        //对于等待的线程,发送信号唤醒
        if(pool->idle > 0)
        {
            condition_broadcast(&pool->ready);
        }
        //正在执行任务的线程,等待他们结束任务
        while(pool->counter)
        {
            condition_wait(&pool->ready);
        }
    }
    condition_unlock(&pool->ready);
    condition_destroy(&pool->ready);
}

测试代码:
#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>

void* mytask(void *arg)
{
    printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);
    sleep(1);
    free(arg);
    return NULL;
}

//测试代码
int main(void)
{
    threadpool_t pool;
    //初始化线程池,最多三个线程
    threadpool_init(&pool, 3);
    int i;
    //创建十个任务
    for(i=0; i < 10; i++)
    {
        int *arg = malloc(sizeof(int));
        *arg = i;
        threadpool_add_task(&pool, mytask, arg);

    }
    threadpool_destroy(&pool);
    return 0;
}
 

原文地址:https://www.cnblogs.com/huyupei/p/10043292.html

时间: 2024-10-10 02:06:06

线程池简单代码的相关文章

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

java threadPool 线程池简单分析

java 1.5 concurrent 工具包中提供了五类线程池的创建: ExecutorService executor=Executors.newCachedThreadPool(); ExecutorService cacheExecutor=Executors.newCachedThreadPool(new TestThreadFactory()); ExecutorService fixExecutor=Executors.newFixedThreadPool(10); Executo

线程池简单实现

实现了一个简化版的线程池. 实现线程池的关键有两个:一是阻塞队列,用于任务的存取,二是内部的线程对象如何持续性的执行任务,并在空闲时被回收. 线程池代码: 1 package learnConcurrent; 2 3 import java.util.ArrayList; 4 import java.util.Collection; 5 import java.util.LinkedList; 6 import java.util.List; 7 import java.util.concurr

java线程及操作实例,线程池简单例子

java io.集合.线程.字符串.gc.jvm可谓是java中的最基本的知识,尤其是线程操作复杂,相应难懂,要想java基础知识扎实,上面提到的几个方面的知识点都要精通,这样方可以称自己掌握java方面基础知识. 总结一下java线程知识,平时接触过线程,尤其是在android开发中,线程可谓是无处不在,稍有不注意就会报错.在java中线程也是无处不在,main就是一个线程,只不过被包装好了,一般接触不到. 我的无数次的复习经历告诉我,学习知识最快,最深刻的方法就是从解题开始,不要先看概念,遇

用阻塞队列和线程池简单实现生产者和消费者场景

本例子仅仅是博主学习阻塞队列和后的一些小实践,并不是真正的应用场景! 生产者消费者场景是我们应用中最常见的场景,我们可以通过ReentrantLock的Condition和对线程进行wait,notify同通信来实现生产者和消费者场景,前者可以实现多生产者和多消费者模式,后者仅可以实现一生产者,一消费者模式. 今天我们就利用阻塞队列来实现下生产者和消费者模式(里面还利用了线程池). 看过我关于阻塞队列博文的朋友已经知道,阻塞队列其实就是由ReentrantLock实现的! 场景就不描述了,为简单

java 线程池简单例子

package com.hra.riskprice; import com.hra.riskprice.SysEnum.Factor_Type; import com.hra.riskprice.pojo.RskFactor; import com.hra.riskprice.service.impl.RskFactorBulkMapper; import org.springframework.boot.SpringApplication; import org.springframework

线程池案例代码:

import java.util.Collections; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Synchrono

转载一份C++线程池的代码,非常实用

#ifndef _ThreadPool_H_ #define _ThreadPool_H_ #pragma warning(disable: 4530) #pragma warning(disable: 4786) #include <cassert> #include <vector> #include <queue> #include <windows.h> using namespace std; class ThreadJob  //工作基类 { p

JAVA 线程与线程池简单小结

JAVA线程创建方式: 1.继承Thread类创建线程类 继承Thread类并重写该类的run方法,该un方法代表了线程要完成的任务. 2.通过Runnable接口创建线程类 实现runnable接口,重写该接口的run()方法,该run()方法的方法体同样是该线程的线程执行体.将Runnable实现类实例作为Thread的target来创建Thread对象,该Thread对象才是真正的线程对象. 3.通过Callable和Future创建线程 (1)实现Callable接口,重写call()方