linux网络编程学习笔记之五 -----并发机制与线程池

进程线程分配方式

简述下常见的进程和线程分配方式:(好吧,我只是举几个例子作为笔记。。。并发的水太深了,不敢妄谈。。。)

1、进程线程预分配

简言之,当I/O开销大于计算开销且并发量较大时,为了节省每次都要创建和销毁进程和线程的开销。可以在请求到达前预先进行分配。

2、进程线程延迟分配

预分配节省了处理时的负担,但操作系统管理这些进程线程也会带来一定的开销。由此,有个折中的方法是,当某个处理需要花费较长时间的时候,我们创建一个并发的进程或线程来处理该请求。实现也很简单,在主线程中定时,定时到期,开新分支。

3、前面两者结合

还是举个例子,比如可以:在启动时不进行预分配,某个处理太长,则创建从进程,任务结束后不退出。

多进程与多线程比较

可以参考这篇论文:Linux下多进程和多线程性能分析  和这篇Blog:多进程or多线程  总结起来,在任务执行效率上,在任务量较大(文中单次5k以上),多进程的效率高点,反之,多线程站优势,但整体上出入不大。而在创建和销毁的效率上,线程的优势明显,约为5~6倍。然后在服务器上,并发量不大(小于几千),预先创建线程也没太大优势,因为动态管理线程的开销亦不可忽略。

线程池

比较全面和概要的介绍可以参看:线程池的介绍及简单实现

基本思路是,预先创建一定数量的线程,让它们处于阻塞状态,占用很小的内存空间。当任务到来时,选择一个空闲的线程执行,完成后线程不退出,继续阻塞。池子的创建销毁和管理有一个线程单独完成。

进一步地,动态地对线程的数量进行管理,负载较大时,增加线程数量。负载小时,减少之,设法让一段时间不活跃的线程退出,比如让线程在等待下一个请求前先启动一个定时器,若请求到达前定时器到期,则线程退出。

对于处理时间短,处理数目巨大的情况,线程池有天然优势。尤其是对性能要求高的应用或者突发性大规模请求,比如电商秒杀神马的。

线程池的实现可以参考libthreadpool,一个开源的库,sourceforge上能找到源码

我用简单的模型实现了一个,功能上基本满足,主从线程的流程如下图所示,唤醒空闲线程的时候不加以区分,即steven说的惊群,这会一定程度上的损失性能。与之对应的是有主线程采取一定的方式对空闲线程的唤醒进行调度以均衡负载和工作量。下面的代码是我的1.0版(改得太乱了,看官们勿怪),更进一步的功能,如动态地改变池子的尺寸,后续继续完善

pthread_pool.h

#ifndef _PTHREAD_POOL_H_
#define _PTHREAD_POOL_H_

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>

#define MAX_PTHREAD_NUM 100

typedef struct task_node{
	void * (*func)(void * p);
	void * arg;
	struct task_node* next;
}task_node;

typedef struct p_pool{
	pthread_cond_t cond;
	pthread_mutex_t mutex;
	task_node *head, *tail;
	pthread_t *p_tid;			//mark , unsigned long
	int max_num;
	int current_num;
	int working_num;
	int if_destory;
	int decrease;
}p_pool;

p_pool * pool = NULL;

void pool_init(int pthread_num);
void *pthread_process(void *arg);
int add_task(void * (*func)(void *p), void *arg);
void pool_destory();

#endif

pthread_pool.c

/*
 *   a simple thread pool
 *				Mon Jun 9 21:44:36 CST 2014
 *              by  Simon Xia
 */
#include"pthread_pool.h"

