多线程编程—线程池的实现

执行与任务分离的组件— 线程池

多线程技术主要解决了处理器单元内多个线程执行的问题,它可以显著的减少处理器单元的闲置时间,增加处理器单元的吞吐能力。线程池是多线程编程的一个必要组件,并且对于很多编程人员都是透明的,更是神秘的。有幸能为大家解析其中缘由,尚有不妥之处,欢迎大家抛砖。

线程池的概念,是一个用来管理一组执行任务线程的工具。既然是管理工具,那么该工具管理是用来管理任务与执行的。如图一线程池组件拓扑图,执行队列(Workers),任务队列(Jobs)和池管理(Pool Manager)三部分组成。

执行队列(Workers)是用来存放运行线程的队列。

任务队列(Jobs)是用来存放需要被执行的任务队列。

池管理(Pool Manager)主要是管理执行队列的执行顺序,执行任务的时间长短,对长时间没有使用的执行单元进行释放,执行单元满负荷运行的时及时添加执行单元;记录未执行的任务数量,对新任务入队,即将执行的任务出队等等。

图一 线程池组件拓扑图

执行队列(Workers)中的每一个执行单元(Worker)由哪些元素组成?线程ID,退出标志。

任务队列(Jobs)中的每一个任务(Jobs)的组成元素?执行每一个任务的具体执行函数,每一个任务的执行参数。

池管理(Pool Manager)由哪些元素组成?每一个新任务添加与执行时的移除用的互斥锁,每一个线程挂起的时所等待的条件变量。

根据分析如图二线程池的类图。

图二线程池的类图

到这里一个简单的线程池就已经可以呼之欲出了。以下为实现代码

/*
 * Author: WangBoJing
 * email: [email protected] 
 * github: https://github.com/wangbojing
 */
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define LL_ADD(item, list) do {  item->prev = NULL;    item->next = list;    list = item;    } while(0)

#define LL_REMOVE(item, list) do {         if (item->prev != NULL) item->prev->next = item->next;  if (item->next != NULL) item->next->prev = item->prev;  if (list == item) list = item->next;      item->prev = item->next = NULL;       } while(0)

typedef void (*JOB_CALLBACK)(void *);
struct NTHREADPOOL;

typedef struct NWORKER {
 pthread_t thread;
 int terminate;
 struct NTHREADPOOL *pool;
 struct NWORKER *next;
 struct NWORKER *prev;
} nWorker;

typedef struct NJOB {
 JOB_CALLBACK job_func;
 void *arg;
 struct NJOB *next;
 struct NJOB *prev;
} nJob;

typedef struct NTHREADPOOL {
 struct NWORKER *workers;
 struct NJOB *jobs;
 pthread_mutex_t jobs_mtx;
 pthread_cond_t jobs_cond;
} nThreadPool;

void *ntyWorkerThread(void *arg) {

 nWorker *worker = (nWorker*)arg;
 
 while (1) {
  pthread_mutex_lock(&worker->pool->jobs_mtx);
  while (worker->pool->jobs == NULL) {
   if (worker->terminate) break;
   pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);
  }
  
  if (worker->terminate) break;
 
  nJob *job = worker->pool->jobs;
  if (job != NULL) {
   LL_REMOVE(job, worker->pool->jobs);
  }
  
  pthread_mutex_unlock(&worker->pool->jobs_mtx);
  if (job == NULL) continue;
  job->job_func(job);
  
  usleep(1);
 }
 
 free(worker);
 pthread_exit(NULL);
 
}

int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {

 if (pool == NULL) return 1;
 if (numWorkers < 1) numWorkers = 1;
 
 memset(pool, 0, sizeof(nThreadPool));
 
 pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
 memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));
 
 pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
 memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));
 
 int i = 0;
 for (i = 0;i < numWorkers;i ++) {
 
  nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
  if (worker == NULL) {
   perror("malloc");
   return 1;
  }
  
  memset(worker, 0, sizeof(nWorker));
  worker->pool = pool;
  
  int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void*)worker);
  if (ret) {
   perror("pthread_create");
   free(worker);
   return 1;
  }
  
  LL_ADD(worker, worker->pool->workers);
 }
}

