基于线程池的线程调度管控系统

本文将详细描述“基于线程池的线程调度管控系统”的实现原理,以梳理当时的编程思路。

简单叙述一下此线程池的开发背景:客户端是批量运行的,虽然客户端均运行在服务器上,但是大量客户端运行时它们对机器资源是抢占式的,所以客户端在大规模运行时与单次运行时的运行效果是不一样的,因为相同客户端在单次运行与大规模运行时所占有的资源量是不同的,理论上说大规模运行时客户端的数量越多,每个客户端所占有的资源量就越少,于是我们认为解决问题的关键应该是不管是单次运行还是大规模运行,相同客户端所获取的资源量就应该是相同的,为此我们尝试过使用container或者docker来解决问题,熟悉container或docker的应该都知道,它们可以对资源(所占用的cpu时间、memory、IO、Internet等)进行限制,对资源进行限制后,每个客户端就运行在container中,这样不就解决客户端单次运行与大规模运行时占有资源量不同的情况了吗?当时我们也这么乐观的认为,但是测试方认为添加container后对资源进行了限制,这或许会影响客户端的运行效果,比如某客户端需要占用的memory为100M,我们强行让其运行在只分配80M的container中,此客户端永远不会达到理想的运行效果;而假如我们让其运行在分配120M的container中,虽然可以达到理想的运行效果,但是又造成的资源浪费,白白浪费了20M的资源。况且客户端对于我们来说是黑盒的,我们不知道应该给它分配多少资源就可以让它运行出理想效果,分配多了造成资源浪费,分配少了又达不到理想的运行效果,因此我们在运行客户端之前不能所以然的假设它的理想资源占有量。这也否决了container和docker的实现方案。测试方要求“在保证实际运行效果的前提下,实现资源利用率的最大化”。

于是提出了线程池的资源管控方案

(1) 在启动客户端时记录对应的ID,当该客户端在启动线程时记录ID与TID的对应关系(或者从peb里面获取),客户端退出时分别将对应关系删除 ; (2) 当有新线程创建时候(无论是否主线程),判断被记录的线程数+1是否大于当前服务器可并发执行的最大线程数,如果小于等于,则继续执行,如果大于则调用逻辑3 ;  (3)  查询当前线程数量最大的客户端,获取该客户端的线程数量,暂停此客户端(保证一个客户端的线程在同时工作),调用逻辑4 ;  (4)  当前服务器可执行最大线程数减去被暂停客户端的线程数,得到可执行线程数,调用逻辑5;
(5)  查询客户端线程关系,根据线程池空闲线程数量,计算出最理想被唤醒的客户端,调用逻辑6;  (6)  恢复这些线程执行,即运行暂停态的客户端。

有了管控方案,接下来就是具体的实现

该程序的架构模型取自《Linux高性能服务器编程》,是国内的大神写的,从销量来看这本书是相当不错的。言归正传,服务端基于epoll机制来实现的,说起IO复用,首先就会想到的是select、poll、epoll,对于三者的特点也是技术面试最容易问到的点,那么我们就说一说三者的优点。首先epoll是不可移植的,这不同于select和poll,select和poll是可移植的;其次epoll是基于事件通知机制实现的,而select和poll是基于轮询机制实现的;再有就是epoll的执行效率,epoll对于所监视的描述符的数量没有限制,而且epoll的监视性能不会随着监视描述符数量的增多而下降,反观select和poll,select对于监视描述符的数量是有限制的,通常好像是1024(应该与系统也有一定关系)其次select的监视效率会随着监视描述符数量的增多而下降,虽然poll对于所监视的描述符的数量没有限制,但是poll的监视性能会也随着监视描述符数量的增多而下降。虽然epoll是不可移植的,但是开发人员仍然喜欢使用epoll的原因应该就是看中的epoll监视fd的高效性。epoll的相关函数有3
个:epoll_create、epoll_ctl、epoll_wait,对于三者的使用可以直接查看man手册。

再来看具体实现,线程池方案有三方:服务端、客户端、脚本端,其中脚本端用于批量启动客户端,而且客户端是运行在container中的,服务端使用本地域套接字来与客户端实现通信,使用命名管道与后台脚本通信,但是管道有两个:pool_to_shell用于服务端写,脚本端读、shell_to_pool用于脚本写,服务端读,用户首先上传一批客户端,然后脚本会获取所有的客户端,并且记录当前的客户端数量,然后往管道中shell_to_pool写端写数量,服务端从shell_to_pool读端读取待启动数量后,根据当前线程池所支持的线程数计算出一个可以实际运行的客户端数量,然后再往pool_to_shell写端写数量,脚本端从pool_to_shell读端读取数量后,根据数量启动客户端。例如用户上传500个客户端,此时脚本往管道shell_to_pool写端写“+500”,服务端从shel_to_pooll读端读取500后,根据当前空闲数量计算实际可运行的客户端数量(假如线程池最大支持5000个线程同时运行,我们按每个客户端平均线程数量16来计算,而刚开始时线程池中没有运行任何线程,因此我们计算出实际可以运行的客户端数量为5000/16=312),然后就往pool_to_shell写端写入“312+”,脚本端从pool_to_shell读端读取312后,启动312个客户端,然后不断的交替执行整个过程。为什么写数字的同时还要些一个“+”呢?我们通过“+”来区分各次的写入情况,假如没有规定分割符,第一次往某管道写端写入10,管道读端还没有读取数据,便发生了第二次写入,写了一个20,此时有进程从读端读取数量,结果直接读取1020,这显然误解了程序真正的意图。对于具体实现,我们用queue_number记录从管道pool_to_shell读取的数量,用start_number记录待启动样本的数量,有值代表还有实际未启动的,为0代表全部启动,当客户端启动之初便会于服务端建立连接,此时start_number--,假如线程池执行一段满载状态之后,有客户端被暂停或退出,此时空闲出部分线程数量,首先我们应该明确的是当前是否仍然存在暂停态的客户端,然后查看当前暂停的客户端的线程数量是否满足唤醒条件,满足就唤醒,不满足则继续查找下一个暂停态的客户端,假如查找了所有暂停的客户端均不满足唤醒条件,但是还有等待启动的客户端,此时我们根据当前总线程数量和start_number计算出可以启动的客户端数量,然后再往管道中pool_to_shell中写数量。

