可动态增减的线程池,主线程accept——基于UNP代码修改

可动态增减的线程池,主线程accept——基于UNP代码修改

1.说明

线程池基于一个区间动态变化,在客户连接过多线程不够用时,动态增加一定数量的线程。在线程闲置数量多于一半时,动态减小线程数量到一个基准线。

这个例子模式为:半同步/半异步(half-sync/half-async)

2.代码相关说明

代码基于UNP的库函数,要想运行必须先安装相应库。

3.代码

#include "unpthread.h"
#include <queue>
#include <list>
#include <vector>

using std::vector;
using std::list;
using std::queue;

//可以将下面的信息放入一个结构体中,作为线程池结构
const int MAX_NTHREADS     = 30;        //线程池能够创建的最大线程数
const int MIN_NTHREADS     = 15;        //线程池中最小线程数
const int MINFREE_NTHREADS = 2;        //允许的最小空闲线程数
const int ADD_NTHREADS     = 5;        //空闲线程数< MINFREE_NTHREDS时增加的线程数

//记录每个线程信息
typedef struct {
    bool        thread_flag;        //是否正在记录线程信息;true: 是; false: 否
    pthread_t   thread_tid;         //线程的ID号
    long        thread_count;       //此线程被执行的次数
} thread_info;

static thread_info thread_info_array[MAX_NTHREADS];  //存放所有的线程信息
static int total_nthreads = MIN_NTHREADS;             //当前存在的总线程数,初始化线程池的大小
static int free_nthreads = MIN_NTHREADS;              //空闲的线程数量

//static queue<int> clifd_list;
static list<int> clifd_list;                    //已连接的客户端描述符队列
//static forward_list<int> clifd_list;
//const int MAXNCLI = 50;                       //UNP中的最大的客户端数目
//static int clifd[MAXNCLI], iget, iput;        //循环队列: UNP中用于存放已连接的客户,和取放客户端的位置。可以使用vector做循环队列在不够时动态增加长度。
static pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  clifd_cond  = PTHREAD_COND_INITIALIZER;

static void sig_int(int signo)
{
    void pr_cpu_time(void);
    pr_cpu_time();
    exit(0);
}

//线程运行函数
void *thread_main(void *arg)
{
    //arg是在thread_info_array中的位置
    int thread_pos = (int)arg;
    DPRINTF("thread %d starting\n", thread_pos);
    void web_child(int);

    for (;;) {
        DPRINTF("thread %d, threadID %d want to lock\n", thread_pos,
                thread_info_array[thread_pos].thread_tid);
        //给当前的线程上锁
        Pthread_mutex_lock(&clifd_mutex);
        DPRINTF("thread %d threadID %d, thread_id %d locked\n", thread_pos,
                thread_info_array[thread_pos].thread_tid, pthread_self());
        //UNP中原本的循环数组
        /*
        while (iget == iput) {
            DPRINTF("no jobs, thread %d wait cond\n");
            Pthread_cond_wait(&clifd_cond, &clifd_mutex);
            DPRINTF("thread %d wait signal cond\n", thread_pos);
        }
        int connfd = clifd[iget];
        if (++iget == MAXNCLI) {
            iget = 0;
        }
        */
        //没有已连接的客户
        while (clifd_list.empty()) {
            //busy_nthreads = 0;
            //memset(thread_busyflag, 0, MAX_NTHREADS);
            //free_nthreads = total_nthreads;
            DPRINTF("no jobs, thread %d wait cond\n");
            //等待条件变量,此时mutex解锁。
            //被唤醒后,重新加锁
            Pthread_cond_wait(&clifd_cond, &clifd_mutex);
            DPRINTF("thread %d wait signal cond\n", thread_pos);
        }
        //for (int i = 0; i < MAX_NTHREADS; ++i) {
        //    if (thread_info_list[i].thread_busyflag == 0) {
        //        ++free_nthreads;
        //    }
        //}
        //空线程数目
        free_nthreads = total_nthreads - clifd_list.size();
        //当前线程不够用
        if (free_nthreads < 0) {
            free_nthreads = 0;
        }
        //空闲线程数超过一半
        if (free_nthreads > total_nthreads / 2 && total_nthreads > MIN_NTHREADS) {
            DPRINTF("*free %d, total %d**********************************Free a thread\n\n",
                    free_nthreads, total_nthreads);
           //减小线程数目
            --total_nthreads;
            //标识此结构没有再用
            thread_info_array[thread_pos].thread_flag = false;
            //分离当前线程,线程结束系统回收资源
            Pthread_detach(pthread_self());
            //thread_info_list[thread_pos].thread_busyflag = 0;
            //解锁互斥量
            Pthread_mutex_unlock(&clifd_mutex);
            //线程退出
            pthread_exit(NULL);
        }
        int connfd = clifd_list.front();
        //clifd_list.pop();
        clifd_list.pop_front();
        //++busy_nthreads;
        //--free_nthreads;
        //thread_busyflag[thread_pos] = true;
        //thread_info_list[thread_pos].thread_busyflag = 1;

        Pthread_mutex_unlock(&clifd_mutex);
        DPRINTF("thread %d unlocked\n", thread_pos);
        //++thread_info_list[thread_pos].thread_count;
        //执行相应任务
        web_child(connfd);
        Close(connfd);
        //thread_busyflag[thread_pos] = false;
    }

}

