发一个可伸缩线程池大小的python线程池。已通过测试。

发一个可伸缩线程池大小的线程池。

当任务不多时候,不开那么多线程,当任务多的时候开更多线程。当长时间没任务时候,将线程数量减小到一定数量。

"""
可自动实时调节线程数量的线程池。

"""

import atexit
import queue
import sys
import threading
import time
import weakref

from app.utils_ydf import LoggerMixin, nb_print, LoggerLevelSetterMixin

# noinspection PyShadowingBuiltins
# print = nb_print

_shutdown = False
_threads_queues = weakref.WeakKeyDictionary()

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)

class _WorkItem(LoggerMixin):
    def __init__(self, fn, args, kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # noinspection PyBroadException
        try:
            self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.logger.exception(f‘函数 {self.fn.__name__} 中发生错误,错误原因是 {type(exc)} {exc} ‘)

    def __str__(self):
        return f‘{(self.fn.__name__, self.args, self.kwargs)}‘

class CustomThreadPoolExecutor(LoggerMixin, LoggerLevelSetterMixin):
    def __init__(self, max_workers=None, thread_name_prefix=‘‘):
        """
        最好需要兼容官方concurren.futures.ThreadPoolExecutor 和改版的BoundedThreadPoolExecutor,入参名字和个数保持了一致。
        :param max_workers:
        :param thread_name_prefix:
        """
        self._max_workers = max_workers or 4
        self._min_workers = 5
        self._thread_name_prefix = thread_name_prefix
        self.work_queue = queue.Queue(max_workers)
        # self._threads = set()
        self._threads = weakref.WeakSet()
        self._lock_compute_threads_free_count = threading.Lock()
        self.threads_free_count = 0
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    def set_min_workers(self, min_workers=5):
        self._min_workers = min_workers
        return self

    def change_threads_free_count(self, change_num):
        with self._lock_compute_threads_free_count:
            self.threads_free_count += change_num

    def submit(self, func, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError(‘不能添加新的任务到线程池‘)
        self.work_queue.put(_WorkItem(func, args, kwargs))
        self._adjust_thread_count()

    def _adjust_thread_count(self):
        # if len(self._threads) < self._threads_num:
        self.logger.debug((self.threads_free_count, len(self._threads), len(_threads_queues), get_current_threads_num()))
        if self.threads_free_count < self._min_workers and len(self._threads) < self._max_workers:
            # t = threading.Thread(target=_work,
            #                      args=(self._work_queue,self))
            t = _CustomThread(self).set_log_level(self.logger.level)
            t.setDaemon(True)
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self.work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self.work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

class _CustomThread(threading.Thread, LoggerMixin, LoggerLevelSetterMixin):
    def __init__(self, executorx: CustomThreadPoolExecutor):
        super().__init__()
        self._executorx = executorx
        self._run_times = 0

    def _remove_thread(self, stop_resson=‘‘):
        # noinspection PyUnresolvedReferences
        self.logger.debug(f‘停止线程 {self._ident}, 触发条件是 {stop_resson} ‘)
        self._executorx.change_threads_free_count(-1)
        self._executorx._threads.remove(self)
        _threads_queues.pop(self)

    # noinspection PyProtectedMember
    def run(self):
        # noinspection PyUnresolvedReferences
        self.logger.debug(f‘新启动线程 {self._ident} ‘)
        self._executorx.change_threads_free_count(1)
        while True:
            try:
                work_item = self._executorx.work_queue.get(block=True, timeout=60)
            except queue.Empty:
                # continue
                # self._remove_thread()
                # break
                if self._executorx.threads_free_count > self._executorx._min_workers:
                    self._remove_thread(f‘当前线程超过60秒没有任务,线程池中不在工作状态中的线程数量是 {self._executorx.threads_free_count},超过了指定的数量 {self._executorx._min_workers}‘)
                    break
                else:
                    continue

            # nb_print(work_item)
            if work_item is not None:
                self._executorx.change_threads_free_count(-1)
                work_item.run()
                del work_item
                self._executorx.change_threads_free_count(1)
                self._run_times += 1
                if self._run_times == 50:
                    self._remove_thread(f‘运行超过了50次,销毁线程‘)
                    break
                continue
            if _shutdown or self._executorx._shutdown:
                self._executorx.work_queue.put(None)
                break

# @decorators.tomorrow_threads(20)
def show_current_threads_num(sleep_time=60, process_name=‘‘, block=False):
    process_name = sys.argv[0] if process_name == ‘‘ else process_name

    def _show_current_threads_num():
        while True:
            nb_print(f‘{process_name} 进程 的 线程数量是 -->  {threading.active_count()}‘)
            time.sleep(sleep_time)

    if block:
        _show_current_threads_num()
    else:
        t = threading.Thread(target=_show_current_threads_num, daemon=True)
        t.start()

def get_current_threads_num():
    return threading.active_count()

if __name__ == ‘__main__‘:
    from app.utils_ydf import decorators, BoundedThreadPoolExecutor

    # @decorators.keep_circulating(1)
    def f1(a):
        time.sleep(0.2)
        nb_print(f‘{a} 。。。。。。。‘)
        # raise Exception(‘抛个错误测试‘)

    # show_current_threads_num()
    pool = CustomThreadPoolExecutor(200).set_log_level(10).set_min_workers()
    # pool = BoundedThreadPoolExecutor(200)   # 测试对比原来写的BoundedThreadPoolExecutor
    show_current_threads_num(sleep_time=5)
    for i in range(300):
        time.sleep(0.3)  # 这里的间隔时间模拟,当任务来临不密集,只需要少量线程就能搞定f1了,因为f1的消耗时间短,不需要开那么多线程,CustomThreadPoolExecutor比BoundedThreadPoolExecutor 优势之一。
        pool.submit(f1, str(i))

    nb_print(6666)
    # pool.shutdown(wait=True)
    pool.submit(f1, ‘yyyy‘)

    # 下面测试阻塞主线程退出的情况。注释掉可以测主线程退出的情况。
    while True:
        time.sleep(10)

原文地址:https://www.cnblogs.com/ydf0509/p/11025724.html

时间: 2024-08-03 01:18:45

发一个可伸缩线程池大小的python线程池。已通过测试。的相关文章

一个简单缩略版的python 线程池实现

1 #-*-coding:utf-8-*-2 2 3 import threading 4 import queue 5 import itertools 6 import os 7 import time 8 9 10 RUN = 0 11 CLOSE = 1 12 TERMINATE = 2 13 job_counter = itertools.count() 14 15 16 class Pool(object): 17 18 def __init__(self, max_thread_n

JAVA线程池中队列与池大小的关系

JAVA线程中对于线程池(ThreadPoolExecutor)中队列,池大小,核心线程的关系写出自己的理解: 1:核心线程:简单来讲就是线程池中能否允许同时并发运行的线程的数量 2:线程池大小:线程池中最多能够容纳的线程的数量. 3:队列:对提交过来的任务的处理模式. 对于线程池与队列的交互有个原则: 如果队列发过来的任务,发现线程池中正在运行的线程的数量小于核心线程,则立即创建新的线程,无需进入队列等待.如果正在运行的线程等于或者大于核心线程,则必须参考提交的任务能否加入队列中去. 1:提交

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

Android线程管理之ThreadPoolExecutor自定义线程池(三)

前言: 上篇主要介绍了使用线程池的好处以及ExecutorService接口,然后学习了通过Executors工厂类生成满足不同需求的简单线程池,但是有时候我们需要相对复杂的线程池的时候就需要我们自己来自定义一个线程池,今天来学习一下ThreadPoolExecutor,然后结合使用场景定义一个按照线程优先级来执行的任务的线程池. ThreadPoolExecutor ThreadPoolExecutor线程池用于管理线程任务队列.若干个线程. 1.)ThreadPoolExecutor构造函数

一个简单的python线程池框架

初学python,实现了一个简单的线程池框架,线程池中除Wokers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信. 代码如下: 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 __author__ = "pandaychen" 5 6 import Queue 7 import sys 8 import os 9 import threading 10 import time 11

线程池Python 线程、进程和协程

Python   线程 Threading是用于提供线程相关的操作,线程是应用程序中工作的最小单元.线程与进程的关系下图所示: 子线程是由主线程产生的,但两者并没有关联. 利用threading创建线程: 1 '''利用threading包创建''' 2 import threading 3 import time 4 5 def run(n): 6 time.sleep(2) 7 print("task:",n) 8 9 '''串行:一个运行完后,再运行另外一个''' 10 run(

[python] ThreadPoolExecutor线程池 python 线程池

初识 Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行. 这就是线程池的思想(当然没这么简单),但是自己编

python线程池

线程池: 版本一: #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = Queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_thread

Python 线程池

python默认没有提供线程池的功能,所以要想使用线程池,就必要使用第三方的模块或者自定义线程 线程并不是越多越好,线程的上下文切换会影响到服务器的性能 线程池:一个容器,有最大数,取一个少一个,无线程时等待,线程执行完毕,交还线程 __author__ = 'alex' #coding:utf-8 import queue import threading import time class ThreadPool: def __init__(self,maxsize=5): self.maxs