一、问题描述
现以C/S架构为例:client向server端发送要查找的数字,server端启动线程池中的线程进行相应的查询,并将查询结果显示出来。
二、实现方案
1. 整个project以client、server、lib三个目录组织,如下图所示:
(原文此处为工程目录结构截图,图片链接已丢失)
2. 进入lib。
socket.h、socket.c
/**
 * @file socket.h
 * @brief Socket API header file
 *
 * TCP socket utility functions; it provides simple helpers for building a
 * TCP client/server.
 *
 * @author wangzhicheng
 */
#ifndef SOCKET_H
#define SOCKET_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>   /* struct sockaddr_in, htons, INADDR_ANY */
#include <arpa/inet.h>    /* inet_ntoa, inet_aton: must be declared, an implicit
                             int return would truncate the returned pointer */
#include <resolv.h>
#include <fcntl.h>
#include <unistd.h>       /* close */

/* Maximum number of pending connections. */
#define MAX_CONNECTION 20

/* Create a TCP socket bound to `port`; the fd is stored in *serverfd. */
int TCPServerInit(int port, int *serverfd);

/* Wait for a client; stores the client fd and its dotted-quad address. */
int TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr);

/* NOTE(review): declared here, but no definition is visible in socket.c. */
int TCPServerSelect(int *serverfdlist, int num, int *clientfd, char *clientaddr);

/* Create a TCP client socket; the fd is stored in *clientfd. */
int TCPClientInit(int *clientfd);

/* Connect clientfd to addr:port; returns 0 on success, -1 on error. */
int TCPClientConnect(const int clientfd, const char *addr, int port);

/* Switch the socket to non-blocking mode and read up to `size` bytes. */
int TCPNonBlockRead(int clientfd, char *buf, int size);

/* Switch the socket to blocking mode and read up to `size` bytes. */
int TCPBlockRead(int clientfd, char *buf, int size);

/* Send `size` bytes from buf; returns the number of bytes sent or -1. */
int TCPWrite(int clientfd, char *buf, int size);

/* NOTE(review): the two declarations below have no definition visible in
 * socket.c, which defines TCPConnectionClose instead. */
void TCPClientClose(int sockfd);
void TCPServerClose(int sockfd);

/* Close a TCP connection; this is the close function socket.c defines. */
void TCPConnectionClose(int sockfd);

#endif
socket.c
#include "socket.h"

#include <arpa/inet.h>   /* inet_ntoa, inet_aton */
#include <netinet/in.h>  /* struct sockaddr_in, htons, htonl */
#include <unistd.h>      /* close */

/* Turn O_NONBLOCK on or off for fd; returns 0 on success, -1 on error. */
static int set_nonblocking(int fd, int enable)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags < 0) {
        return -1;
    }
    flags = enable ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    return fcntl(fd, F_SETFL, flags) < 0 ? -1 : 0;
}

/*
 * @brief initialize TCP server
 * @port port number for socket
 * @serverfd receives the server socket fd (-1 on failure)
 * return server socket fd for success, -1 on error
 */
int TCPServerInit(int port, int *serverfd)
{
    struct sockaddr_in dest;

    *serverfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (*serverfd < 0) {
        return -1;
    }

    memset(&dest, 0, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons(port);
    dest.sin_addr.s_addr = htonl(INADDR_ANY);

    /* A failed bind (e.g. port already in use) must not be reported as
     * success: release the socket and hand the caller an invalid fd. */
    if (bind(*serverfd, (struct sockaddr *)&dest, sizeof(dest)) < 0) {
        close(*serverfd);
        *serverfd = -1;
        return -1;
    }
    return *serverfd;
}

/*
 * @brief wait for a client to connect
 * @serverfd server socket fd
 * @clientfd receives the client socket fd (-1 on failure)
 * @clientaddr receives the client's dotted-quad address; may be NULL,
 *             otherwise it must hold at least 16 bytes
 * return client fd, -1 on error
 */
int TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr)
{
    struct sockaddr_in client_addr;
    socklen_t addrlen = sizeof(client_addr);

    if (listen(serverfd, MAX_CONNECTION) < 0) {
        *clientfd = -1;
        return -1;
    }

    *clientfd = accept(serverfd, (struct sockaddr *)&client_addr, &addrlen);
    if (*clientfd < 0) {
        /* client_addr was not filled in, so there is nothing to copy. */
        return -1;
    }

    if (clientaddr != NULL) {
        strcpy(clientaddr, inet_ntoa(client_addr.sin_addr));
    }
    return *clientfd;
}