//创建线程,记录线程信息:ID,结构是否在用
void thread_make(int i)
{
    Pthread_create(&thread_info_array[i].thread_tid, NULL, &thread_main, (void*)i);
    thread_info_array[i].thread_flag = true;
    return;
}

int main(int argc, char *argv[])
{
    socklen_t addrlen;
    int listenfd;
    if (argc == 3) { //IP:Port
        listenfd = Tcp_listen(NULL, argv[1], &addrlen);
    } else if (argc == 4) { //用于指定ipv4还是ipv6
        listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
    } else {
        err_quit("Usage: a.out [ <host> ] <port#> <#threads>");
    }
    struct sockaddr *cliaddr  = (struct sockaddr*)Malloc(addrlen);

//    total_nthreads = atoi(argv[argc - 1]);
    //thread_info_list = (Thread*)Calloc(total_nthreads, sizeof(Thread));
    //iget = iput = 0;

    //create all threads,开始数目是线程池允许的最小数目
    for (int i = 0; i < total_nthreads; ++i) {
        thread_make(i);
    }
    //中断用于比较时间
    Signal(SIGINT, sig_int);

    for (;;) {
        DPRINTF("Busy thread number is %d\n", clifd_list.size());
        DPRINTF("Free thread number is %d\n", free_nthreads);
        DPRINTF("Total thread number is %d\n", total_nthreads);

        socklen_t clilen = addrlen;
        DPRINTF("Wait for a connection\n");
        //获取已连接的描述符
        int connfd = Accept(listenfd, cliaddr, &clilen);
        DPRINTF("Accept a connection\n");

        DPRINTF("Main thread want to lock\n");
        //加锁更改存放描述符的结构
        Pthread_mutex_lock(&clifd_mutex);
        DPRINTF("Main thread locked\n");

        /*clifd[iput] = connfd;
        if (++iput == MAXNCLI) {
            iput = 0;
        }
        if (iput == iget) {
            err_quit("iput = iget = %d\n", iput);
        }*/
        //clifd_list.push(connfd);
        clifd_list.push_back(connfd);

        //空闲线程数
        free_nthreads = total_nthreads - clifd_list.size();
        if (free_nthreads < 0) {
            free_nthreads = 0;
        }
        //空闲连接数小于允许的最小空闲连接数目,增加线程数木
        if (free_nthreads < MINFREE_NTHREADS && total_nthreads < MAX_NTHREADS) {
            for (int i = 0; i < ADD_NTHREADS; ++i, ++total_nthreads) {
                DPRINTF("******************************create a new threads\n");
                for (int j = 0; j < MAX_NTHREADS; ++j) {
                    if (thread_info_array[j].thread_flag == false) {
                        thread_make(j);
                    }
                }
            }
        }
        //广播条件变量,唤醒正在等待的线程
        Pthread_cond_signal(&clifd_cond);
        DPRINTF("Main thread singal cond\n");
        Pthread_mutex_unlock(&clifd_mutex);
        DPRINTF("Main thread unlocked\n");

    }
    return 0;
}

可动态增减的线程池,主线程accept——基于UNP代码修改

时间: 2024-12-11 14:08:26

可动态增减的线程池,主线程accept——基于UNP代码修改的相关文章

线程池的各个线程accept——基于UNP代码