服务端创建本地域套接字listener,然后将listener加入epoll进行监听,所有的客户端在启动之初便与与listener相连。刚才谈到了epoll,但是并没说明epoll需要注意的点,那就是epoll仅会通知你某fd上有事件,但是对于事件到来的次数并不明确,假如某fd第一次有事件时你未进行处理,紧接着便发生了第2、3、4次事件,而你在发生第4次事件后才处理,假如你仅对该fd处理了一次,那么就忽略了3次事件,这显然是不正确的,因此我们应该最大程度的避免这种情况。于是我们的做法是将服务端epoll监听的fd都设置为非阻塞模式,而客户端与服务端通信的fd设置为阻塞模式,需要说明的是假如你open一个fd,它默认就是阻塞的,因此对于客户端的fd我们没必要进行处理,而服务端fd需要用fcntl设置为非阻塞模式,客户端在线程启动的时候便会通过send往服务端发消息,之后便执行recv等待服务端的应答,此时客户端相当于阻塞在recv函数,服务端在处理完该fd上的事件之后便send打断客户端在recv的阻塞,这样以来,假如即使服务端没有响应某客户端的事件,相同客户端的事件便不会到来多次。说道底,epoll的事件是以fd为单位的,虽然我们在客户端运行中避免了某fd有事件,未进行处理时便发生后续事件的情况,但是服务端与众多客户端相连接的fd的事件又该如何处理呢?我们好像无法控制客户端一个一个的按序连接,因此对于listener上的连接事件,我们应该循环处理,当listener上有事件时,说明有客户端与服务端相连接,我们循环accept,直到把所有的连接事件处理完。

对于其他的实现,我们用循环双向链表记录客户端与客户端的线程数量,因为客户端与服务端连接成功后便产生了一个用于双方通信的fd,我们用此fd来区分不同的客户端,循环链表的数据插入用的是数据前插,因为在出现暂停态的客户端后,我们是从链表头开始依次往后查找数据,所以最开始运行的客户端就应该最先处理。

用pool->total_thread_number记录当前总线程数量,理论上说刚开始其值为0,当所有客户端运行完毕后,其值也应该为0,对于此值的处理尤为重要,因为pool->total_thread_number直接决定客户端的运行数量,当某客户端启动线程时pool->total_thread_number++,线程退出时pool->total_thread_number--,当线程总线程数量超过最大值时,暂停当前线程数量最大的客户端,然后pool->total_thread_number减去被暂停客户端的线程数量,此时线程池中空缺出部分线程数量,然后查看链表中是否有可以满足唤醒条件(暂停态客户端线程总数
< 线程池空缺线程数)的客户端,如果存在唤醒某客户端,然后pool->total_thread_number加上被唤醒客户端的线程数量,并唤醒该客户端,如果所有暂停态沙箱均不满足唤醒条件,查看是否存在等待启动的客户端,存在则根据(MAX_THREAD_NUMBER - pool->total_thread_number - (start_number<<4))>>4 计算可启动样本数量。但是其中需要的点是假如某客户端已经处于暂停态,我们无须再根据客户端线程数量更新pool->total_thread_number数值,例如:线程数为20的客户端和线程池为50的客户端均在处理,但是20的先处理,处理完发现需要线程总数超标,需要暂停某客户端,而暂停的恰巧就是线程数为50的客户端,而处理50的客户端时,应该仅是将链表中的线程数更新,而不再更新pool->total_thread_number,因为线程数为50的客户端已经被暂停了,在暂停时pool->total_thread_number已经减去了暂停态客户端线程总线;再有客户端退出时,pool->total_thread_number应该减去退出客户端的线程数量,但是在减去该客户端的线程数量之前,也应该判断该客户端的状态是否为暂停态,例如:线程数为30的客户端和线程数为40的客户端均在处理,但是30的先处理,但在处理之前或之后已经将40的客户端设置为暂停态(此时pool->total_thread_number已经减去了40),而在处理40的客户端时发现它为退出事件,此时我们应该删除链表节点,pool->total_thread_number减去40,但是实际上在40被暂停的时候pool->total_thread_number已经减去了40,因此我们没必要再减40,此时再减势必会影响pool->total_thread_number的最终数值。

-------------------------------------------------- 代码如下 ------------------------------------------------------------

