Linux线程池在server上简单应用

一、问题描写叙述

如今以C/S架构为例。client向server端发送要查找的数字,server端启动线程中的线程进行对应的查询。将查询结果显示出来。

二、实现方案

1. 整个project以client、server、lib组织。例如以下图所看到的:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ3poaWNoZW5nMTk4Mw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" >

2. 进入lib。

socket.h、socket.c

/**
  @file		socket.h
  @brief	Socket API header file

  TCP socket utility functions, it provides simple functions that helps
  to build TCP client/server.

  @author wangzhicheng
 */
#ifndef SOCKET_H
#define SOCKET_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <resolv.h>
#include <fcntl.h>

#define MAX_CONNECTION				20

int	TCPServerInit(int port, int *serverfd);
int	TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr);
int TCPServerSelect(int* serverfdlist, int num, int *clientfd, char *clientaddr);
int	TCPClientInit(int *clientfd);
int	TCPClientConnect(const int clientfd, const char *addr, int port);
int	TCPNonBlockRead(int clientfd, char* buf, int size);
int TCPBlockRead(int clientfd, char* buf, int size);
int	TCPWrite(int clientfd, char* buf, int size);
void TCPClientClose(int sockfd);
void TCPServerClose(int sockfd);

#endif

socket.c

#include "socket.h"
/*
 * @brief	initialize TCP server
 * @port		port number for socket
 * @serverfd	server socket fd
 * return server socked fd for success, on error return error code
 * */
int	TCPServerInit(int port, int *serverfd) {
	struct sockaddr_in dest;
	// create socket , same as client
	*serverfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if(*serverfd < 0) return -1;
	/// initialize structure dest
	memset((void*)&dest, '\0', sizeof(dest));
	dest.sin_family = PF_INET;
	dest.sin_port = htons(port);
	dest.sin_addr.s_addr = INADDR_ANY;
	// Assign a port number to socket
	bind( *serverfd, (struct sockaddr*)&dest, sizeof(dest));

	return *serverfd;
}
/*
 * @brief	wait client connect
 * @serverfd	server socket fd
 * @clientfd	client socket fd
 * @clientaddr	client address which connect to server
 * return client fd, on error return error code
 * */
int	TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr) {
	struct sockaddr_in client_addr;
	socklen_t addrlen = sizeof(client_addr);
	// make it listen to socket
	listen( serverfd, 20);
	// Wait and Accept connection
	*clientfd = accept(serverfd, (struct sockaddr*)&client_addr, &addrlen);
	strcpy( clientaddr, (const char *)( inet_ntoa( client_addr.sin_addr)));

	return *clientfd;
}
/*
 * @brief	initialize TCP client
 * @clientfd	client socket fd
 * return client socked fd for success, on error return error code
 */
int	TCPClientInit(int *clientfd) {
	*clientfd = socket(PF_INET, SOCK_STREAM, 0);

	return *clientfd;
}
/*
 * @brief	connect to TCP server
 * @clientfd	client socket fd
 * @addr		server address
 * @port		server port number
 * return 0 for success, on error -1 is returned
 */
int	TCPClientConnect(const int clientfd, const char *addr, int port) {
	struct sockaddr_in dest;
	// initialize value in dest
	memset(&dest, '\0', sizeof(dest));
	dest.sin_family = PF_INET;
	dest.sin_port = htons(port);
	inet_aton(addr, &dest.sin_addr);

	// Connecting to server
	return connect(clientfd, (struct sockaddr*)&dest, sizeof(dest));
}
/*
 * @brief	non-block read from TCP socket
 * @clientfd	socket fd
 * @buf	     	input buffer
 * @size		buffer size
 * return	    the length of read data
 */
int	TCPNonBlockRead(int clientfd, char* buf, int size) {
	int opts;
	opts = fcntl(clientfd, F_GETFL);
	opts = (opts | O_NONBLOCK);
	fcntl(clientfd, F_SETFL, opts);

	return recv(clientfd, buf, size, 0);
}
/*
 * @brief	block read from TCP socket
 * @clientfd	socket fd
 * @buf	  	    input buffer
 * @size		buf size
 * return	    the length of read data
 */