/* each worker thread's thread function to handle the task */
void *pthread_process(void *arg)
{
	task_node *tmp = NULL;

//	printf("Now in the %lu thread\n", pthread_self());
	while (1) {
		pthread_mutex_lock(&pool->mutex);
//		printf("%lu thread get lock\n", pthread_self());

		while (!pool->head && !pool->if_destory/* && !pool->decrease*/) { //While !  用是否有任务来控制
//			printf("%lu thread will wait\n", pthread_self());
			pthread_cond_wait(&pool->cond, &pool->mutex);
		}

//		printf("%lu thread: signal is coming\n", pthread_self());
		if (pool->if_destory /*|| pool->decrease*/)
			break;

		tmp = pool->head;

		pool->head = pool->head->next;
	//		pool->working_num++;

		pthread_mutex_unlock(&pool->mutex);
//		printf("%lu thread pick task from queue\n", pthread_self());
		(tmp->func)(tmp->arg);
//		printf("%lu thread finish task\n", pthread_self());
		free(tmp);
		tmp = NULL; //mark

		/*
			pthread_mutex_lock(&pool->mutex);
			pool->working_num--;
			pthread_mutex_unlock(&pool->mutex);
			*/

	}
	pthread_mutex_unlock(&pool->mutex);//先解锁!!
	printf("%lu thread will exit\n", pthread_self());
	pthread_exit(NULL);
}

/* main thread function to manage the thread pool */
/*
void *pthread_main(void *arg)
{
	printf("This is main thread\n");
	int i;
	while (1)
	{
		usleep(50000);
		pthread_mutex_lock(&pool->mutex);
		if (pool->if_destory)
			break;
		if (pool->working_num == pool->current_num) {
			for (i = pool->current_num; i < 2 * pool->current_num; i++)
				pthread_create(&pool->p_tid[i], NULL, pthread_process, NULL);
			pool->current_num *= 2;
			printf("The number of thread has been enlarged to %d\n", pool->current_num);
		}
		else if (pool->working_num <= pool->current_num / 4){
			pool->decrease = 1;
			pthread_mutex_unlock(&pool->mutex);
			for (i = 0; i < pool->current_num / 2; i++)
				pthread_cond_signal(&pool->cond);
			pool->current_num /= 2;
			pool->decrease = 0;
			printf("The number of thread has been decrease to %d\n", pool->current_num);
		}
		pthread_mutex_unlock(&pool->mutex);
	}
	pthread_exit(NULL);
}
*/

/* Initialize the thread pool
 * Input: number of worker thread
 */
void pool_init(int pthread_num)
{
	int i = 0;

	pool = (p_pool*)malloc(sizeof(p_pool));
	pthread_mutex_init(&pool->mutex, NULL);
	pthread_cond_init(&pool->cond, NULL);
	pool->head = pool->tail = NULL;
	pool->max_num = MAX_PTHREAD_NUM;
	pool->current_num = pthread_num;
	pool->working_num = 0;
	pool->if_destory = 0;
	pool->decrease = 0;
	pool->p_tid = (pthread_t*)malloc(pthread_num * sizeof(pthread_t));

//	pthread_create(&pool->p_tid[i], NULL, pthread_main, NULL);

	for (i = 0; i < pthread_num; i++)
		pthread_create(&pool->p_tid[i], NULL, pthread_process, NULL);

}

/* add task into task queue */
int add_task(void * (*func)(void *p), void *arg)
{
	task_node *tmp = (task_node*)malloc(sizeof(task_node));
	tmp->func = *func; //Mark
	tmp->arg = arg;
	tmp->next = NULL;

	pthread_mutex_lock(&pool->mutex);
	if (pool->head) {
		pool->tail = pool->tail->next = tmp;
	}
	else {
		pool->tail = pool->head = tmp;
	}

	pthread_mutex_unlock(&pool->mutex);
	//不加不行?
	//printf("Add task %d success!\n",*(int*)tmp->arg);
	//sleep(1);
	pthread_cond_signal(&pool->cond);

	tmp = NULL; //can't free
	return 0;
}

/* destory the pool after all work */
void pool_destory()
{
	int i;

//	pthread_mutex_lock(&pool->mutex);
	pool->if_destory = 1;
//	pthread_mutex_unlock(&pool->mutex);

	pthread_cond_broadcast(&pool->cond);

	for (i = 0; i < pool->current_num; i++)
	{
		if (!pthread_join(pool->p_tid[i], NULL))
			printf("Success to collect thread %lu\n", pool->p_tid[i]);
		else
			printf("Fail to collect thread %lu\n", pool->p_tid[i]);
	}

	free(pool->p_tid);
	free(pool->head);
	free(pool->tail);

	pthread_cond_destroy(&pool->cond);
	pthread_mutex_destroy(&pool->mutex);

	free(pool);
	pool = NULL;
}