/*                    ------------ 线程池调度管控 ------------

*    利用线程池实现客户端总线程管控,若客户端总线程数大于线程池线程总数,暂停线程数最大的客户端,

*    再根据线程池空闲线程数唤醒某个处于暂停态的客户端,若不存在可唤醒沙客户端则写管道运行新客户端,

*    但是唤醒客户端时处于暂停态的客户端优先级高于未运行的客户端;

*    添加线程池的目的是:

*        根据线程池的线程数量管控大规模时客户端总线程数量,避免线程数过多时

*        导致频繁地切换线程耗费系统资源,时刻保证线程池处于满载或接近满载运行状态。

* *******************************************************************************************/

#include <stdio.h>

#include <fcntl.h>

#include <stdlib.h>

#include <unistd.h>

#include <errno.h>

#include <string.h>

#include <pthread.h>

#include <sys/socket.h>

#include <sys/types.h>

#include <sys/stat.h>

#include <sys/un.h>

#include <sys/epoll.h>

#include <signal.h>

#define    NAMESIZE                    128

#define BUFSIZE                        128

#define MAX_EVENTS                    10000

#define MAX_FAILED_COUNT            10

#define POOL_THREAD_NUMBER            20

#define MAX_THREAD_NUMBER            5000

#define handle_error(msg)            do{perror(msg);exit(EXIT_FAILURE);}while(0)

/* 客户端状态 */

enum sandbox_state_en

{

RUNNING=1,

SUSPEND,

WAITING,

OVER,/* 链表头状态 */

};

/* 客户端与服务器用于传输数据的结构体 */

struct sandbox_message_st

{

int sandbox_cmd;

char container_name[NAMESIZE];//容器名

};

/* 任务队列结构体 */

struct task_list_st

{

int connect_fd;

struct sandbox_message_st sandbox_message;

int sandbox_thread_number;

enum sandbox_state_en sandbox_state;

int resume_failed_count;

struct task_list_st *prev;

struct task_list_st *next;

};

/* 线程池结构体,多线程共享数据时应考虑数据同步问题 */

struct thread_pool_st

{

int fd;

int shutdown;

int total_thread_number;

pthread_mutex_t mutex_fd;

pthread_mutex_t mutex_number;

pthread_cond_t cond_main;

pthread_cond_t cond_equal;

pthread_t *tid;

};

static int epfd;/* 主线程添加实例,对等线程删除实例 */

static struct epoll_event events[MAX_EVENTS];

static int queue_number;//用于记录从管道读取的数量

static int start_number;//用于记录待启动样本的数量,有值代表还有实际未启动的,为0代表全部启动

static const char fifo_1[] = "/opt/malware_sandbox_deploy/thread_pool/sh_to_pool";

static const char fifo_2[] = "/opt/malware_sandbox_deploy/thread_pool/pool_to_sh";

static int fifo_wfd = -1;//pool_to_sh写端fd

static int fifo_rfd = -1;//sh_to_pool读端fd

static struct thread_pool_st    *pool;

static struct task_list_st        *head;

void task_list_init(void);

void pool_init(void);

void handle_connection(void);

void *thread_function(void *);

void setnonblocking(int);

void add_task_list(int);

int delete_task_list(int);

void print_task_list(void);

int modify_task_list(int, struct sandbox_message_st);

struct task_list_st *find_suspend_sandbox(void);

struct task_list_st *find_resume_sandbox(int, int);

int min(int, int);

void suspend_sandbox(char *);

void resume_sandbox(char *);

void pool_destory(void);

void task_list_init(void)

{

head = (struct task_list_st *)malloc(sizeof(struct task_list_st));

if(head == NULL)

handle_error("malloc->head");

head->connect_fd = -1;

memset(head->sandbox_message.container_name, ‘\0‘, NAMESIZE);

head->sandbox_message.sandbox_cmd = -1;

head->sandbox_thread_number = 0;

head->sandbox_state = OVER;

head->resume_failed_count = 0;

head->prev = head;

head->next = head;

}

void pool_init(void)

{

int var;

int err;

pool = (struct thread_pool_st *)malloc(sizeof(struct thread_pool_st));

if(pool == NULL)

handle_error("malloc->pool");

pool->fd = -1;

pool->shutdown = 0;

pool->total_thread_number = 0;

pthread_mutex_init(&(pool->mutex_fd), NULL);

pthread_mutex_init(&(pool->mutex_number), NULL);

pthread_cond_init(&(pool->cond_main), NULL);

pthread_cond_init(&(pool->cond_equal), NULL);

pool->tid = (pthread_t *)malloc(sizeof(pthread_t) * POOL_THREAD_NUMBER);

if(pool->tid == NULL)

handle_error("malloc->pool->tid");

for(var = 0; var < POOL_THREAD_NUMBER; var++)

{

err = pthread_create(&(pool->tid[var]), NULL, thread_function, NULL);//暂时未传参

if(err)

fprintf(stderr, "pool_init()->pthread_create: %s\n", strerror(errno));

}

}

void pool_destory(void)