/*
 * @brief initialize TCP client
 * @clientfd receives the client socket fd
 * return client socket fd for success, -1 on error
 */
int TCPClientInit(int *clientfd)
{
    *clientfd = socket(PF_INET, SOCK_STREAM, 0);
    return *clientfd;
}

/*
 * @brief connect to TCP server
 * @clientfd client socket fd
 * @addr server address in dotted-quad notation
 * @port server port number
 * return 0 for success, -1 on error (including an unparsable address)
 */
int TCPClientConnect(const int clientfd, const char *addr, int port)
{
    struct sockaddr_in dest;

    memset(&dest, 0, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons(port);
    if (addr == NULL || inet_aton(addr, &dest.sin_addr) == 0) {
        return -1;
    }

    return connect(clientfd, (struct sockaddr *)&dest, sizeof(dest));
}

/*
 * @brief non-blocking read from TCP socket
 * @clientfd socket fd
 * @buf input buffer
 * @size buffer size
 * return the length of read data, -1 on error
 */
int TCPNonBlockRead(int clientfd, char *buf, int size)
{
    if (set_nonblocking(clientfd, 1) < 0) {
        return -1;
    }
    return (int)recv(clientfd, buf, size, 0);
}

/*
 * @brief blocking read from TCP socket
 * @clientfd socket fd
 * @buf input buffer
 * @size buffer size
 * return the length of read data, -1 on error
 */
int TCPBlockRead(int clientfd, char *buf, int size)
{
    if (set_nonblocking(clientfd, 0) < 0) {
        return -1;
    }
    return (int)recv(clientfd, buf, size, 0);
}

/*
 * @brief write to TCP socket (the socket is switched to non-blocking mode)
 * @clientfd socket fd
 * @buf output buffer
 * @size output buffer length
 * return the length of the actually written data, -1 on error
 */
int TCPWrite(int clientfd, char *buf, int size)
{
    if (set_nonblocking(clientfd, 1) < 0) {
        printf("set socket to nonblock fail [%d] !\n", errno);
    }
    /* MSG_NOSIGNAL: report a closed peer via the return value, not SIGPIPE. */
    return (int)send(clientfd, buf, size, MSG_NOSIGNAL);
}

/*
 * @brief close the tcp connection
 * @sockfd socket fd
 * return none
 */
void TCPConnectionClose(int sockfd)
{
    close(sockfd);
}

/* Close a client socket; declared in socket.h. */
void TCPClientClose(int sockfd)
{
    TCPConnectionClose(sockfd);
}

/* Close a server socket; declared in socket.h. */
void TCPServerClose(int sockfd)
{
    TCPConnectionClose(sockfd);
}
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

/* One queued unit of work: a callback plus its argument, linked into the
 * pool's singly linked job queue. */
struct job
{
    void* (*callback_function)(void *arg);    // callback run by a worker thread
    void *arg;                                // argument passed to the callback
    struct job *next;                         // next job in the queue
};

/* Thread pool state: the worker threads, the job queue and the
 * synchronisation objects guarding it. */
struct threadpool
{
    int thread_num;                   // number of threads started by the pool
    int queue_max_num;                // maximum number of jobs in the queue
    struct job *head;                 // head of the job queue
    struct job *tail;                 // tail of the job queue
    pthread_t *pthreads;              // pthread_t of every thread in the pool
    pthread_mutex_t mutex;            // mutex protecting the pool state
    pthread_cond_t queue_empty;       // condition: the queue is empty
    pthread_cond_t queue_not_empty;   // condition: the queue is not empty
    pthread_cond_t queue_not_full;    // condition: the queue is not full
    int queue_cur_num;                // current number of jobs in the queue
    int queue_close;                  // non-zero once the queue has been closed
    int pool_close;                   // non-zero once the pool has been closed
};

//================================================================================================
// Function:    threadpool_init
// Description: initialise a thread pool
// Input:       [in] thread_num     number of threads the pool starts
//              [in] queue_max_num  maximum number of jobs in the queue
// Output:      none
// Return:      success: address of the pool    failure: NULL
//================================================================================================
struct threadpool* threadpool_init(int thread_num, int queue_max_num);

//================================================================================================
// Function:    threadpool_add_job
// Description: add a job to the thread pool
// Input:       [in] pool               address of the pool
//              [in] callback_function  callback to run
//              [in] arg                argument for the callback
// Output:      none
// Return:      success: 0    failure: -1
//================================================================================================
int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);