int	TCPBlockRead(int clientfd, char* buf, int size) {
	int opts;
	opts = fcntl(clientfd, F_GETFL);
	opts = (opts & ~O_NONBLOCK);
	fcntl(clientfd, F_SETFL, opts);

	return recv(clientfd, buf, size, 0);
}
/*
 * @brief	write to TCP socket
 * @clientfd	socket fd
 * @buf		    output buf
 * @size		output buf length
 * return	    the length of the actual written data, -1: disconnected
 */
int	TCPWrite(int clientfd, char* buf, int size) {
	int len= 0;
	/* set socket to nonblock */
	int ret = fcntl(clientfd, F_GETFL);
	ret |= O_NONBLOCK;
	if (fcntl(clientfd, F_SETFL, ret) < 0 ) {
		printf("set socket to nonblock fail [%d] !\n", errno);
	}
	len = send(clientfd, buf, size, MSG_NOSIGNAL);

	return len;
}
/*
 * @brief	close the tcp connection
 * @sockfd	socket fd
 * return	none
 */
void TCPConnectionClose(int sockfd) {
	close(sockfd);
}

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
struct job
{
    void* (*callback_function)(void *arg);    //线程回调函数
    void *arg;                                //回调函数參数
    struct job *next;
};

struct threadpool
{
    int thread_num;                   //线程池中开启线程的个数
    int queue_max_num;                //队列中最大job的个数
    struct job *head;                 //指向job的头指针
    struct job *tail;                 //指向job的尾指针
    pthread_t *pthreads;              //线程池中全部线程的pthread_t
    pthread_mutex_t mutex;            //相互排斥信号量
    pthread_cond_t queue_empty;       //队列为空的条件变量
    pthread_cond_t queue_not_empty;   //队列不为空的条件变量
    pthread_cond_t queue_not_full;    //队列不为满的条件变量
    int queue_cur_num;                //队列当前的job个数
    int queue_close;                  //队列是否已经关闭
    int pool_close;                   //线程池是否已经关闭
};

//================================================================================================
//函数名:                   threadpool_init
//函数描写叙述:                 初始化线程池
//输入:                    [in] thread_num     线程池开启的线程个数
//                         [in] queue_max_num  队列的最大job个数
//输出:                    无
//返回:                    成功:线程池地址 失败:NULL
//================================================================================================
struct threadpool* threadpool_init(int thread_num, int queue_max_num);

//================================================================================================
//函数名:                    threadpool_add_job
//函数描写叙述:                  向线程池中加入任务
//输入:                     [in] pool                  线程池地址
//                          [in] callback_function     回调函数
//                          [in] arg                     回调函数參数
//输出:                     无
//返回:                     成功:0 失败:-1
//================================================================================================
int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);

//================================================================================================
//函数名:                    threadpool_destroy
//函数描写叙述:                   销毁线程池
//输入:                      [in] pool                  线程池地址
//输出:                      无
//返回:                      成功:0 失败:-1
//================================================================================================
int threadpool_destroy(struct threadpool *pool);

//================================================================================================
//函数名:                    threadpool_function
//函数描写叙述:                  线程池中线程函数
//输入:                     [in] arg                  线程池地址
//输出:                     无
//返回:                     无
//================================================================================================
void* threadpool_function(void* arg);
#endif

threadpool.c

#include "threadpool.h"