{

int var;

struct task_list_st *p1 = head->next;

struct task_list_st *p2;

if(pool->shutdown)

return;

pool->shutdown = -1;

/* 唤醒所有等待线程,线程池要销毁了 */

pthread_cond_broadcast(&(pool->cond_main));

pthread_cond_broadcast(&(pool->cond_equal));

/* 阻塞等待线程退出 */

for(var = 0; var < POOL_THREAD_NUMBER; var++)

pthread_join(pool->tid[var], NULL);

/* 先释放结构体内指针,再释放结构体 */

free(pool->tid);

/* 销毁任务队列 */

while(p1 != NULL)

{

p2 = p1->next;

free(p1);

p1 = p2;

}

free(head);

head = NULL;

/* 销毁互斥量和条件变量 */

pthread_mutex_destroy(&(pool->mutex_fd));

pthread_mutex_destroy(&(pool->mutex_number));

pthread_cond_destroy(&(pool->cond_main));

pthread_cond_destroy(&(pool->cond_equal));

/* 销毁线程池结构体 */

free(pool);

pool = NULL;

/* 关闭socket */

//close(listener);

}

/*    epoll通知某fd有事件,有数据等待接收的事件只会通知一次,因为客户端方send之后便会recv阻塞等待服务端send

*    因此服务端第一次recv:会接收到实际数据;第二次recv:recv返回-1,表示无数据可接收

*    但epoll仍然存在某fd两次有事件的情况,假如第一次epoll通知某fd有数据要接收,在未接收时该fd连接的客户端与服务端断开

*  此时便会再通知一次相同fd有事件,此时recv就应该执行两次:一次recv数据,一次recv表示对端关闭

* */

void *thread_function(void *p)

{

struct sandbox_message_st recvbuf;

struct sandbox_message_st sendbuf;

struct task_list_st *task = NULL;

int fd, ret;

//struct epoll_event ev;

while(1)

{

pthread_mutex_lock(&(pool->mutex_fd));

while(pool->fd == -1)

pthread_cond_wait(&(pool->cond_equal), &(pool->mutex_fd));

fd = pool->fd;

pool->fd = -1;

pthread_cond_signal(&(pool->cond_main));

pthread_mutex_unlock(&(pool->mutex_fd));

while (1)

{

bzero(&recvbuf, sizeof(recvbuf));

ret = recv(fd, &recvbuf, sizeof(recvbuf), 0);

if(ret == -1)

{

/* cmd=2/3先发送,shutdown后发送,由于线程并发shutdown先处理,cmd=2/3后处理,导致send之前该客户端fd已经关闭了 */

if(errno == EBADF)

break;

sendbuf.sandbox_cmd = 0;

memset(sendbuf.container_name, ‘\0‘, NAMESIZE);

/* 客户端send后会阻塞recv该消息,等不到该消息便不会再send,防止fd多次有事件,但实际却recv一次的情况 */

if(send(fd, &sendbuf, sizeof(sendbuf), MSG_NOSIGNAL) == -1)/* MSG_NOSIGNAL */

{

//perror("thread_function()->send1");

fprintf(stderr, "fd = %d send failed, reason : %s\n", fd, strerror(errno));

if(errno == EPIPE)

continue;

}

break;/* 循环recv时接收到无数据时便不再recv */

}

else if(ret == 0)/* 客户端与服务器断开连接 */

{

pthread_mutex_lock(&(pool->mutex_number));

fprintf(stderr, "fd 为 %d 的客户端退出\n", fd);

/* 删除相应任务链表节点、更新总线程数、根据目前总线程数和待启动样本数量决定是否需要再唤醒或启动样本 */

pool->total_thread_number -= delete_task_list(fd);

/* 防止任务链表已经清空但是客户端总线程数并未归零的情况 */

if(head->next == head)

{

fprintf(stderr, "Before clear out, pool->total_thread_number = %d\n", pool->total_thread_number);

pool->total_thread_number = 0;

}

fprintf(stderr, "fd 为 %d 的客户端退出后 pool->total_thread_number = %d\n", fd, pool->total_thread_number);

/* 有客户端退出,空缺出部分线程数量,可唤醒暂停态客户端或启动新客户端 */

if((task = find_resume_sandbox(pool->total_thread_number, -2)) != NULL)

{

/* resume某客户端后更新总线程数 */

pool->total_thread_number += task->sandbox_thread_number;

fprintf(stderr, "某客户端退出,fd 为 %d 的客户端被唤醒后 pool->total_thread_number = %d\n", task->connect_fd, pool->total_thread_number);

/* resume某客户端,激活客户端时处于暂停态客户端的优先级高于未运行客户端*/

task->sandbox_state = RUNNING;

resume_sandbox(task->sandbox_message.container_name);

}

//pthread_mutex_unlock(&(pool->mutex_number));

/* 客户端退出后删除相应客户端的epoll实例 */

if(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) == -1)

perror("thread_function()->epoll_ctl");

fprintf(stderr, "删除epoll 中fd 为 %d 的客户端实例成功\n", fd);

close(fd);/* 若没有显式地关闭fd,主线程添加实例和对等线程删除实例不会产生冲突 */

pthread_mutex_unlock(&(pool->mutex_number));/* epoll删除实例操作也应该在锁内完成 */

break;/* 循环recv时对方断开连接便不再recv */

}

else/* recv成功且有数据,维护任务链表内容 */

{

pthread_mutex_lock(&(pool->mutex_number));

fprintf(stderr, "recvbuf.sandbox_cmd = %d\n", recvbuf.sandbox_cmd);

if(recvbuf.sandbox_cmd == 1)

fprintf(stderr, "recvbuf.container_name = %s\n", recvbuf.container_name);

pool->total_thread_number += modify_task_list(fd, recvbuf);

fprintf(stderr, "自增或自减后 pool->total_thread_number = %d\n", pool->total_thread_number);

/* a客户端线程数+1后大于MAX_THREAD_NUMBER,此时就应该暂停最大线程数量的客户端 */

if(pool->total_thread_number > MAX_THREAD_NUMBER)

{

/* 获取suspend客户端信息 */

if((task = find_suspend_sandbox()) != NULL)

{

/* 暂停某客户端后更新总线程数 */

pool->total_thread_number -= task->sandbox_thread_number;

//fprintf(stderr, "线程数量超限,fd 为 %d 的客户端被暂停后 pool->total_thread_number = %d\n", task->connect_fd, pool->total_thread_number);

fprintf(stderr, "线程数量超限,最大线程数量 %d ,fd 为 %d 的客户端被暂停后 pool->total_thread_number = %d\n", task->sandbox_thread_number, task->connect_fd, pool->total_thread_number);

/* 暂停某客户端,暂停目前最大线程数量的客户端 */

printf("freeze sandbox : %s\n", task->sandbox_message.container_name);

task->sandbox_state = SUSPEND;

suspend_sandbox(task->sandbox_message.container_name);

/* 获取resume客户端信息 */

if((task = find_resume_sandbox(pool->total_thread_number, fd)) != NULL)

{

/* resume某客户端后更新总线程数 */

pool->total_thread_number += task->sandbox_thread_number;

fprintf(stderr, "线程数量超限暂停某客户端后,fd 为 %d 的客户端被唤醒 pool->total_thread_number = %d\n", task->connect_fd, pool->total_thread_number);

/* resume某客户端,激活客户端时处于暂停态客户端的优先级高于未运行客户端*/

task->sandbox_state = RUNNING;

resume_sandbox(task->sandbox_message.container_name);

}

}

}/* 若客户端总线程数量自减后,空闲线程数量可再容纳一个客户端线程数量,唤醒某客户端或启动新客户端 */

else if((MAX_THREAD_NUMBER - pool->total_thread_number - (start_number<<4))>>4 > 0)

{

//fprintf(stderr, "(MAX_THREAD_NUMBER - pool->total_thread_number - (start_number<<4))>>4 = %d\n", (MAX_THREAD_NUMBER - pool->total_thread_number - (start_number<<4))>>4);

if((task = find_resume_sandbox(pool->total_thread_number, -2)) != NULL)

{

/* resume某客户端后更新总线程数 */

pool->total_thread_number += task->sandbox_thread_number;

fprintf(stderr, "线程数量空缺,fd 为 %d 的客户端被唤醒后 pool->total_thread_number = %d\n", task->connect_fd, pool->total_thread_number);

/* resume某客户端,激活客户端时处于暂停态客户端的优先级高于未运行客户端*/

task->sandbox_state = RUNNING;

resume_sandbox(task->sandbox_message.container_name);

}

}

pthread_mutex_unlock(&(pool->mutex_number));

}

}

}

