[APUE] 线程池

1 线程池技术简介

“池”化技术通常都是为了应对“小”的特点而开发出来的,比如:

内存池是针对小块内存的申请和释放过于频繁导致的效率低下问题。先分配一定量的内存,按照大小分类,当程序需要小块内存(这里的小是相对而言的,看实现方式)时,就从某个大块内存中截取小块内存,用完了之后,就再放入大块内存中。当然,这里说的只是基本的思想,在实现的时候,有针对不同的分配方式的优化方案。

线程池是针对大量短任务申请线程和释放线程过于频繁导致的效率低下问题,如果任务运行时间较短,而申请线程和释放线程的时间较长,那么频繁地进行线程的申请和释放必将降低效率。先分配一定数量的线程,如果有任务到达,就从线程池中取出一个线程处理任务,处理完成后,将线程归还给线程池。于是,就避免了过于频繁线程的申请和释放导致的效率低下问题。而且,还可以根据当前线程的负载进行线程的增加和减少,当现有的线程比较忙碌时,可以增加线程,但是,线程也不能无限增加,因为过多的线程会增大系统开销,当现有的线程多数处于空闲时,可以减少线程。

当然,还有其它的池化技术,不过,原理大体都是类似的。

2 线程池的实现方案

初始化时,线程池创建默认个数的线程,并将线程ID存放到工作者队列中。

在线程池中设置了一个finish的结束变量,当用户调用了threadpool_destroy()时,就设置该变量,并通知所有的工作者,然后等待所有的工作者线程退出。

每个线程循环检测工作队列,当工作队列不为空时,就从该队列中取出一个工作执行,如果工作队列为空并且设置了finish变量,就退出工作者线程,如果工作队列为空,但是没有设置finish变量,则继续等待工作。

本实现没有考虑负载均衡。

3 线程池的源代码及注释

/* threadpool.h */
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H

#include <pthread.h>

#define	THREADPOOL_MAX_THREADS	8 /* 最大线程数 */
#define	THREADPOOL_MIN_THREADS	1 /* 最小线程数 */
#define	THREADPOOL_DEF_THREADS	4 /* 默认线程数 */

struct work_queue_s {
	void* (*routine)(void *); /* 工作例程 */
	void *arg; /* 工作例程参数 */
	struct work_queue_s *next;
};

typedef struct work_queue_s work_queue_t; /* 工作队列 */

struct worker_queue_s {
	pthread_t id; /* 线程ID */
	struct worker_queue_s *next;
};

typedef struct worker_queue_s worker_queue_t; /* 工作者队列 */

struct threadpool_s {
	int finish; /* 是否结束 */
	int cur_thread_num; /* 当前线程数 */
	worker_queue_t *workers; /* 工作者队列 */
	work_queue_t *first; /* 工作队列首指针 */
	work_queue_t *last; /* 工作队列尾指针 */
	pthread_cond_t queue_nonempty; /* 工作队列是否为空的条件变量 */
	pthread_mutex_t queue_lock; /* 工作队列锁 */
};

typedef struct threadpool_s threadpool_t; /* 线程池结构 */

/* 创建线程池 */
threadpool_t* threadpool_create();

/* 向线程池中添加工作 */
int threadpool_insert_work(threadpool_t*, void* (*)(void *), void*);

/* 创建工作者 */
int threadpool_create_worker(threadpool_t*);

/* 销毁工作者 */
int threadpool_destroy_worker(threadpool_t*);

/* 销毁线程池 */
int threadpool_destroy(threadpool_t*);

#endif
/* threadpool.c */
#include <stdio.h>
#include <stdlib.h>
#include "threadpool.h"

/* 创建线程,分配线程池结构,初始化结构中的各个元素,创建默认个数的线程 */
threadpool_t* threadpool_create()
{
	threadpool_t *tp = (threadpool_t *)malloc(sizeof(threadpool_t));
	if(tp == NULL) {
		fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
		return tp;
	}

	tp->finish = 0;
	tp->cur_thread_num = 0;
	tp->workers = NULL;
	tp->first = NULL;
	tp->last = NULL;

	if(pthread_cond_init(&tp->queue_nonempty, NULL) != 0) {
		fprintf(stderr, "%s: pthread_cond_init failed\n", __FUNCTION__);
		free(tp);
		return NULL;
	}

	if(pthread_mutex_init(&tp->queue_lock, NULL) != 0) {
		fprintf(stderr, "%s: pthread_mutex_init failed\n", __FUNCTION__);
		free(tp);
		return NULL;
	}

	int cnt = THREADPOOL_DEF_THREADS;
	while(cnt--) {
		threadpool_create_worker(tp);
	}
	printf("create threadpool success\n");
	printf("contain %d threads\n", THREADPOOL_DEF_THREADS);

	return tp;
}

