一个线程池的简单的实现

线程池实现:

用于执行大量相对短暂的任务

当任务增加的时候能够动态的增加线程池中线程的数量直到达到一个阈值。

当任务执行完毕的时候,能够动态的销毁线程池中的线程

该线程池的实现本质上也是生产者与消费模型的应用。生产者线程向任务队列中添加任务,一旦队列有任务到来,如果有等待线程就唤醒来执行任务,如果没有等待线程并且线程数没有达到阈值,就创建新线程来执行任务。

contion.h

#ifndef _CONDITION_H_

#define _CONDITION_H_

#include <pthread.h>

typedef struct condition

{

pthread_mutex_t pmutex;

pthread_cond_t pcond;

} condition_t;

int condition_init(condition_t *cond);

int condition_lock(condition_t *cond);

int condition_unlock(condition_t *cond);

int condition_wait(condition_t *cond);

int condition_timedwait(condition_t *cond, const struct timespec *abstime);

int condition_signal(condition_t *cond);

int condition_broadcast(condition_t *cond);

int condition_destroy(condition_t *cond);

#endif /* _CONDITION_H_ */

condition.c

#include "condition.h"

int condition_init(condition_t *cond)

{

int status;

if ((status = pthread_mutex_init(&cond->pmutex, NULL)))

return status;

if ((status = pthread_cond_init(&cond->pcond, NULL)))

return status;

return 0;

}

int condition_lock(condition_t *cond)

{

return pthread_mutex_lock(&cond->pmutex);

}

int condition_unlock(condition_t *cond)

{

return pthread_mutex_unlock(&cond->pmutex);

}

int condition_wait(condition_t *cond)

{

return pthread_cond_wait(&cond->pcond, &cond->pmutex);

}

int condition_timedwait(condition_t *cond, const struct timespec *abstime)

{

return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);

}

int condition_signal(condition_t *cond)

{

return pthread_cond_signal(&cond->pcond);

}

int condition_broadcast(condition_t* cond)

{

return pthread_cond_broadcast(&cond->pcond);

}

int condition_destroy(condition_t* cond)

{

int status;

if ((status = pthread_mutex_destroy(&cond->pmutex)))

return status;

if ((status = pthread_cond_destroy(&cond->pcond)))

return status;

return 0;

}

threadpool.h

#ifndef _THREAD_POOL_H_

#define _THREAD_POOL_H_

#include "condition.h"

// 任务结构体,将任务放入队列由线程池中的线程来执行

typedef struct task

{

void *(*run)(void *arg);
// 任务回调函数

void *arg;
// 回调函数参数

struct task *next;

} task_t;

// 线程池结构体

typedef struct threadpool

{

condition_t ready;
//任务准备就绪或者线程池销毁通知

task_t *first;
//任务队列头指针

task_t *last;
//任务队列尾指针

int counter;
//线程池中当前线程数

int idle;
//线程池中当前正在等待任务的线程数

int max_threads;
//线程池中最大允许的线程数

int quit;
//销毁线程池的时候置1

} threadpool_t;

// 初始化线程池

void threadpool_init(threadpool_t *pool, int threads);

// 往线程池中添加任务

void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);

// 销毁线程池

void threadpool_destroy(threadpool_t *pool);

#endif /* _THREAD_POOL_H_ */

threadpool.c

#include "threadpool.h"

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <errno.h>

#include <time.h>

void *thread_routine(void *arg)

{

struct timespec abstime;

int timeout;

printf("thread 0x%x is starting\n", (int)pthread_self());

threadpool_t *pool = (threadpool_t *)arg;

while (1)

{

timeout = 0;

condition_lock(&pool->ready);

pool->idle++;

// 等待队列有任务到来或者线程池销毁通知

while (pool->first == NULL && !pool->quit)

{

printf("thread 0x%x is waiting\n", (int)pthread_self());

//condition_wait(&pool->ready);

clock_gettime(CLOCK_REALTIME, &abstime);

abstime.tv_sec += 2;

int status = condition_timedwait(&pool->ready, &abstime);

if (status == ETIMEDOUT)

{

printf("thread 0x%x is wait timed out\n", (int)pthread_self());

timeout = 1;

break;

}

}

// 等待到条件,处于工作状态

pool->idle--;

// 等待到任务

if (pool->first != NULL)

{

// 从队头取出任务

task_t *t = pool->first;

pool->first = t->next;

// 执行任务需要一定的时间,所以要先解锁,以便生产者进程

// 能够往队列中添加任务,其它消费者线程能够进入等待任务

condition_unlock(&pool->ready);

t->run(t->arg);

free(t);

condition_lock(&pool->ready);

}

// 如果等待到线程池销毁通知, 且任务都执行完毕

if (pool->quit && pool->first == NULL)

{

pool->counter--;

if (pool->counter == 0)

condition_signal(&pool->ready);

condition_unlock(&pool->ready);

// 跳出循环之前要记得解锁

break;

}

if (timeout && pool->first == NULL)

{

pool->counter--;

condition_unlock(&pool->ready);

// 跳出循环之前要记得解锁

break;

}

condition_unlock(&pool->ready);

}

printf("thread 0x%x is exting\n", (int)pthread_self());

return NULL;

}

// 初始化线程池

void threadpool_init(threadpool_t *pool, int threads)

{

// 对线程池中的各个字段初始化

condition_init(&pool->ready);

pool->first = NULL;

pool->last = NULL;

pool->counter = 0;

pool->idle = 0;

pool->max_threads = threads;

pool->quit = 0;

}