//================================================================================================
// Function:    threadpool_destroy
// Description: destroy the thread pool
// Input:       [in] pool  address of the pool
// Output:      none
// Return:      success: 0    failure: -1
//================================================================================================
int threadpool_destroy(struct threadpool *pool);

//================================================================================================
// Function:    threadpool_function
// Description: function executed by each thread of the pool
// Input:       [in] arg  address of the pool
// Output:      none
// Return:      none
//================================================================================================
void* threadpool_function(void* arg);

#endif
threadpool.c
#include "threadpool.h"

/*
 * Create a pool with `thread_num` worker threads and a job queue holding at
 * most `queue_max_num` jobs.
 *
 * Returns the new pool, or NULL on failure (non-positive sizes, allocation
 * failure, or failure to create a mutex, condition variable or thread).
 * Everything acquired before the failure is released again, and worker
 * threads that were already started are stopped and joined.
 */
struct threadpool* threadpool_init(int thread_num, int queue_max_num)
{
    struct threadpool *pool = NULL;
    int started = 0;

    /* A pool without threads never runs a job, and a zero-capacity queue
     * would block threadpool_add_job forever. */
    if (thread_num <= 0 || queue_max_num <= 0) {
        printf("invalid threadpool size!\n");
        return NULL;
    }

    pool = malloc(sizeof *pool);
    if (NULL == pool) {
        printf("failed to malloc threadpool!\n");
        return NULL;
    }
    pool->thread_num = thread_num;
    pool->queue_max_num = queue_max_num;
    pool->queue_cur_num = 0;
    pool->head = NULL;
    pool->tail = NULL;
    pool->pthreads = NULL;
    pool->queue_close = 0;
    pool->pool_close = 0;

    if (pthread_mutex_init(&pool->mutex, NULL)) {
        printf("failed to init mutex!\n");
        goto err_free_pool;
    }
    if (pthread_cond_init(&pool->queue_empty, NULL)) {
        printf("failed to init queue_empty!\n");
        goto err_mutex;
    }
    if (pthread_cond_init(&pool->queue_not_empty, NULL)) {
        printf("failed to init queue_not_empty!\n");
        goto err_queue_empty;
    }
    if (pthread_cond_init(&pool->queue_not_full, NULL)) {
        printf("failed to init queue_not_full!\n");
        goto err_queue_not_empty;
    }

    pool->pthreads = malloc(sizeof *pool->pthreads * (size_t)thread_num);
    if (NULL == pool->pthreads) {
        printf("failed to malloc pthreads!\n");
        goto err_queue_not_full;
    }