/* 往线程池中添加工作 */
int threadpool_insert_work(threadpool_t *tp, void* (*routine)(void *), void* arg)
{
	work_queue_t *wq = malloc(sizeof(work_queue_t));
	if(wq == NULL) {
		fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
		return -1;
	}

	wq->routine = routine;
	wq->arg = arg;
	wq->next = NULL;

	pthread_mutex_lock(&tp->queue_lock);
	if(tp->first == NULL && tp->last == NULL) {
		tp->first = wq;
		tp->last = wq;
	}
	else {
		tp->last->next = wq;
		tp->last = wq;
	}
	pthread_mutex_unlock(&tp->queue_lock);
	pthread_cond_signal(&tp->queue_nonempty);

	return 0;
}

/* 工作者线程的执行函数 */
void* thread_routine(void* arg)
{
	work_queue_t *wq = NULL;
	threadpool_t *tp = (threadpool_t *)arg;

	while(1) {
		pthread_mutex_lock(&tp->queue_lock);
		while(tp->finish == 0 && tp->first == NULL) {
			pthread_cond_wait(&tp->queue_nonempty, &tp->queue_lock);
		}
		if(tp->finish && tp->first == NULL) {
			pthread_mutex_unlock(&tp->queue_lock);
			pthread_exit(NULL);
		}

		work_queue_t *wq = NULL;
		wq = tp->first;
		tp->first = wq->next;
		if(wq->next == NULL) {
			tp->last = NULL;
		}
		pthread_mutex_unlock(&tp->queue_lock);

		wq->routine(wq->arg);
		printf("current thread: %u\n", (unsigned int)pthread_self());
		free(wq);
	}
}

/* 创建工作者,并将工作者的线程ID记录到工作者队列中 */
int threadpool_create_worker(threadpool_t *tp)
{
	pthread_t tid;
	if(pthread_create(&tid, NULL, thread_routine, tp) != 0) {
		fprintf(stderr, "%s: pthread_create failed\n", __FUNCTION__);
		return -1;
	}

	worker_queue_t *worker = (worker_queue_t *)malloc(sizeof(worker_queue_t));
	if(worker == NULL) {
		fprintf(stderr, "%s: malloc failed\n", __FUNCTION__);
		return -1;
	}
	worker->id = tid;
	worker->next = NULL;

	worker->next = tp->workers;
	tp->workers = worker;
	tp->cur_thread_num++;
	printf("create worker %u\n", (unsigned int)tid);

	return 0;
}

/* 销毁工作者 */
int threadpool_destroy_worker(threadpool_t *tp)
{
	worker_queue_t *worker = NULL;
	if(tp->workers == NULL) {
		return -1;
	}
	worker = tp->workers;
	tp->workers = worker->next;
	tp->cur_thread_num--;

	pthread_t tid = worker->id;
	free(worker);

	if(pthread_join(tid, NULL) != 0) {
		fprintf(stderr, "%s: pthread_join failed\n", __FUNCTION__);
		return -1;
	}
	printf("destroy %u success\n", (unsigned int)tid);

	return 0;
}

/* 销毁线程池,等待所有线程结束 */
int threadpool_destroy(threadpool_t *tp)
{
	pthread_mutex_lock(&tp->queue_lock);
	tp->finish = 1;
	pthread_mutex_unlock(&tp->queue_lock);
	pthread_cond_broadcast(&tp->queue_nonempty);
	int cnt = tp->cur_thread_num;

	printf("ready to destroy %d worker\n", cnt);

	while(cnt--) {
		threadpool_destroy_worker(tp);
	}

	free(tp);

	return 0;
}
/* test_threadpool.c */
#include <stdio.h>
#include "threadpool.h"

void *routine(void *arg)
{
	printf("%d\n", (int)arg);
	return NULL;
}

int main(int argc, char const *argv[])
{
	threadpool_t *tp = threadpool_create();

	int i = 0;
	while(i < 10) {
		threadpool_insert_work(tp, routine, (void*)i);
		++i;
	}

	threadpool_destroy(tp);

	return 0;
}

4 程序的执行结果

上述程序的执行结果是:

create worker 3076107072
create worker 3067714368
create worker 3059321664
create worker 3050928960
create threadpool success
contain 4 threads
0
ready to destroy 4 worker
1
current thread: 3067714368
2
current thread: 3067714368
3
current thread: 3067714368
4
current thread: 3067714368
5
current thread: 3067714368
6
current thread: 3067714368
7
current thread: 3067714368
8
current thread: 3067714368
9
current thread: 3067714368
current thread: 3076107072
destroy 3050928960 success
destroy 3059321664 success
destroy 3067714368 success
destroy 3076107072 success