pthread_exit(NULL);/* 此句应该是不可达的 */

}

void setnonblocking(int fd)

{

int flags;

if((flags = fcntl(fd, F_GETFL, 0)) == -1)

handle_error("fcntl1");

if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)

handle_error("fcntl2");

}

void print_task_list(void)

{

if(!head) return ;

struct task_list_st *p = head->next;

while(p != head)

{

fprintf(stderr, "p->connect_fd = %d\n", p->connect_fd);

fprintf(stderr, "p->sandbox_message.container_name = %s\n", p->sandbox_message.container_name);

fprintf(stderr, "p->sandbox_thread_number = %d\n", p->sandbox_thread_number);

fprintf(stderr, "p->sandbox_state = %d\n", p->sandbox_state);

//fprintf(stderr, "p->resume_failed_count = %d\n", p->resume_failed_count);

fprintf(stderr, "---------------------------------------\n");

p = p->next;

}

fprintf(stderr, "=====================================================\n");

}

void add_task_list(int fd)

{

if(!head) return;

struct task_list_st *new;

new = (struct task_list_st *)malloc(sizeof(struct task_list_st));

if(new == NULL)

handle_error("add_task_list()->malloc");

new->connect_fd = fd;

memset(new->sandbox_message.container_name, ‘\0‘, NAMESIZE);

new->sandbox_message.sandbox_cmd = -1;

new->sandbox_thread_number = 0;//自加

new->sandbox_state = WAITING;//WAITING

new->resume_failed_count = 0;

/* 数据后插 */

new->next = head;

new->prev = head->prev;

new->prev->next = new;

new->next->prev = new;

}

int delete_task_list(int fd)

{

int delete_thread_number = 0;

if(!head) return 0;

struct task_list_st *p = head->next;

while(p != head)

{

if(p->connect_fd == fd)

{

delete_thread_number = p->sandbox_thread_number;

p->prev->next = p->next;

p->next->prev = p->prev;

free(p);

if(p->sandbox_state == SUSPEND)/* 暂停态客户端pool->total_thread_number已经减去了暂停态客户端线程数 */

return 0;

return delete_thread_number;

}

p = p->next;

}

return 0;

}