    for (started = 0; started < thread_num; ++started) {
        if (pthread_create(&pool->pthreads[started], NULL, threadpool_function, pool)) {
            printf("failed to create thread!\n");
            goto err_threads;
        }
    }
    return pool;

err_threads:
    /* Ask the workers that did start to exit, then wait for them. */
    pthread_mutex_lock(&pool->mutex);
    pool->pool_close = 1;
    pthread_mutex_unlock(&pool->mutex);
    pthread_cond_broadcast(&pool->queue_not_empty);
    while (started > 0) {
        pthread_join(pool->pthreads[--started], NULL);
    }
    free(pool->pthreads);
err_queue_not_full:
    pthread_cond_destroy(&pool->queue_not_full);
err_queue_not_empty:
    pthread_cond_destroy(&pool->queue_not_empty);
err_queue_empty:
    pthread_cond_destroy(&pool->queue_empty);
err_mutex:
    pthread_mutex_destroy(&pool->mutex);
err_free_pool:
    free(pool);
    return NULL;
}

/*
 * Append a job to the pool's queue, blocking while the queue is full.
 *
 * Returns 0 on success. Returns -1 when pool, callback_function or arg is
 * NULL, when the queue or pool has been closed, or when the job cannot be
 * allocated.
 */
int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg)
{
    struct job *pjob = NULL;

    if (pool == NULL || callback_function == NULL || arg == NULL) {
        return -1;
    }

    pthread_mutex_lock(&pool->mutex);

    /* Wait while the queue is full, unless a shutdown is in progress. */
    while ((pool->queue_cur_num == pool->queue_max_num) &&
           !(pool->queue_close || pool->pool_close)) {
        pthread_cond_wait(&pool->queue_not_full, &pool->mutex);
    }
    if (pool->queue_close || pool->pool_close) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    pjob = malloc(sizeof *pjob);
    if (NULL == pjob) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }
    pjob->callback_function = callback_function;
    pjob->arg = arg;
    pjob->next = NULL;

    if (pool->head == NULL) {
        pool->head = pool->tail = pjob;
        /* The queue was empty: wake the workers waiting for a job. */
        pthread_cond_broadcast(&pool->queue_not_empty);
    } else {
        pool->tail->next = pjob;
        pool->tail = pjob;
    }
    pool->queue_cur_num++;

    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

/*
 * Worker thread body. `arg` is the owning struct threadpool. Repeatedly takes
 * the job at the head of the queue and runs its callback outside the lock;
 * returns NULL once the pool has been closed.
 */
void* threadpool_function(void* arg)
{
    struct threadpool *pool = (struct threadpool *)arg;
    struct job *pjob = NULL;

    for (;;) {
        pthread_mutex_lock(&pool->mutex);

        /* Sleep until there is a job or the pool is shut down. */
        while ((pool->queue_cur_num == 0) && !pool->pool_close) {
            pthread_cond_wait(&pool->queue_not_empty, &pool->mutex);
        }
        if (pool->pool_close) {
            pthread_mutex_unlock(&pool->mutex);
            break;
        }

        pool->queue_cur_num--;
        pjob = pool->head;
        if (pool->queue_cur_num == 0) {
            pool->head = pool->tail = NULL;
            /* Queue drained: threadpool_destroy may be waiting for this. */
            pthread_cond_signal(&pool->queue_empty);
        } else {
            pool->head = pjob->next;
        }
        if (pool->queue_cur_num == pool->queue_max_num - 1) {
            /* The queue just stopped being full: wake blocked producers. */
            pthread_cond_broadcast(&pool->queue_not_full);
        }

        pthread_mutex_unlock(&pool->mutex);

        (*(pjob->callback_function))(pjob->arg);   /* the actual work */
        free(pjob);
        pjob = NULL;
    }
    return NULL;
}

/*
 * Shut the pool down: refuse new jobs, wait until the queue has drained,
 * stop and join all worker threads, then free every resource including
 * `pool` itself.
 *
 * Returns 0 on success, -1 when pool is NULL or a shutdown has already
 * been started.
 */