struct threadpool* threadpool_init(int thread_num, int queue_max_num) {
    struct threadpool *pool = NULL;
    do
    {
        pool = malloc(sizeof(struct threadpool));
        if (NULL == pool)
        {
            printf("failed to malloc threadpool!\n");
            break;
        }
        pool->thread_num = thread_num;
        pool->queue_max_num = queue_max_num;
        pool->queue_cur_num = 0;
        pool->head = NULL;
        pool->tail = NULL;
        if (pthread_mutex_init(&(pool->mutex), NULL))
        {
            printf("failed to init mutex!\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_empty), NULL))
        {
            printf("failed to init queue_empty!\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_empty), NULL))
        {
            printf("failed to init queue_not_empty!\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_full), NULL))
        {
            printf("failed to init queue_not_full!\n");
            break;
        }
        pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
        if (NULL == pool->pthreads)
        {
            printf("failed to malloc pthreads!\n");
            break;
        }
        pool->queue_close = 0;
        pool->pool_close = 0;
        int i;
        for (i = 0; i < pool->thread_num; ++i)
        {
            pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool);
        }

        return pool;
    } while (0);

    return NULL;
}
int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg) {
	if(pool == NULL || callback_function == NULL || arg == NULL) return -1;

    pthread_mutex_lock(&(pool->mutex));
    while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
    {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex));   //队列满的时候就等待
    }
    if (pool->queue_close || pool->pool_close)    //队列关闭或者线程池关闭就退出
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }
    struct job *pjob =(struct job*) malloc(sizeof(struct job));
    if (NULL == pjob)
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }
    pjob->callback_function = callback_function;
    pjob->arg = arg;
    pjob->next = NULL;
    if (pool->head == NULL)
    {
        pool->head = pool->tail = pjob;
        pthread_cond_broadcast(&(pool->queue_not_empty));  //队列空的时候,有任务来时就通知线程池中的线程:队列非空
    }
    else
    {
        pool->tail->next = pjob;
        pool->tail = pjob;
    }
    pool->queue_cur_num++;
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

void* threadpool_function(void* arg) {
    struct threadpool *pool = (struct threadpool*)arg;
    struct job *pjob = NULL;
    while (1)  //死循环
    {
        pthread_mutex_lock(&(pool->mutex));
        while ((pool->queue_cur_num == 0) && !pool->pool_close)   //队列为空时,就等待队列非空
        {
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
        }
        if (pool->pool_close)   //线程池关闭,线程就退出
        {
            pthread_mutex_unlock(&(pool->mutex));
            pthread_exit(NULL);
        }
        pool->queue_cur_num--;
        pjob = pool->head;
        if (pool->queue_cur_num == 0)
        {
            pool->head = pool->tail = NULL;
        }
        else
        {
            pool->head = pjob->next;
        }
        if (pool->queue_cur_num == 0)
        {
            pthread_cond_signal(&(pool->queue_empty));        //队列为空,就能够通知threadpool_destroy函数,销毁线程函数
        }
        if (pool->queue_cur_num == pool->queue_max_num - 1)
        {
            pthread_cond_broadcast(&(pool->queue_not_full));  //队列非满。就能够通知threadpool_add_job函数,加入新任务
        }
        pthread_mutex_unlock(&(pool->mutex));

        (*(pjob->callback_function))(pjob->arg);   //线程真正要做的工作,回调函数的调用
        free(pjob);
        pjob = NULL;
    }
}
int threadpool_destroy(struct threadpool *pool) {
	if(pool == NULL) return -1;
    pthread_mutex_lock(&(pool->mutex));
    if (pool->queue_close || pool->pool_close)   //线程池已经退出了,就直接返回
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }

    pool->queue_close = 1;        //置队列关闭标志
    while (pool->queue_cur_num != 0)
    {
        pthread_cond_wait(&(pool->queue_empty), &(pool->mutex));  //等待队列为空
    }    

    pool->pool_close = 1;      //置线程池关闭标志
    pthread_mutex_unlock(&(pool->mutex));
    pthread_cond_broadcast(&(pool->queue_not_empty));  //唤醒线程池中正在堵塞的线程
    pthread_cond_broadcast(&(pool->queue_not_full));   //唤醒加入任务的threadpool_add_job函数
    int i;
    for (i = 0; i < pool->thread_num; ++i)
    {
        pthread_join(pool->pthreads[i], NULL);    //等待线程池的全部线程运行完成
    }

    pthread_mutex_destroy(&(pool->mutex));          //清理资源
    pthread_cond_destroy(&(pool->queue_empty));
    pthread_cond_destroy(&(pool->queue_not_empty));
    pthread_cond_destroy(&(pool->queue_not_full));
    free(pool->pthreads);
    struct job *p;
    while (pool->head != NULL)
    {
        p = pool->head;
        pool->head = p->next;
        free(p);
    }
    free(pool);
    return 0;
}