/*    modify_task_list

*    根据recv接收的数据信息,维护客户端线程数量,修改客户端任务状态

*    返回值:

*        线程启动          1

*        线程退出        -1

*        未找到              0

* */

int modify_task_list(int fd, struct sandbox_message_st buf)

{

if(!head) return 0;

struct task_list_st *p = head->next;

while(p != head)

{

if(p->connect_fd == fd)

{

switch(buf.sandbox_cmd)

{

case 1:

p->sandbox_state = RUNNING;//WAITING --> RUNNING,cmd=1仅会发送一次

strncpy(p->sandbox_message.container_name, buf.container_name, NAMESIZE);

break;

case 2:

p->sandbox_thread_number++;//自加

fprintf(stderr, "fd 为 %d 的客户端线程数自增\n", fd);

if(p->sandbox_state == SUSPEND)//已经处于暂停态的客户端仅更新链表节点信息,不再更新pool->total_thread_number

return 0;

return 1;

case 3:

p->sandbox_thread_number--;//自减

fprintf(stderr, "fd 为 %d 的客户端线程数自减\n", fd);

if(p->sandbox_state == SUSPEND)//已经处于暂停态的客户端仅更新链表节点信息,不再更新pool->total_thread_number

return 0;

return -1;

}

}

p = p->next;

}

return 0;

}

struct task_list_st *find_suspend_sandbox(void)

{

if(!head) return NULL;

struct task_list_st *p = head->next;

struct task_list_st *node = NULL;

while(p != head)

{

if(p->sandbox_state == RUNNING)

{

if(node == NULL)

node = p;

else if(p->sandbox_thread_number > node->sandbox_thread_number)

node = p;

}

p = p->next;

}

return node;

}

/*    find_resume_sandbox

*    长安提议:根据超时时间和线程数量,计算出最理想被继续执行的客户端

*    返回值

*        需要继续执行的任务节点            p

* */

struct task_list_st *find_resume_sandbox(int thread_number, int fd)

{

if(!head) return NULL;

int idle_thread_number = MAX_THREAD_NUMBER - thread_number;

struct task_list_st *p = head->next;

while(p != head)

{

if((p->sandbox_state == SUSPEND) && (p->connect_fd != fd))

{

if(p->sandbox_thread_number < idle_thread_number)

{

//p->sandbox_state = RUNNING;//SUSPEND --> RUNNING,是否应该真正重新启动成功后再修改状态?

p->resume_failed_count = 0;

return p;

}

else/* 某suspend客户端线程数>=线程池空闲线程数,不唤醒该客户端,唤醒失败计数+1,若所有suspend客户端均不满足唤醒条件,则向线程池中添加一个新的任务 */

{

p->resume_failed_count++;

/* 若某suspend态客户端唤醒失败计数达到最大值,则即使线程池空闲线程数不满足唤醒条件也不再启动新客户端 */

if(p->resume_failed_count > MAX_FAILED_COUNT)

return NULL;

}

}

p = p->next;

}

/* 除刚暂停的客户端外不存在suspend态客户端,或所有suspend态客户端均不满足唤醒条件,且等待队列中有任务,则启动新客户端 */

if(queue_number > 0)/* 等待队列中有任务 */

{

int num = (MAX_THREAD_NUMBER - pool->total_thread_number - (start_number<<4))>>4;/* 可启动的样本数量 */

if((num > 0))

{

char s[32] = {‘\0‘};

int min_num = min(num, queue_number);

start_number += min_num;

sprintf(s, "%d+", min_num);

if(write(fifo_wfd, s, strlen(s)) == -1)

handle_error("handle_connection()->write");

fprintf(stderr, "向pool_to_sh 的管道的写端写入 %s 成功\n", s);

/* 更新待启动样本数量 */

queue_number -= min_num;

}

}

return NULL;

}

int min(int a, int b)

{

if(a < b)

return a;

return b;

}

/*    suspend_sandbox

*    根据客户端号暂停客户端

* */

void suspend_sandbox(char container_name[])

{

/* 冻结命令 lxc-freeze -n container_name */

char command[128] = {‘\0‘};

sprintf(command, "lxc-freeze -n %s", container_name);

printf("command = %s\n", command);

if(system(command) == -1)

handle_error("suspend_sandbox()->system");

}

/*    resume_sandbox

*    根据客户端号唤醒客户端

* */

void resume_sandbox(char container_name[])

{

/* 解冻命令 lxc-unfreeze -n container_name */

char command[128] = {‘\0‘};

sprintf(command, "lxc-unfreeze -n %s", container_name);

printf("command = %s\n", command);

if(system(command) == -1)

handle_error("resume_sandbox()->system");

}

void handle_connection(void)