// 往线程池中添加任务

void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)

{

// 生成新任务

task_t *newtask = (task_t *)malloc(sizeof(task_t));

newtask->run = run;

newtask->arg = arg;

newtask->next = NULL;

condition_lock(&pool->ready);

// 将任务添加到队列

if (pool->first == NULL)

pool->first = newtask;

else

pool->last->next = newtask;

pool->last = newtask;

// 如果有等待线程,则唤醒其中一个

if (pool->idle > 0)

condition_signal(&pool->ready);

else if (pool->counter < pool->max_threads)

{

// 没有等待线程,并且当前线程数不超过最大线程数,则创建一个新线程

pthread_t tid;

pthread_create(&tid, NULL, thread_routine, pool);

pool->counter++;

}

condition_unlock(&pool->ready);

}

// 销毁线程池

void threadpool_destroy(threadpool_t *pool)

{

if (pool->quit)

{

return;

}

condition_lock(&pool->ready);

pool->quit = 1;

if (pool->counter > 0)

{

if (pool->idle > 0)

condition_broadcast(&pool->ready);

// 处于执行任务状态中的线程,不会收到广播

// 线程池需要等待执行任务状态中的线程全部退出

while (pool->counter > 0)

condition_wait(&pool->ready);

}

condition_unlock(&pool->ready);

condition_destroy(&pool->ready);

}

main.c

#include "threadpool.h"

#include <unistd.h>

#include <stdio.h>

#include <stdlib.h>

void* mytask(void *arg)

{

printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);

sleep(1);

free(arg);

return NULL;

}

int main(void)

{

threadpool_t pool;

threadpool_init(&pool, 3);

int i;

for (i=0; i<10; i++)

{

int *arg = (int *)malloc(sizeof(int));

*arg = i;

threadpool_add_task(&pool, mytask, arg);

}

//sleep(15);

threadpool_destroy(&pool);

return 0;

}

makefile:

.PHONY:clean

CC=gcc

CFLAGS=-Wall -g

ALL=main

all:$(ALL)

OBJS=threadpool.o main.o condition.o

.c.o:

$(CC) $(CFLAGS) -c $<

main:$(OBJS)

$(CC) $(CFLAGS) $^ -o [email protected] -lpthread -lrt

clean:

rm -f $(ALL) *.o

一个线程池的简单的实现

时间: 2024-12-17 06:06:42

一个线程池的简单的实现的相关文章

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

线程池的简单实现

几个基本的线程函数: //线程操纵函数//创建:   int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);//终止自身    void pthread_exit(void *retval);//终止其他:   int pthread_cancel(pthread_t tid); 发送终止信号后目标线程不一定终止,要调用join函数等待//阻塞

死磕 java线程系列之自己动手写一个线程池

欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池. 属性分析 线程池,顾名思义它首先是一个"池",这个池里面放的是线程,线程是用来执行任务的. 首先,线程池中的线程应该是有类别的,有的是核心线程,有

二 Java利用等待/通知机制实现一个线程池

接着上一篇博客的 一Java线程的等待/通知模型 ,没有看过的建议先看一下.下面我们用等待通知机制来实现一个线程池 线程的任务就以打印一行文本来模拟耗时的任务.主要代码如下: 1  定义一个任务的接口. 1 /* 2 * 任务的接口 3 */ 4 public interface Task { 5 void doSomething(); 6 } 2  实现一个具体的任务. 1 /* 2 * 具体的任务 3 */ 4 public class PrintTask implements Task{

理解线程池,自己实现一个线程池

线程池本质是一个生产者-消费者模式,一边维护一些线程执行任务,一边由主线程添加一些任务.现在我们抛弃源码中一些繁杂的状态判断,自己写一个线程池. public class poolT { //可能频繁增删任务,链表队列效率较高 private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); private final HashSet<Work> workers

python创建一个线程和一个线程池

创建一个线程 1.示例代码 import time import threading def task(arg): time.sleep(2) while True: num = input('>>>') t = threading.Thread(target=task.args=(num,)) t.start() 创建一个线程池 1.示例代码 import time from concurrent.futures import ThreadPoolExecutor def task(m

死磕 java线程系列之自己动手写一个线程池(续)

(手机横屏看源码更方便) 问题 (1)自己动手写的线程池如何支持带返回值的任务呢? (2)如果任务执行的过程中抛出异常了该怎么处理呢? 简介 上一章我们自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,我们自己能否实现呢?必须可以,今天我们就一起来实现带返回值任务的线程池. 前情回顾 首先,让我们先回顾一下上一章写的线程池: (1)它包含四个要素:核心线程数.最大线程数.任务队列.拒绝策略: (2)它具有执行无返回值任务的能力: (3)它无法处理有返回值的任务: (4)它无法处理任务

线程池的简单Demo

服务器端: public class Server { private int port=8821; private ServerSocket serverSocket; private ExecutorService executorService;//线程池 private final int POOL_SIZE=10;//单个CPU线程池大小 public MultiThreadServer()throws IOException{ serverSocket=new ServerSocke

JAVA中线程池的简单使用

比如现在有10个线程,但每次只想运行3个线程,当这3个线程中的任何一个运行完后,第4个线程接着补上.这种情况可以使用线程池来解决,线程池用起来也相当的简单,不信,你看: package com.demo; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public