1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 6 import queue 7 import threading 8 import contextlib 9 import time 10 11 StopEvent = object() 12 13 14 class ThreadPool(object): 15 16 def __init__(self, max_num): 17 self.q = queue.Queue()#存放任务的队列 18 self.max_num = max_num#最大线程并发数 19 20 self.terminal = False#如果为True 终止所有线程,不再获取新任务 21 self.generate_list = [] #已经创建的线程 22 self.free_list = []#闲置的线程 23 24 def run(self, func, args, callback=None): 25 """ 26 线程池执行一个任务 27 :param func: 任务函数 28 :param args: 任务函数所需参数 29 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 30 :return: 如果线程池已经终止,则返回True否则None 31 """ 32 33 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: #无空闲线程和不超过最大线程数 34 self.generate_thread() # 创建线程 35 w = (func, args, callback,)#保存参数为元组 36 self.q.put(w)#添加到任务队列 37 38 def generate_thread(self): 39 """ 40 创建一个线程 41 """ 42 t = threading.Thread(target=self.call) 43 t.start() 44 45 def call(self): 46 """ 47 循环去获取任务函数并执行任务函数 48 """ 49 current_thread = threading.currentThread#获取当前线程对象 50 self.generate_list.append(current_thread)#添加到已创建线程里 51 52 event = self.q.get() #获取任务 53 while event != StopEvent: #如果不为停止信号 54 55 func, arguments, callback = event#分别取值, 56 try: 57 result = func(*arguments) #运行函数,把结果赋值给result 58 status = True #运行结果是否正常 59 except Exception as e: 60 status = False #不正常 61 result = e #结果为错误信息 62 63 if callback is not None: # 是否有回调函数 64 try: 65 callback(status, result) #执行回调函数 66 except Exception as e: 67 pass 68 69 if self.terminal: # 默认为False ,如果调用terminal方法 70 event = StopEvent #停止信号 71 else: 72 # self.free_list.append(current_thread) #执行完毕任务,添加到闲置列表 73 # event = self.q.get() #获取任务 74 # self.free_list.remove(current_thread) #获取到任务之后,从闲置里删除 75 with self.worker_state(self.free_list,current_thread): 76 event = self.q.get() 77 78 79 else: 80 self.generate_list.remove(current_thread) #如果收到终止信号,就从已创建的列表删除 81 82 def close(self): #终止线程 83 num = len(self.generate_list) #获取总已创建的线程 84 while num: 85 self.q.put(StopEvent) #添加停止信号,有几个线程就添加几个 86 num -= 1 87 88 # 终止线程(清空队列) 89 def terminate(self): 90 91 self.terminal = True #更改为True, 92 93 while self.generate_list: #如果有已创建线程存活 94 self.q.put(StopEvent) #有几个就发几个信号 95 self.q.empty() #清空队列 96 @contextlib.contextmanager 97 def worker_state(self,free_list,current_thread): 98 free_list.append(current_thread) 99 try: 100 yield 101 finally: 102 free_list.remove(current_thread) 103 import time 104 105 def work(i): 106 print(i) 107 108 pool = ThreadPool(10) 109 for item in range(50): 110 pool.run(func=work, args=(item,)) 111 # pool.terminate() 112 pool.close()
时间: 2024-10-11 17:17:18