{

int listener, new_fd, /* epfd ,*/ nfds, n;

int backlog = 128;

struct sockaddr_un seraddr, cliaddr;

struct epoll_event  ev /*, events[MAX_EVENTS]*/;

socklen_t cliaddr_len = sizeof(cliaddr);

char buf[BUFSIZE];

/* socket */

if((listener = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)

handle_error("socket");

fprintf(stderr, "socket 创建成功\n");

setnonblocking(listener);/* 将fd设为非阻塞模式 */

/* bind */

bzero(&seraddr, sizeof(seraddr));

seraddr.sun_family = AF_UNIX;

strcpy(seraddr.sun_path, "/tmp/thread_pool");

unlink("/tmp/thread_pool");/* 防止重复绑定地址时bind失败 */

if(bind(listener, (struct sockaddr *)&seraddr, sizeof(seraddr)) == -1)

handle_error("bind");

fprintf(stderr, "bind 成功\n");

/* listen */

if(listen(listener, backlog) == -1)

handle_error("listen");

fprintf(stderr, "listen 成功\n");

//    /* epoll events */

//    if((epfd = epoll_create(MAX_EVENTS)) == -1)

//        handle_error("epoll_create");

ev.events = EPOLLIN | EPOLLET;

ev.data.fd = listener;

if(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev) == -1)

handle_error("epoll_ctl");

fprintf(stderr, "监听socket 加入epoll 成功\n");

while(1)

{

fprintf(stderr, "等待事件的到来epoll_wait \n");

nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);

if(nfds == -1)

perror("epoll_wait");

for(n = 0; n < nfds; n++)

{

if(events[n].data.fd == listener)//处理连接

{

fprintf(stderr, "有客户端连接服务端, before accept\n");

while(1)/* 多个客户端同时连接listener,epoll仅会告诉内核有事件,而并不明确相同事件有几次,若只accept一次,其他待连接的fd仍在队列中等待连接 */

{

new_fd = accept(listener, (struct sockaddr *)&cliaddr, &cliaddr_len);

if(new_fd == -1)

{

if(errno == EAGAIN)

break;

}

fprintf(stderr, "有客户端连接服务端,分配的fd 为 %d \n", new_fd);

setnonblocking(new_fd);/* 将fd设为非阻塞模式 */

ev.events = EPOLLIN | EPOLLET;

ev.data.fd = new_fd;

if(epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &ev) == -1)

perror("epoll_ctl");

fprintf(stderr, "将新连接的fd 为 %d 的客户端实例加入epoll 成功\n", new_fd);

/* 添加任务链表节点,任务状态初始为WAITING态,首次连接时发送信息包含容器名称 */

add_task_list(new_fd);

fprintf(stderr, "将fd 为 %d 的客户端信息添加到任务链表成功\n", new_fd);

start_number--;

if(start_number < 0)

start_number = 0;

}

}

else if(events[n].data.fd == fifo_rfd)

{

fprintf(stderr, "shell_to_pool 管道的读端 %d 有事件\n", fifo_rfd);

if(read(fifo_rfd, buf, BUFSIZE) == -1)

perror("handle_connection()->read");

else if(buf[0] == ‘+‘)//有待启动样本

{

queue_number = atoi(buf+1);/* 获取待启动样本数量 */

if(queue_number > 0)

{

/* 查询pool->total_thread_number数量,根据计算得出的可启动样本数量写管道 */

pthread_mutex_lock(&(pool->mutex_number));

int num = (MAX_THREAD_NUMBER - pool->total_thread_number - (start_number<<4))>>4;

pthread_mutex_unlock(&(pool->mutex_number));

if(num > 0)

{

char s[32] = {‘\0‘};

int min_num = min(num, queue_number);

start_number += min_num;

sprintf(s, "%d+", min_num);

if(write(fifo_wfd, s, strlen(s)) == -1)

perror("handle_connection()->write");

fprintf(stderr, "向pool_to_sh 的管道的写端写入 %s 成功\n", s);

/* 更新待启动样本数量 */

queue_number -= min_num;

}

}

}

}

else

{

fprintf(stderr, "连接服务端fd 为 %d 的客户端有事件\n", events[n].data.fd);

pthread_mutex_lock(&(pool->mutex_fd));

while(pool->fd != -1)

pthread_cond_wait(&(pool->cond_main), &(pool->mutex_fd));

pool->fd = events[n].data.fd;

pthread_cond_signal(&(pool->cond_equal));

pthread_mutex_unlock(&(pool->mutex_fd));

/*    注意:着重考虑当线程池中工作线程满时,待处理的的fd多次有事件而处理该fd时仅recv一次的情况 */

}

}

}

}

void create_epoll(void)

{

/* epoll events */

if((epfd = epoll_create(MAX_EVENTS)) == -1)

handle_error("epoll_create");

fprintf(stderr, "创建epoll 成功\n");

}

void open_fifo(void)

{

struct epoll_event ev;

/* 打开shell_to_poll读端,并将此读端fd加入epoll进行监听 */

if(access(fifo_1, F_OK) == -1)

if(mkfifo(fifo_1, 0777) == -1)

handle_error("open_fifo()->mkfifo fifo_1");

if(access(fifo_2, F_OK) == -1)

if(mkfifo(fifo_2, 0777) == -1)

handle_error("open_fifo()->mkfifo fifo_2");

fifo_rfd = open(fifo_1, O_RDONLY | O_NONBLOCK);

if(fifo_rfd == -1)

handle_error("open()->fifo_rfd");

fprintf(stderr, "打开fifo_rfd 成功\n");

ev.events = EPOLLIN | EPOLLET;

ev.data.fd = fifo_rfd;

if(epoll_ctl(epfd, EPOLL_CTL_ADD, fifo_rfd, &ev) == -1)

handle_error("open_fifo()->epoll_ctl");

fprintf(stderr, "将fifo_rfd 加入 epoll 成功\n");

/* 打开poll_to_shell写端 */

//fifo_wfd = open(fifo_2, O_WRONLY | O_NONBLOCK);

fifo_wfd = open(fifo_2, O_RDWR | O_NONBLOCK);

if(fifo_wfd == -1)

handle_error("open()->fifo_wfd");

fprintf(stderr, "打开fifo_wfd 成功\n");

}