int threadpool_destroy(struct threadpool *pool)
{
    struct job *p = NULL;
    int i;

    if (pool == NULL) {
        return -1;
    }

    pthread_mutex_lock(&pool->mutex);
    if (pool->queue_close || pool->pool_close) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    pool->queue_close = 1;                 /* no new jobs from now on */
    while (pool->queue_cur_num != 0) {
        pthread_cond_wait(&pool->queue_empty, &pool->mutex);
    }
    pool->pool_close = 1;                  /* tell the workers to exit */
    pthread_mutex_unlock(&pool->mutex);

    pthread_cond_broadcast(&pool->queue_not_empty);  /* wake idle workers */
    pthread_cond_broadcast(&pool->queue_not_full);   /* wake blocked producers */

    for (i = 0; i < pool->thread_num; ++i) {
        pthread_join(pool->pthreads[i], NULL);
    }

    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->queue_empty);
    pthread_cond_destroy(&pool->queue_not_empty);
    pthread_cond_destroy(&pool->queue_not_full);
    free(pool->pthreads);

    /* Free any jobs still linked into the queue. */
    while (pool->head != NULL) {
        p = pool->head;
        pool->head = p->next;
        free(p);
    }
    free(pool);
    return 0;
}
3.进入client
(原文此处有一张截图,图片链接已丢失)
client.c
/*************************************************************************
    > File Name: client.c
    > Author: wangzhicheng
    > Created Time: Fri 03 Oct 2014 09:43:59 PM WST
    > Purpose: connect to the lookup server and send one number to search for
 ************************************************************************/

#include "socket.h"
#include <unistd.h>   /* close */

const char * serveraddr = "127.0.0.1";
#define TCPPORT 4001

/*
 * Connects to serveraddr:TCPPORT and sends the text "1".
 * Returns EXIT_SUCCESS when the whole message was sent, EXIT_FAILURE otherwise.
 */
int main()
{
    int clientfd = -1;
    char buf[256];
    int len;
    int sent;

    strcpy(buf, "1");

    if (TCPClientInit(&clientfd) < 0) {
        perror("client init failed...!\n");
        exit(EXIT_FAILURE);
    }
    if (TCPClientConnect(clientfd, serveraddr, TCPPORT)) {
        perror("can not connect to server...!\n");
        close(clientfd);
        exit(EXIT_FAILURE);
    }

    /* Send the whole string and compare the byte count with its length.
     * The original passed the result of `strlen(buf) == 1` as the size and
     * treated any non-zero return, including -1, as success. */
    len = (int)strlen(buf);
    sent = TCPWrite(clientfd, buf, len);
    if (sent == len) {
        printf("send successfully...!\n");
    } else {
        printf("send failed...!\n");
    }

    close(clientfd);
    return sent == len ? EXIT_SUCCESS : EXIT_FAILURE;
}
Makefile
# Builds the client from client.c and the shared socket library in ../lib.
CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=client.o socket.o

all: client

# $@ is the target name (client).
client: $(OBJS1)
	$(CC) -o $@ $(OBJS1)

socket.o: $(LIBRARY)/socket.c
	$(CC) $(CFLAGS) -c $(LIBRARY)/socket.c

# -f keeps clean from failing when there is nothing to remove.
clean:
	rm -f *.o client

.PHONY: all clean
4. 进入server
(原文此处有一张截图,图片链接已丢失)
server.c
/*************************************************************************
    > File Name: server.c
    > Author: ma6174
    > Created Time: Sat 04 Oct 2014 09:46:30 PM WST
    > Purpose: accept numbers from clients and look each one up in a sorted
    >          table using worker threads from a thread pool
 ************************************************************************/

#include "socket.h"
#include "threadpool.h"

#define TCPPORT 4001
#define SIZE 256
#define N 10

/* Lookup table; must stay sorted in ascending order for the binary search. */
int array[N] = {1, 2, 6, 8, 12, 88, 208, 222, 688, 1018};

