#!/user/bin/evn python # -*- coding:utf-8 -*- import threading import queue,time ‘‘‘ 线程池的思路: 将任务依次放在队列里面 然后从队列取出任务交给线程执行 ‘‘‘ stopEvent=object()#任务完了的标志---下面我们将任务包封装到元组中 class ThreadPool(object): def __init__(self,max_num): #创建队列 self.q=queue.Queue() #创建线程最大的数量(线程池的最大容量) self.max_num=max_num #空闲线程列表(数量) self.free_list=[] #真实创建的线程列表(数量) self.gemerate_list=[] #中断任务标志 self.terminal=False self.num=0 def run(self,func,args,callback=None): #func:任务函数 #args:任务函数的参数 #callback:线程执行成功或者失败后执行的回调函数 task=(func,args,callback)#将任务封装到元组中 ==任务包 #将任务包放到队列中 self.q.put(task) #创建线程 if len(self.free_list)==0 and len(self.gemerate_list)<self.max_num : self.generate_thread() #创建线程 def generate_thread(self): #创建一个线程 t=threading.Thread(target=self.call) t.start() def call(self): ‘‘‘ 循环去获取任务函数并执行任务函数 ‘‘‘ #获取当前线程 current_thread=threading.currentThread #将当前线程添加到列表中 self.gemerate_list.append(current_thread) #获取任务 Event=self.q.get() while Event!=stopEvent :#表示是任务 #分解任务包 func,args,callable=Event status=True #标志执行成功 try: #执行任务 ret=func(*args) except Exception as e: status=False ret=e #执行回调函数callback if callback==None: pass else: callback(status,ret) if self.terminal :#不终止任务 Event=stopEvent else: #标记:我空闲了 self.free_list.append(current_thread) #再从队列去取任务 Event=self.q.get() #将线程从空闲列表中移除 self.free_list.remove(current_thread) else:#表示不是任务 self.gemerate_list.remove(current_thread) #任务执行完毕后 停止运行 def close(self): time.sleep(2) num=len(self.gemerate_list) print(num) while num: self.q.put(stopEvent)#往队列添加停止标志(创建了多少线程,就添加多少) num-=1 def terminals(self): self.terminal=True#标记任务终止 while self.gemerate_list: self.q.put(stopEvent) self.q.empty()#清空队列 #回调函数 def callback(statue,result): # print(statue) # print(result) pass #任务函数 def action(args): time.sleep(1) print(args) return args #创建线程池对象 pool=ThreadPool(10) for item in range(100): ‘‘‘ #将任务放在队列中 #着手开始处理任务(线程处理) --创建线程 有空闲线程,则不再创建线程 没有空闲线程,开始创建线程 1.不能高于线程池的限制 2.根据任务来判断 --去队列取任务 ‘‘‘ pool.run(func=action,args=(item,),callback=callback) # pool.close()#任务执行完后 #pool.terminals()终止任务
时间: 2024-12-11 01:43:35