int main()

{

struct sigaction sa;

sa.sa_handler = SIG_IGN;

sigemptyset(&sa.sa_mask);

sigaddset(&sa.sa_mask, SIGPIPE);

sa.sa_flags = 0;

if(sigaction(SIGPIPE, &sa, NULL) == -1)

handle_error("sigaction()");

pool_init();

task_list_init();

create_epoll();

open_fifo();

handle_connection();

//pool_destory();

return 0;

}

时间: 2024-10-13 03:07:11

基于线程池的线程调度管控系统的相关文章

【Java TCP/IP Socket】基于线程池的TCP服务器(含代码)

了解线程池 在http://blog.csdn.net/ns_code/article/details/14105457(读书笔记一:TCP Socket)这篇博文中,服务器端采用的实现方式是:一个客户端对应一个线程.但是,每个新线程都会消耗系统资源:创建一个线程会占用CPU周期,而且每个线程都会建立自己的数据结构(如,栈),也要消耗系统内存,另外,当一个线程阻塞时,JVM将保存其状态,选择另外一个线程运行,并在上下文转换(context switch)时恢复阻塞线程的状态.随着线程数的增加,线

使用Android新式LruCache缓存图片,基于线程池异步加载图片

import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import a

Android下基于线程池的网络访问基础框架

引言 现在的Android开发很多都使用Volley.OkHttp.Retrofit等框架,这些框架固然有优秀的地方(以后会写代码学习分享),但是我们今天介绍一种基于Java线程池的网络访问框架. 实现思路及实现 APP界面上面的数据都是通过网络请求获取的,我们能不能将网络请求依次入队,然后配合着Java线程池,让线程依次处理我们的请求,最后返回结果给我们.下面我们先来看一下线程池工具类的实现: 1 public class ThreadPoolUtils { 2 3 private Threa

基于线程池、消息队列和epoll模型实现Client-Server并发架构

引言 并发是什么?企业在进行产品开发过程中为什么需要考虑这个问题?想象一下天猫的双11和京东的618活动,一秒的点击量就有几十万甚至上百万,这么多请求一下子涌入到服务器,服务器需要对这么多的请求逐个进行消化掉,假如服务器一秒的处理能力就几万,那么剩下的不能及时得到处理的这些请求作何处理?总不能让用户界面一直等着,因此消息队列应运而生,所有的请求都统一放入消息队列,工作线程从消息队列不断的消费,消息队列相当于一个缓冲区,可达到解藕.异步和削峰的目的. Kafka.ActiveMQ.RabbitMQ

10.线程池_线程调度

/*线程池*/ /*第四种 获取线程的方法:线程池*/ 一个ExecutorService,它使用线程池的可能的某个线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置(ExecutorService service = Executors.newFixedThreadPool(5);) 线程池可以解决两个不同问题: 1.由于减少了每个任务的开销,它们通常可以在执行大量异步任务时 提供增强的性能, 2.并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.每个Thr

基于线程池异步抓取

from multiprocessing.dummy import Pool #线程池模块 #必须只可以有一个参数 def my_requests(url): return requests.get(url=url,headers=headers).text start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', ] p

基于线程池和连接池的Http请求

背景:最新项目需求调用http接口,所以打算使用最新的httpClient客户端写一个工具类,写好了以后在实际应用过程中遇到了一些问题,因为数据量还算 大,每次处理大概要处理600-700次请求,平均算下来大概需要20分钟,这个速度虽然是跑在定时任务中的,但是也是不能忍受的,所以有了这个博客. 1.首先想到的解决办法就是多线程发请求了,但是这个有坑,最后会在结果处说明. 2.代码方面如下 ExecutorService executor = Executors.newFixedThreadPoo

Java线程池的理论与实践

前段时间公司里有个项目需要进行重构,目标是提高吞吐量和可用性,在这个过程中对原有的线程模型和处理逻辑进行了修改,发现有很多基础的多线程的知识已经模糊不清,如底层线程的运行情况.现有的线程池的策略和逻辑.池中线程的健康状况的监控等,这次重新回顾了一下,其中涉及大量java.util.concurrent包中的类.本文将会包含以下内容:Java中的Thread与操作系统中的线程的关系线程切换的各种开销ThreadGroup存在的意义使用线程池减少线程开销Executor的概念ThreadPoolEx

Android性能优化之使用线程池处理异步任务

说到线程,我想大家都不陌生,因为在开发时候或多或少都会用到线程,而通常创建线程有两种方式: 1.继承Thread类 2.实现Runnable接口 虽说这两种方式都可以创建出一个线程,不过它们之间还是有一点区别的,主要区别在于在多线程访问同一资源的情况下,用Runnable接口创建的线程可以处理同一资源,而用Thread类创建的线程则各自独立处理,各自拥有自己的资源. 所以,在Java中大多数多线程程序都是通过实现Runnable来完成的,而对于Android来说也不例外,当涉及到需要开启线程去完