线程池的各个线程accept——基于UNP代码 1.说明 预创建一个线程池,线程池中各个线程accept.主线程不做什么. 这是Leader/Follower领导者/跟随者模式 2.代码 代码基于UNP的库函数 #include "unpthread.h" //线程信息 typedef struct { pthread_t thread_tid; //thread ID long thread_count; //connections handled } Thread; //线程结构数

线程系列06,通过CLR代码查看线程池及其线程

在"线程系列04,传递数据给线程,线程命名,线程异常处理,线程池"中,我们已经知道,每个进程都有一个线程池.可以通过TPL,ThreadPool.QueueUserWorkItem,委托与线程池交互.本篇体验:通过查看CLR代码来观察线程池及其线程. □ 通过编码查看线程池和线程 使用ThreadPool的静态方法QueueUserWorkItem把线程放入线程池,来看线程池线程和主程序线程的执行情况. class Program { static void Main(string[]

线程池和线程的选择

能用线程池就用线程池,线程池效率比线程高很多. 线程池处理线程的顺序不一定. 线程池不能手动关闭具体线程. 如果执行线程时间特别长,那手动创建线程,和放入线程池中没太大区别. 线程池非常适合做大量的小的运算.

多线程 线程池 守护线程

守护线程只是个概念问题,一句话可以总结(不知道总结的对不对^_^); 当所有用户线程都结束的时候,守护线程也就结束了,当有用户线程存在的时候,守护线程就是一个普通线程. main线程不可以设置成守护线程,应为只有在线程调用start方法前,才可以设置线程为守护线程,main线程是jvm创建的 多线程以及线程池的问题 import java.io.DataInputStream; import java.io.File; import java.io.FileOutputStream; impor

Android子线程刷新主线程中View

最近遇到一问题,ListView Item加载多个图片,图片是在Adapter的getView方法通过子线程异步进行加载的. 这时候就涉及到子线程刷新主线程中View的问题,一般有两个方式, 1.View.post(Runnable); 2.Activity.runOnUiThread(Runnable); 首次是使用View.post方式来刷新界面,但是一直刷新失败,debug发现图片下载已经成功,View.post方法也已经调用了,但是图片刷新一直没有成功. 很纳闷,于是加上log看了一下,

通过实验研究“线程池中线程数目的变化规律” --- 下有不错的线程池使用 原理 总结

通过实验研究“线程池中线程数目的变化规律” 自从看了老赵关于线程池的实验以后,我就想学着做一个类似的实验,验证自己的理解,现在终于做好了,请大家指正. 一般情况下我们都使用Thread类创建线程,因为通过Thread对象可以对线程进行灵活的控制.但创建线程和销毁线程代价不菲,过多的线程会消耗掉大量的内存和CPU资源,假如某段时间内突然爆发了100个短小的线程,创建和销毁这些线程就会消耗很多时间,可能比线程本身运行的时间还长.为了改善这种状况,.NET提供了一种称之为线程池(Thread Pool

Python的并发并行[4] -&gt; 并发 -&gt; 利用线程池启动线程

利用线程池启动线程 submit与map启动线程 利用两种方式分别启动线程,同时利用with上下文管理来对线程池进行控制 1 from concurrent.futures import ThreadPoolExecutor as tpe 2 from concurrent.futures import ProcessPoolExecutor as ppe 3 from time import ctime, sleep 4 from random import randint 5 6 def f

由浅入深理解Java线程池及线程池的如何使用

前言 多线程的异步执行方式,虽然能够最大限度发挥多核计算机的计算能力,但是如果不加控制,反而会对系统造成负担.线程本身也要占用内存空间,大量的线程会占用内存资源并且可能会导致Out of Memory.即便没有这样的情况,大量的线程回收也会给GC带来很大的压力. 为了避免重复的创建线程,线程池的出现可以让线程进行复用.通俗点讲,当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用. 接下来从总体到细致的方式,来共同探讨线程池. 总体的架构

根据CPU核心数确定线程池并发线程数

转载:https://blog.csdn.net/FAw67J7/article/details/79885842 目录 一.抛出问题 二.分析 三.实际应用 四.总结: 正文 回到顶部 一.抛出问题 关于如何计算并发线程数,一般分两派,来自两本书,且都是好书,到底哪个是对的?问题追踪后,整理如下: 第一派:<Java Concurrency in Practice>即<java并发编程实践>,如下图: 如上图,在<Java Concurrency in Practice&g