void ntyThreadPoolShutdown(nThreadPool *pool) {

 nWorker *worker = NULL;
 for (worker = pool->workers;worker != NULL;worker = worker->next) {
  worker->terminate = 1;
 }
 
 pthread_mutex_lock(&pool->jobs_mtx);
 
 pool->workers = NULL;
 pool->jobs = NULL;
 pthread_cond_broadcast(&pool->jobs_cond);
 
 pthread_mutex_unlock(&pool->jobs_mtx);
 
}

void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {

 pthread_mutex_lock(&pool->jobs_mtx);
 LL_ADD(job, pool->jobs);
 
 pthread_cond_signal(&pool->jobs_cond);
 pthread_mutex_unlock(&pool->jobs_mtx);
 
}

/********************************* debug thread pool *********************************/

#define KING_MAX_THREADS  80
#define KING_COUNTER_SIZE 1000

void king_counter(void *arg) {
 nJob *job = (nJob*)arg;
 
 int index = *(int *)job->arg;
 printf("index: %d, selfid:%lu\n", index, pthread_self());
 free(job->arg);
 free(job);
}

int main(int argc, char *argv[]) {

 nThreadPool pool;
 ntyThreadPoolCreate(&pool, KING_MAX_THREADS);
 
 int i = 0;
 for (i = 0;i < KING_COUNTER_SIZE;i ++) {
 
  nJob *job = (nJob*)malloc(sizeof(nJob));
  if (job == NULL) {
   perror("malloc");
   exit(1);
  }
  
  job->job_func = king_counter;
  job->arg = malloc(sizeof(int));
  
  *(int*)job->arg = i;
  ntyThreadPoolPush(&pool, job);
  
 }
 getchar();
 printf("You are very good !!!!\n");
 
}

这样的线程池还是只是一个Demo,原因有如下几点需要我们值得改进的。

  1. 线程池的线程数量是确定的,不能随着系统任务请求数量放缩线程池的大小。
  2. 任务数量的统计,并没有对任务队列进行统计
  3. 执行任务中的线程数量,等待执行的任务数量进行统计
  4. 每一个执行任务的时间没有做限制,
  5. IO密集型与计算密集型区分,线程池非常常用,但是根据不同的业务场景需要设置不同配置
  6. 在用户任务执行函数里,用户主动的调用了pthread_exit退出线程的保护机制

针对于以上几点问题,改进了一版线程池

/*
 * Author: WangBoJing
 * email: [email protected] 
 * github: https://github.com/wangbojing
 */
 
 
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>

typedef void (*JOB_CALLBACK)(void *);

typedef struct NJOB {
 struct NJOB *next;
 JOB_CALLBACK func;
 void *arg;
} nJob;

typedef struct NWORKER {
 struct NWORKER *active_next;
 pthread_t active_tid;
} nWorker;

typedef struct NTHREADPOOL {
 struct NTHREADPOOL *forw;
 struct NTHREADPOOL *back;
 pthread_mutex_t mtx;
 
 pthread_cond_t busycv;
 pthread_cond_t workcv;
 pthread_cond_t waitcv;
 
 nWorker *active;
 nJob *head;
 nJob *tail;
 pthread_attr_t attr;
 
 int flags;
 unsigned int linger;
 int minimum;
 int maximum;
 int nthreads;
 int idle;
 
} nThreadPool;

static void* ntyWorkerThread(void *arg);
#define NTY_POOL_WAIT   0x01
#define NTY_POOL_DESTROY  0x02

static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static sigset_t fillset;
nThreadPool *thread_pool = NULL;

static int ntyWorkerCreate(nThreadPool *pool) {
 sigset_t oset;
 pthread_t thread_id;
 pthread_sigmask(SIG_SETMASK, &fillset, &oset);
 int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool);
 pthread_sigmask(SIG_SETMASK, &oset, NULL);
 return error;
}

static void ntyWorkerCleanup(nThreadPool * pool) {

 --pool->nthreads;
 if (pool->flags & NTY_POOL_DESTROY) {
  if (pool->nthreads == 0) {
   pthread_cond_broadcast(&pool->busycv);
  }
 } else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
  pool->nthreads ++;
 }
 pthread_mutex_unlock(&pool->mtx);
 
}