3.进入client

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ3poaWNoZW5nMTk4Mw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" >

client.c

/*************************************************************************
    > File Name: test.c
    > Author: wangzhicheng
    > Mail: [email protected]
    > Created Time: Fri 03 Oct 2014 09:43:59 PM WST
 ************************************************************************/

#include "socket.h"
const char * serveraddr = "127.0.0.1";
#define TCPPORT 4001
int main() {
	int clientfd = -1;
	char buf[256];
	strcpy(buf, "1");
	if(TCPClientInit(&clientfd) < 0) {
		perror("client init failed...!\n");
		exit(EXIT_FAILURE);
	}
	if(TCPClientConnect(clientfd, serveraddr, TCPPORT)) {
		perror("can not connect to server...!\n");
		exit(EXIT_FAILURE);
	}
	if(TCPWrite(clientfd, buf, strlen(buf) == 1)) {
		printf("send successfully...!\n");
	}
	else printf("send failed...!\n");

	return 0;
}

Makefile

CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=client.o  socket.o 

all:	client 

client: $(OBJS1)
	$(CC) -o   [email protected] $(OBJS1) 

socket.o: $(LIBRARY)/socket.c
	$(CC) -c $(LIBRARY)/socket.c

clean:
	rm *.o client  > /dev/null 2>&1

4. 进入server

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ3poaWNoZW5nMTk4Mw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" >

server.c

/*************************************************************************
    > File Name: server.c
    > Author: ma6174
    > Mail: [email protected]
    > Created Time: Sat 04 Oct 2014 09:46:30 PM WST
 ************************************************************************/

#include "socket.h"
#include "threadpool.h"

#define TCPPORT 4001
#define SIZE 256
#define N 10
int array[N] = {1, 2, 6, 8, 12, 88, 208, 222, 688, 1018};
int find(int low, int high, int m) {
	int mid;
	if(low <= high) {
		mid = (low + high) >> 1;
		if(array[mid] == m) return 1;
		else if(array[mid] > m) return find(low, mid - 1, m);
		else return find(mid + 1, high, m);
	}
	return 0;
}
void* work(void* arg)
{
    int *p = (int *) arg;
	int m = *p;
	if(find(0, N - 1, m)) printf("%d has been found...!\n", m);
	else printf("%d has not been found...!\n", m);
    sleep(1);
}
int main() {
	int serverfd = -1, clientfd = -1;
	char clientaddr[SIZE];
	char buf[SIZE];
	int num;
    struct threadpool *pool = NULL;
	TCPServerInit(TCPPORT, &serverfd);
	if(serverfd < 0) {
		perror("server init failed...!\n");
		exit(EXIT_FAILURE);
	}
    pool = threadpool_init(10, 20);
	while(1) {
		TCPServerWaitConnection(serverfd, &clientfd, clientaddr);
		if(clientfd < 0) {
			perror("can not connect the clients...!\n");
			exit(EXIT_FAILURE);
		}
		if(TCPBlockRead(clientfd, buf, SIZE) <= 0) {
			perror("can not read from client...!\n");
			sleep(1);
		}
		else {
			num = atoi(buf);
			threadpool_add_job(pool, work, &num);
		}
	}
    threadpool_destroy(pool);

	return 0;
}

Makefile

CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=server.o  socket.o threadpool.o

all:	server

server: $(OBJS1)
	$(CC) -o   [email protected] $(OBJS1) -lpthread

socket.o: $(LIBRARY)/socket.c
	$(CC) -c $(LIBRARY)/socket.c

threadpool.o: $(LIBRARY)/threadpool.c
	$(CC) -c $(LIBRARY)/threadpool.c
clean:
	rm *.o client  > /dev/null 2>&1

三、測试

四、有关线程池的说明

当线程池被创建时,线程池中有些“空”的线程。即不运行任务,每当一个任务被增加进来时,任务就被组织成任务队列,线程依照队列队头出。队尾进的原则取出头任务运行。

任务队列中所含任务数必须控制在一个上限内。超过上限时。任务被堵塞。当全部任务被运行完,销毁线程池。

时间: 2024-10-01 03:05:21