基于这个线程池的服务端程序:

#include"simon_socket.h"

#define SERV_PORT 12345
#define THREAD_CNT 10

extern void pool_init(int );
extern int add_task(void* (*) (void*), void*);
extern void pool_destory();

typedef struct client_info{
	int fd;
	struct sockaddr_in addr;
	struct client_info *next;
}client_info;

void *process(void *arg)
{
	process_client(((client_info*)arg)->fd, &((client_info*)arg)->addr);
	return NULL;
}

void sig_int(int signo)
{
	pool_destory();
	exit(0);
}

int main()
{
	int sockfd, acfd;
	size_t sin_len;
	struct sockaddr_in client_addr;
	client_info *info_tmp, *info_head = NULL, *info_tail = NULL;

	signal(SIGINT, sig_int);

	sin_len = sizeof(struct sockaddr);
	sockfd = init_tcp_psock(SERV_PORT);
	pool_init(THREAD_CNT);

	for ( ; ; )
	{
		if ((acfd = accept(sockfd, (struct sockaddr *)&client_addr, &sin_len)) == -1)
		{
			perror("Accept request failed: ");
			return 1;
		}
		else
			printf("Get a connection from %s:%d !\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

		info_tmp = (client_info *)malloc(sizeof(client_info));
		memset(info_tmp, 0, sizeof(client_info));
		info_tmp->fd = acfd;
		info_tmp->addr = client_addr;
		info_tmp->next = NULL;

		if (info_head) {
			info_tail = info_tail -> next = info_tmp;
		}
		else{
			info_head = info_tail = info_tmp;
		}
		add_task(process, (void*)info_tmp);
	}

	for (info_tmp = info_head; info_tmp; free(info_tmp))
		info_head = info_head -> next;

	return 0;
}

吐槽下多线程的调试。。。gdb调试多线程有点蛋疼,单步时很容易pthread_cond_wait把pthread_cond_signal的信号错过,出现各种错误,比如Cannot find bounds of current function等,建议大家还是多做输出,或者用日志的方式调。

set scheduler-locking off|on|step  这个命令还是很有用的,因为用step或者continue命令调试当前被调试线程的时候,其他线程也是同时执行的。

具体地:off 不锁定任何线程,也就是所有线程都执行,这是默认值。 on 只有当前被调试程序会执行。 step 在单步的时候,除了next过一个函数的情况以外,只有当前线程会执行。

反正我最后是老老实实输出。。各位路过大牛有什么好方法,求指点 ~~

linux网络编程学习笔记之五 -----并发机制与线程池

时间: 2024-12-14 06:10:17

linux网络编程学习笔记之五 -----并发机制与线程池的相关文章

linux网络编程学习笔记之五 -----并发机制与线程?

进程线程分配方式 简述下常见的进程和线程分配方式:(好吧,我仅仅是举几个样例作为笔记...并发的水太深了,不敢妄谈...) 1.进程线程预分配 简言之,当I/O开销大于计算开销且并发量较大时,为了节省每次都要创建和销毁进程和线程的开销.能够在请求到达前预先进行分配. 2.进程线程延迟分配 预分配节省了处理时的负担,但操作系统管理这些进程线程也会带来一定的开销.由此,有个折中的方法是,当某个处理须要花费较长时间的时候,我们创建一个并发的进程或线程来处理该请求.实现也非常easy,在主线程中定时,定

linux网络编程学习笔记之四 -----多线程并发服务端

相对于使用进程实现并发,用线程的实现更加轻量.每个线程都是独立的逻辑流.线程是CPU上独立调度运行的最小单位,而进程是资源分配的单位.当然这是在微内核的操作系统上说的,简言之这种操作系统的内核是只提供最基本的OS服务,更多参看点击打开链接 每个线程有它自己的线程上下文,包括一个唯一的线程ID(linux上实现为unsigned long),栈,栈指针,程序计数器.通用目的寄存器和条件码,还有自己的信号掩码和优先级.同一个进程里的线程共享这个进程的整个虚拟地址空间,包括可执行的程序文本.程序的全局

linux网络编程学习笔记之六 -----I/O多路复用服务端

多进程和多线程的目的是在于最大限度地利用CPU资源,当某个进程不需要占用太多CPU资源,而是需要I/O资源时,可以采用I/O多路复用,基本思路是让内核把进程挂起,直到有I/O事件发生时,再把控制返回给程序.这种事件驱动模型的高效之处在于,省去了进程和线程上下文切换的开销.整个程序运行在单一的进程上下文中,所有的逻辑流共享整个进程的地址空间.缺点是,编码复杂,而且随着每个逻辑流并发粒度的减小,编码复杂度会继续上升. I/O多路复用典型应用场合(摘自UNP6.1) select的模型就是这样一个实现

linux网络编程学习笔记之二 -----错误异常处理和各种碎碎(更新中)

errno 在unix系统中对大部分系统调用非正常返回时,通常返回值为-1,并设置全局变量errno(errno.h),如socket(), bind(), accept(), listen().erron存放一个正整数来保存上次出错的错误值. 对线程而言,每个线程都有专用的errno变量,不必考虑同步问题. strerror converts to English (Note: use strerror_r for thread safety) perror is simplified str

linux网络编程学习笔记之三 -----多进程并发服务端

首先是fork()函数.移步APUE 8.3.  比較清晰的解释能够參考http://blog.csdn.net/lingdxuyan/article/details/4993883和http://www.oschina.net/question/195301_62902 补充一点是:fork返回后,原进程中的每一个文件或套接口描写叙述符的引用计数加1(相当于被多打开了一次),每调用一次close,引用计数减1,仅仅有当引用计数减到0时才会真正关闭该套接字. 可运行文件被linux运行的唯一方式

linux网络编程学习笔记之四 -----多-threaded服务器

对于使用过程中并发.通过实现更轻量级线程. 每个线程都是一个独立的逻辑流. 主题是CPU在执行调度的最小独立单位,这个过程是资源分配单元.当然,这是在微内核操作系统说.总之,这是唯一的一个操作系统内核提供了最重要的OS服务,许多人看点击打开链接 每一个线程有它自己的线程上下文.包含一个唯一的线程ID(linux上实现为unsigned long),栈,栈指针.程序计数器.通用目的寄存器和条件码,还有自己的信号掩码和优先级.同一个进程里的线程共享这个进程的整个虚拟地址空间,包含可运行的程序文本.程

转 网络编程学习笔记一:Socket编程

网络编程学习笔记一:Socket编程 “一切皆Socket!” 话虽些许夸张,但是事实也是,现在的网络编程几乎都是用的socket. ——有感于实际编程和开源项目研究. 我们深谙信息交流的价值,那网络中进程之间如何通信,如我们每天打开浏览器浏览网页时,浏览器的进程怎么与web服务器通信的?当你用QQ聊天时,QQ进程怎么与服务器或你好友所在的QQ进程通信?这些都得靠socket?那什么是socket?socket的类型有哪些?还有socket的基本函数,这些都是本文想介绍的.本文的主要内容如下:

嵌入式 Linux网络编程(四)——Select机制

嵌入式 Linux网络编程(四)--Select机制 一.select工作机制 poll和select,都是基于内核函数sys_poll实现的,不同在于在linux中select是从BSD Unix系统继承而来,poll则是从SYSTEM V Unix系统继承而来,因此两种方式相差不大.poll函数没有最大文件描述符数量的限制.poll和 select与一样,大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大. select需要驱动程序的支持,驱动

多线程编程学习笔记——使用并发集合(三)

接上文 多线程编程学习笔记——使用并发集合(一) 接上文 多线程编程学习笔记——使用并发集合(二) 四.   使用ConcurrentBag创建一个可扩展的爬虫 本示例在多个独立的即可生产任务又可消费任务的工作者间如何扩展工作量. 1.程序代码如下. using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Sy