static void ntyNotifyWaiters(nThreadPool *pool) {
 
 if (pool->head == NULL && pool->active == NULL) {
  pool->flags &= ~NTY_POOL_WAIT;
  pthread_cond_broadcast(&pool->waitcv);
 }
 
}

static void ntyJobCleanup(nThreadPool *pool) {
 
 pthread_t tid = pthread_self();
 nWorker *activep;
 nWorker **activepp;
 
 pthread_mutex_lock(&pool->mtx);
 for (activepp = &pool->active;(activep = *activepp) != NULL;activepp = &activep->active_next) {
  *activepp = activep->active_next;
  break;
 }
 if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);
 
}

static void* ntyWorkerThread(void *arg) {
 nThreadPool *pool = (nThreadPool*)arg;
 nWorker active;
 
 int timeout;
 struct timespec ts;
 JOB_CALLBACK func;
 
 pthread_mutex_lock(&pool->mtx);
 pthread_cleanup_push(ntyWorkerCleanup, pool);
 active.active_tid = pthread_self();
 
 while (1) {
 
  pthread_sigmask(SIG_SETMASK, &fillset, NULL);
  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  
  timeout = 0;
  pool->idle ++;
  
  if (pool->flags & NTY_POOL_WAIT) {
   ntyNotifyWaiters(pool);
  }
  
  while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {
   if (pool->nthreads <= pool->minimum) {
    
    pthread_cond_wait(&pool->workcv, &pool->mtx);
    
   } else {
   
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += pool->linger;
    if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {
    
     timeout = 1;
     break;
    }
   }
  }
  pool->idle --;
  
  if (pool->flags & NTY_POOL_DESTROY) break;
  
  nJob *job = pool->head;  
  if (job != NULL) {
  
   timeout = 0;
   func = job->func;
   
   void *job_arg = job->arg;
   pool->head = job->next;
   
   if (job == pool->tail) {
    pool->tail == NULL;
   }
   
   active.active_next = pool->active;
   pool->active = &active;
   
   pthread_mutex_unlock(&pool->mtx);
   pthread_cleanup_push(ntyJobCleanup, pool);
   
   free(job);
   func(job_arg);
   
   pthread_cleanup_pop(1);
  }
  
  if (timeout && (pool->nthreads > pool->minimum)) {
   break;
  }
 }
 pthread_cleanup_pop(1);
 
 return NULL;
 
}

static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {

 struct sched_param param;
 void *addr;
 size_t size;
 int value;
 
 pthread_attr_init(new_attr);
 
 if (old_attr != NULL) {
 
  pthread_attr_getstack(old_attr, &addr, &size);
  pthread_attr_setstack(new_attr, NULL, size);
  
  pthread_attr_getscope(old_attr, &value);
  pthread_attr_setscope(new_attr, value);
  
  pthread_attr_getinheritsched(old_attr, &value);
  pthread_attr_setinheritsched(new_attr, value);
  
  pthread_attr_getschedpolicy(old_attr, &value);
  pthread_attr_setschedpolicy(new_attr, value);
  
  pthread_attr_getschedparam(old_attr, &param);
  pthread_attr_setschedparam(new_attr, &param);
  
  pthread_attr_getguardsize(old_attr, &size);
  pthread_attr_setguardsize(new_attr, size);
  
 }
 pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
 
}

nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {

 sigfillset(&fillset);
 if (min_threads > max_threads || max_threads < 1) {
  errno = EINVAL;
  return NULL;
 }
 
 nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool));
 if (pool == NULL) {
  errno = ENOMEM;
  return NULL;
 }
 
 pthread_mutex_init(&pool->mtx, NULL);
 pthread_cond_init(&pool->busycv, NULL);
 pthread_cond_init(&pool->workcv, NULL);
 pthread_cond_init(&pool->waitcv, NULL);
 
 pool->active = NULL;
 pool->head = NULL;
 pool->tail = NULL;
 pool->flags = 0;
 pool->linger = linger;
 pool->minimum = min_threads;
 pool->maximum = max_threads;
 pool->nthreads = 0;
 pool->idle = 0;
 
 ntyCloneAttributes(&pool->attr, attr);
 pthread_mutex_lock(&nty_pool_lock);
 
 if (thread_pool == NULL) {
  pool->forw = pool;
  pool->back = pool;
  
  thread_pool = pool;
  
 } else {
 
  thread_pool->back->forw = pool;
  pool->forw = thread_pool;
  pool->back = pool->back;
  thread_pool->back = pool;
  
 }
 
 pthread_mutex_unlock(&nty_pool_lock);
 return pool;
 
}

