python自定义线程池

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 #!/usr/bin/env python
  4 # -*- coding:utf-8 -*-
  5
  6 import queue
  7 import threading
  8 import contextlib
  9 import time
 10
 11 StopEvent = object()
 12
 13
 14 class ThreadPool(object):
 15
 16     def __init__(self, max_num):
 17         self.q = queue.Queue()#存放任务的队列
 18         self.max_num = max_num#最大线程并发数
 19
 20         self.terminal = False#如果为True 终止所有线程,不再获取新任务
 21         self.generate_list = [] #已经创建的线程
 22         self.free_list = []#闲置的线程
 23
 24     def run(self, func, args, callback=None):
 25         """
 26         线程池执行一个任务
 27         :param func: 任务函数
 28         :param args: 任务函数所需参数
 29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 30         :return: 如果线程池已经终止,则返回True否则None
 31         """
 32
 33         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: #无空闲线程和不超过最大线程数
 34             self.generate_thread() # 创建线程
 35         w = (func, args, callback,)#保存参数为元组
 36         self.q.put(w)#添加到任务队列
 37
 38     def generate_thread(self):
 39         """
 40         创建一个线程
 41         """
 42         t = threading.Thread(target=self.call)
 43         t.start()
 44
 45     def call(self):
 46         """
 47         循环去获取任务函数并执行任务函数
 48         """
 49         current_thread = threading.currentThread#获取当前线程对象
 50         self.generate_list.append(current_thread)#添加到已创建线程里
 51
 52         event = self.q.get() #获取任务
 53         while event != StopEvent: #如果不为停止信号
 54
 55             func, arguments, callback = event#分别取值,
 56             try:
 57                 result = func(*arguments) #运行函数,把结果赋值给result
 58                 status = True   #运行结果是否正常
 59             except Exception as e:
 60                 status = False #不正常
 61                 result = e  #结果为错误信息
 62
 63             if callback is not None: # 是否有回调函数
 64                 try:
 65                     callback(status, result) #执行回调函数
 66                 except Exception as e:
 67                     pass
 68
 69             if self.terminal: # 默认为False ,如果调用terminal方法
 70                 event = StopEvent #停止信号
 71             else:
 72                 # self.free_list.append(current_thread) #执行完毕任务,添加到闲置列表
 73                 # event = self.q.get()    #获取任务
 74                 # self.free_list.remove(current_thread) #获取到任务之后,从闲置里删除
 75                 with self.worker_state(self.free_list,current_thread):
 76                     event = self.q.get()
 77
 78
 79         else:
 80             self.generate_list.remove(current_thread) #如果收到终止信号,就从已创建的列表删除
 81
 82     def close(self): #终止线程
 83         num = len(self.generate_list) #获取总已创建的线程
 84         while num:
 85             self.q.put(StopEvent) #添加停止信号,有几个线程就添加几个
 86             num -= 1
 87
 88     # 终止线程(清空队列)
 89     def terminate(self):
 90
 91         self.terminal = True #更改为True,
 92
 93         while self.generate_list: #如果有已创建线程存活
 94             self.q.put(StopEvent) #有几个就发几个信号
 95         self.q.empty()  #清空队列
 96     @contextlib.contextmanager
 97     def worker_state(self,free_list,current_thread):
 98         free_list.append(current_thread)
 99         try:
100             yield
101         finally:
102             free_list.remove(current_thread)
103 import time
104
105 def work(i):
106     print(i)
107
108 pool = ThreadPool(10)
109 for item in range(50):
110     pool.run(func=work, args=(item,))
111 # pool.terminate()
112 pool.close()
时间: 2024-10-11 17:17:18

python自定义线程池的相关文章

自定义线程池

线程池: 自定义线程池一: #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = Queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_th

java多线程(四)-自定义线程池

当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. 1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new Synchro

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能

Java自定义线程池详解

自定义线程池的核心:ThreadPoolExecutor 为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制,其中在java.util.concurrent包下,是JDK并发包的核心,比如我们熟知的Executors.Executors扮演着线程工厂的角色,我们通过它可以创建特定功能的线程池,而这些线程池背后的就是:ThreadPoolExecutor.那么下面我们来具体分析下它. 构造ThreadPoolExecutor public ThreadP

JAVA并发,线程工厂及自定义线程池

1 package com.xt.thinks21_2; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.SynchronousQueue; 6 import java.util.concurrent.ThreadFactory; 7 import java.util.concurrent.ThreadPo

使用Lock(ReentrantLock)结合Condition实现自定义线程池

声明: 1.该篇只是提供一种自定义线程池的实现方式,可能性能.安全等方面需要优化: 2.该篇自定义线程池使用Lock(可重入锁ReentrantLock)结合Condition来实现: 3.该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢. 概述 自定义线程池三要素包括: 1.存储线程的容器(或叫线程池).该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现. 2.执行线程(或叫执行器).具体执行的线程. 3.执行任务.执行线程需要执行的具体任务. 代码 /** * 任务

Android 自定义线程池的实战

前言:在上一篇文章中我们讲到了AsyncTask的基本使用.AsyncTask的封装.AsyncTask 的串行/并行线程队列.自定义线程池.线程池的快速创建方式. 对线程池不了解的同学可以先看 Android AsyncTask 深度理解.简单封装.任务队列分析.自定义线程池 ------------------------------------------------------------------------------------------------------- 1.Exec

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

使用wait()和notifyAll()方法自定义线程池

声明: 1.该篇只是提供一种自定义线程池的实现方式,可能性能.安全等方面需要优化: 2.该篇自定义线程池使用的是wait()和notifyAll()方法,也可以使用Lock结合Condition来实现: 3.该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢. 概述 自定义线程池三要素包括: 1.存储线程的容器(或叫线程池).该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现. 2.执行线程(或叫执行器).具体执行的线程. 3.执行任务.执行线程需要执行的具体任务. 代码