/*
 * Binary search for m in array[low..high] (inclusive bounds).
 * Returns 1 when m is present, 0 otherwise.
 */
int find(int low, int high, int m)
{
    while (low <= high) {
        int mid = low + (high - low) / 2;   /* cannot overflow like low + high */

        if (array[mid] == m) {
            return 1;
        }
        if (array[mid] > m) {
            high = mid - 1;
        } else {
            low = mid + 1;
        }
    }
    return 0;
}

/*
 * Thread-pool job: look up one number and print the result.
 * arg points to a heap-allocated int owned by this job; it is freed here.
 * Always returns NULL.
 */
void* work(void* arg)
{
    int *p = (int *)arg;
    int m = *p;

    free(p);
    if (find(0, N - 1, m)) {
        printf("%d has been found...!\n", m);
    } else {
        printf("%d has not been found...!\n", m);
    }
    sleep(1);
    return NULL;
}

/*
 * Listens on TCPPORT, reads one number per connection and queues a lookup
 * job for it. Runs until accepting a connection fails, then drains the pool
 * and exits with EXIT_FAILURE.
 */
int main()
{
    int serverfd = -1, clientfd = -1;
    char clientaddr[SIZE];
    char buf[SIZE];
    struct threadpool *pool = NULL;

    TCPServerInit(TCPPORT, &serverfd);
    if (serverfd < 0) {
        perror("server init failed...!\n");
        exit(EXIT_FAILURE);
    }

    pool = threadpool_init(10, 20);
    if (pool == NULL) {
        fprintf(stderr, "thread pool init failed...!\n");
        close(serverfd);
        exit(EXIT_FAILURE);
    }

    while (1) {
        int nread;

        TCPServerWaitConnection(serverfd, &clientfd, clientaddr);
        if (clientfd < 0) {
            perror("can not connect the clients...!\n");
            break;
        }

        /* Leave room for the terminator: recv does not NUL-terminate. */
        nread = TCPBlockRead(clientfd, buf, SIZE - 1);
        if (nread <= 0) {
            perror("can not read from client...!\n");
            sleep(1);
        } else {
            /* Each job gets its own copy of the number. Passing the address
             * of a loop variable would let the next request overwrite it
             * before the worker reads it. */
            int *job_arg = malloc(sizeof *job_arg);

            buf[nread] = '\0';
            if (job_arg == NULL) {
                fprintf(stderr, "out of memory, request dropped...!\n");
            } else {
                *job_arg = (int)strtol(buf, NULL, 10);
                if (threadpool_add_job(pool, work, job_arg) != 0) {
                    free(job_arg);
                }
            }
        }

        close(clientfd);   /* one request per connection; do not leak the fd */
        clientfd = -1;
    }

    threadpool_destroy(pool);
    close(serverfd);
    return EXIT_FAILURE;
}
Makefile
# Builds the server from server.c plus the socket and thread-pool sources
# in ../lib.
CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=server.o socket.o threadpool.o

all: server

# $@ is the target name (server).
server: $(OBJS1)
	$(CC) -o $@ $(OBJS1) -lpthread

socket.o: $(LIBRARY)/socket.c
	$(CC) $(CFLAGS) -c $(LIBRARY)/socket.c

threadpool.o: $(LIBRARY)/threadpool.c
	$(CC) $(CFLAGS) -c $(LIBRARY)/threadpool.c

# Remove this directory's binary (server, not client); -f keeps clean from
# failing when there is nothing to remove.
clean:
	rm -f *.o server

.PHONY: all clean
三、测试
四、有关线程池的说明
当线程池被创建时,线程池中有一些“空闲”线程,即不执行任务的线程。每当有任务被添加进来时,任务就被组织成任务队列,线程按照“队头出、队尾进”的原则取出队头任务执行。
任务队列中所含任务数必须控制在一个上限内;超过上限时,添加任务的调用会被阻塞。当全部任务执行完毕后,销毁线程池。