可以看到,有十个工作,但是只有两个线程执行了,而且一个线程执行了9个工作,另一个线程执行了1个工作。

如果将工作函数改成这样:

void *routine(void *arg)
{
	sleep(1);
	printf("%d\n", (int)arg);
	return NULL;
}

结果就变成:

create worker 3075525440
create worker 3067132736
create worker 3058740032
create worker 3050347328
create threadpool success
contain 4 threads
ready to destroy 4 worker
2
1
current thread: 3067132736
0
current thread: 3075525440
3
current thread: 3050347328
current thread: 3058740032
5
current thread: 3075525440
6
7
current thread: 3058740032
current thread: 3050347328
4
current thread: 3067132736
destroy 3050347328 success
9
current thread: 3058740032
8
current thread: 3075525440
destroy 3058740032 success
destroy 3067132736 success
destroy 3075525440 success

当线程执行工作时,调用了sleep(),线程休眠,此时,可以调度其它的线程执行工作。

5 实现过程中遇到的问题

(1)在编写多线程程序时,通常会用到锁。使用锁的时候,特别要注意的是:锁保护的是哪那个成员,那个成员是否有必要用锁保护。比如,这里的工作队列,由于工作队列可能会被多个线程使用,某些线程想要从中获取工作,某个线程想向其中添加工作,于是,需要用锁来进行保护。而这里的工作者队列呢?由于工作者队列不会被多个线程使用,它只能被主线程使用,因此,不需要用锁进行保护。

(2)关于条件变量。在使用条件变量时,如果有多个条件,在修改任意一个条件时,都要进行通知。

[APUE] 线程池,布布扣,bubuko.com

时间: 2024-10-21 10:24:05

[APUE] 线程池的相关文章

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

线程的控制和线程池

一.WaitHandle: ”.Net 中提供了一些线程间更自由通讯的工具,他们提供了通过"信号"进行通讯的机制 可以通过ManualResetEvent,AutoResetEvent(他是在开门并且一个 WaitOne 通过后自动关门)来进行线程间的通讯 waitOne:    等待开门 Set:           开门 Reset:       关门 static void Main(string[] args) { ManualResetEvent mre = new Manu

内存池、进程池、线程池

首先介绍一个概念"池化技术 ".池化技术 一言以蔽之就是:提前保存大量的资源,以备不时之需以及重复使用. 池化技术应用广泛,如内存池,线程池,连接池等等.内存池相关的内容,建议看看Apache.Nginx等开源web服务器的内存池实现. 起因:由于在实际应用当中,分配内存.创建进程.线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作.           因此,当程序中需要频繁的进行内存申请释放,进程.线程创建销毁等操作时,通常会使用内存池.进程池.

缓冲池,线程池,连接池

SSH:[email protected]:unbelievableme/object-pool.git   HTTPS:https://github.com/unbelievableme/object-pool.git 缓冲池 设计要点:包含三个队列:空缓冲队列(emq),装满输入数据的输入的队列(inq),装满输出数据的输出队列(outq),输入程序包括收容输入(hin),提取输入(sin),输出程序包括收容输出(hout)和提取输出(sout). 注意点:输入程序和输出程序会对缓冲区并发访

记5.28大促压测的性能优化&mdash;线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

线程池的创建

package com.newer.cn; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test1 { public static void main(String[] args) { // 创建线程池的方式 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程. E

Java底层技术系列文章-线程池框架

一.线程池结构图    二.示例 定义线程接口 public class MyThread extends Thread { @Override publicvoid run() { System.out.println(Thread.currentThread().getName() + "正在执行"); }}   1:newSingleThreadExecutor ExecutorService pool = Executors. newSingleThreadExecutor()

线程池中的线程的排序问题

1 package org.zln.thread.poolqueue; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 6 import java.util.Comparator; 7 import java.util.UUID; 8 import java.util.concurrent.*; 9 10 /** 11 * 线程池中的线程的排序问题 12 * Created by sherry on 16/11/4

多线程篇七:通过Callable和Future获取线程池中单个务完成后的结果

使用场景:如果需要拿到线程的结果,或者在线程完成后做其他操作,可以使用Callable 和 Futrue 1.定义一个线程池,向线程池中提交单个callable任务 ExecutorService threadPools=Executors.newSingleThreadExecutor(); Future<String> future=threadPools.submit(new Callable<String>() { @Override public String call(