int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {

 nJob *job = (nJob*)malloc(sizeof(nJob));
 if (job == NULL) {
  errno = ENOMEM;
  return -1;
 }
 job->next = NULL;
 job->func = func;
 job->arg = arg;
 
 pthread_mutex_lock(&pool->mtx);
 if (pool->head == NULL) {
  pool->head = job;
 } else {
  pool->tail->next = job;
 }
 pool->tail = job;
 
 if (pool->idle > 0) {
  pthread_cond_signal(&pool->workcv);
 } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
  pool->nthreads ++;
 }
 
 pthread_mutex_unlock(&pool->mtx);
}

void nThreadPoolWait(nThreadPool *pool) {

 pthread_mutex_lock(&pool->mtx);
 pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);
 
 while (pool->head != NULL || pool->active != NULL) {
  pool->flags |= NTY_POOL_WAIT;
  pthread_cond_wait(&pool->waitcv, &pool->mtx);
 }
 
 pthread_cleanup_pop(1);
}

void nThreadPoolDestroy(nThreadPool *pool) {

 nWorker *activep;
 nJob *job;
 
 pthread_mutex_lock(&pool->mtx);
 pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);
 
 pool->flags |= NTY_POOL_DESTROY;
 pthread_cond_broadcast(&pool->workcv);
 
 for (activep = pool->active;activep != NULL;activep = activep->active_next) {
  pthread_cancel(activep->active_tid);
 }
 
 while (pool->nthreads != 0) {
  pthread_cond_wait(&pool->busycv, &pool->mtx);
 }
 
 pthread_cleanup_pop(1);
 pthread_mutex_lock(&nty_pool_lock);
 
 if (thread_pool == pool) {
  thread_pool = pool->forw;
 }
 
 if (thread_pool == pool) {
  thread_pool = NULL;
 } else {
  pool->back->forw = pool->forw;
  pool->forw->back = pool->back;
 }
 
 pthread_mutex_unlock(&nty_pool_lock);
 
 for (job = pool->head;job != NULL;job = pool->head) {
  pool->head = job->next;
  free(job);
 }
 pthread_attr_destroy(&pool->attr);
 free(pool);
 
}

/********************************* debug thread pool *********************************/

void king_counter(void *arg) {
 int index = *(int*)arg;
 printf("index : %d, selfid : %lu\n", index, pthread_self());
 
 free(arg);
 usleep(1);
}

#define KING_COUNTER_SIZE 1000

int main(int argc, char *argv[]) {

 nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);
 
 int i = 0;
 for (i = 0;i < KING_COUNTER_SIZE;i ++) {
 
  int *index = (int*)malloc(sizeof(int));
  
  memset(index, 0, sizeof(int));
  memcpy(index, &i, sizeof(int));
  
  ntyThreadPoolQueue(pool, king_counter, index);
  
 }
 
 
 getchar();
 printf("You are very good !!!!\n");
}
时间: 2024-10-15 02:59:55

多线程编程—线程池的实现的相关文章

Linux下简单的多线程编程--线程池的实现

/* 写在前面的话: 今天刚“开原”,选择了一篇关于线程池的文件与大家分享,希望能对您学习有所帮助,也希望能与大家共同学习! 选择在这个特殊的时候注册并发文章也是有一些我个人特殊的意义的,看我的id(西游小学生.45)就知道了,哈哈.在这里也很感谢博客园的员工,刚发申请两分钟就同意了. */ 最近由于要写一个类似于QQ的程序,所以想到要用到多线程.既然要用多线程,那何不写一个线程池?于是上网搜了搜多线程的代码,发现大多都不是很完善,或者有些小bug.所以,在这里贴出一个完整的,经过我多重测试的,

Java 线程池和多线程编程 ——线程池理解与创建