Linux线程池在server上简单应用的相关文章

Linux线程池在服务器上简单应用

一.问题描述 现在以C/S架构为例,客户端向服务器端发送要查找的数字,服务器端启动线程中的线程进行相应的查询,将查询结果显示出来. 二.实现方案 1. 整个工程以client.server.lib组织,如下图所示: 2. 进入lib, socket.h.socket.c /** @file socket.h @brief Socket API header file TCP socket utility functions, it provides simple functions that h

Linux下线程池的理解与简单实现

首先,线程池是什么?顾名思义,就是把一堆开辟好的线程放在一个池子里统一管理,就是一个线程池. 其次,为什么要用线程池,难道来一个请求给它申请一个线程,请求处理完了释放线程不行么?也行,但是如果创建线程和销毁线程的时间比线程处理请求的时间长,而且请求很多的情况下,我们的CPU资源都浪费在了创建和销毁线程上了,所以这种方法的效率比较低,于是,我们可以将若干已经创建完成的线程放在一起统一管理,如果来了一个请求,我们从线程池中取出一个线程来处理,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的

线程池的理解与简单实现

池 由于服务器的硬件资源"充裕",那么提高服务器性能的一个很直接的方法就是以空间换时间,即"浪费"服务器的硬件资源,以换取其运行效率.这就是池的概念. 池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化,这称为静态资源分配. 当服务器进入正式运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配.很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的. 当服务器处理完一个客户

【Android】线程池原理及Java简单实现

线程池简介 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力. 假设一个服务器完成一项任务所需时间为: T1 创建线程时间 T2 在线程中执行任务的时间 T3 销毁线程时间 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能. 一个线程池包括以下四个基本组成部分: 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务: 工作线程(PoolWorker):线程池中线程,

Linux C编程之二十二 Linux线程池实现

一.线程池实现原理 1. 管理者线程 (1)计算线程不够用 创建线程 (2) 空闲线程太多 a. 销毁 更新要销毁的线程个数 通过条件变量完成的 b. 如果空闲太多,任务不够 线程阻塞在该条件变量上 c. 发送信号 pthread_cond_signal 2. 线程池中的线程 (1)从任务队列中取数据 任务队列任务 执行任务 (2)销毁空闲的线程 让线程执行pthread_exit 阻塞空闲的线程收到信号: 解除阻塞          只有一个往下执行          在执行任务之前做了销毁操

linux线程池分析

一. 线程池学习文件 pool_test/  -> 线程池函数接口实现源码,简单实例. 系统编程项目接口设计说明书.doc  -> 详细说明了线程池各个函数的头文件/原型/参数/返回值... 线程池模型.jpg  -> 帮助大家理解线程池原理. 二. 学习线程池实现过程? 1. 什么是线程池? 线程池就是多个线程组合起来的一个集合,当有任务时,线程就会处理任务,当没有任务时,线程休息. 2. 分析线程池源码 thread_pool.c  -> 线程池函数接口源码 thread_po

linux线程池

typedef struct task_queue { pthread_mutex_t mutex; pthread_cond_t cond; /* when no task, the manager thread wait for ;when a new task come, signal. */ struct task_node *head; /* point to the task_link. */ int number; /* current number of task, includ

Linux线程池的实现

线程池的实现 1:自定义封装的条件变量 1 //condition.h 2 #ifndef _CONDITION_H_ 3 #define _CONDITION_H_ 4 5 #include <pthread.h> 6 7 typedef struct condition 8 { 9 pthread_mutex_t pmutex; 10 pthread_cond_t pcond; 11 }condition_t; 12 13 int condition_init(condition_t *c

C++11消息队列 + Qt线程池 + QRunnable执行任务简单模型

1.模板类queue,包含头文件<queue>中,是一个FIFO队列. queue.push():在队列尾巴增加数据 queue.pop():移除队列头部数据 queue.font():获取队列头部数据的引用... 2.Qt库的线程池,QThreadPool QThreadPool.setMaxThreadCount():设置线程池最大线程数 QThreadPool.start(new QRunnable(..)):开启线程池调用QRunnable 3.QRunnable执行任务 void r