JDK1.5 引入了 Executor框架 ,对任务提交和执行进行解耦 , 定义任务后交由线程池执行. 线程池是由java.util.concurrent 包中Executors类的工厂方法创建线程池. -------------------------------------------------------------------------------- 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程. public static ExecutorService n

C#多线程之线程池篇3

在上一篇C#多线程之线程池篇2中,我们主要学习了线程池和并行度以及如何实现取消选项的相关知识.在这一篇中,我们主要学习如何使用等待句柄和超时.使用计时器和使用BackgroundWorker组件的相关知识. 五.使用等待句柄和超时 在这一小节中,我们将学习如何在线程池中实现超时和正确地实现等待.具体操作步骤如下: 1.使用Visual Studio 2015创建一个新的控制台应用程序. 2.双击打开"Program.cs"文件,编写代码如下所示: 1 using System; 2 u

多线程及线程池学习心得

一.线程的应用与特点 多线程是程序员不可或缺的技术能力,多线程技术在各个方面都有应用,特别在性能优化上更是起到至关重要的作用.但是,如果多线程写得不好,往往会适得其反,特别是高并发时会造成阻塞.超时等现象.多线程具有以下特点:1.独立性,拥有自己独立的资源,拥有自己私有的地址空间:2.动态性,进程具有自己的生命周期和各种不同的状态:3.并发性,多个进程可以在单个处理器上并发执行,不会相互影响,并行是指同一时刻有多条指令在多个处理器上同时执行.线程是进程的组成部分,一个进程可以拥有多个线程,一个线

17.并发编程--线程池

并发编程线程池 合理利用线程池能够带来三个好处. 第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 第二:提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行. 第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.但是要做到合理的利用线程池,必须对其原理了如指掌. 1. Executor 框架简介 在 Java 5 之后,并发编程引入了一堆新的启动.调度和管理 线

C#多线程之线程池篇2

在上一篇C#多线程之线程池篇1中,我们主要学习了如何在线程池中调用委托以及如何在线程池中执行异步操作,在这篇中,我们将学习线程池和并行度.实现取消选项的相关知识. 三.线程池和并行度 在这一小节中,我们将学习对于大量的异步操作,使用线程池和分别使用单独的线程在性能上有什么差异性.具体操作步骤如下: 1.使用Visual Studio 2015创建一个新的控制台应用程序. 2.双击打开"Program.cs"文件,编写代码如下所示: 1 using System; 2 using Sys

Linux程序设计学习笔记----多线程编程线程同步机制之互斥量(锁)与读写锁

互斥锁通信机制 基本原理 互斥锁以排他方式防止共享数据被并发访问,互斥锁是一个二元变量,状态为开(0)和关(1),将某个共享资源与某个互斥锁逻辑上绑定之后,对该资源的访问操作如下: (1)在访问该资源之前需要首先申请互斥锁,如果锁处于开状态,则申请得到锁并立即上锁(关),防止其他进程访问资源,如果锁处于关,则默认阻塞等待. (2)只有锁定该互斥锁的进程才能释放该互斥锁. 互斥量类型声明为pthread_mutex_t数据类型,在<bits/pthreadtypes.h>中有具体的定义. 互斥量

Java并发编程、多线程、线程池…

Java多线程干货系列(1):Java多线程基础http://www.importnew.com/21136.html#comment-651146 40个Java多线程问题总结http://www.importnew.com/18459.html#comment-651217 Java线程面试题 Top 50http://www.importnew.com/12773.html Java并发编程:Thread类的使用http://www.cnblogs.com/dolphin0520/p/39

跟我学Java多线程——ThreadPoolExecutor(线程池)

什么是线程池 多线程开发中,由于线程数量多,并且每个线程执行一段时间就结束,所以要频繁的创建线程,但是这样频繁的创建线程会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间.在这种情况下,人们就想要一种可以线程执行完后不用销毁,同时该线程还可以去执行其他任务,在这样的情况下线程池就出现了. 线程池就是线程的池子,任务提交到线程池后,就从线程池中取出一个空闲的线程为之服务,服务完后不销毁该线程,而是将该线程还回到线程池中. 在线程池的编程模式下,任务是提交给整个线程池,而不是直接交给某个线程,