py线程池

版本一:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading

class ThreadPool(object):

    def __init__(self, max_num=20):
        self.queue = Queue.Queue(max_num)
        for i in xrange(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)

"""
pool = ThreadPool(10)

def func(arg, p):
    print arg
    import time
    time.sleep(2)
    p.add_thread()

for i in xrange(30):
    thread = pool.get_thread()
    t = thread(target=func, args=(i, pool))
    t.start()
"""

版本二:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()

更多参见:twisted.python.threadpool

上下文管理:https://docs.python.org/2/library/contextlib.html

感谢http://www.cnblogs.com/wupeiqi/articles/4839959.html供参考

时间: 2024-10-09 23:04:13

py线程池的相关文章

Python之路【第八篇】python实现线程池

线程池概念 什么是线程池?诸如web服务器.数据库服务器.文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务.构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务.但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大.所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,“池”的概念使得人们可以定制一定量的资源,然后对这些资源

爬天极网开哥讲线程池和进程池.py

#导入多线程模块:import threadingimport osimport requests # 发送请求import timefrom bs4 import BeautifulSoup # 解析文本#导入线程池执行器和进程池执行器:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor#导入获取CPU的数量cpu_count模块的功能:from multiprocessing import cpu_cou

线程锁、线程池

一.线程(IO密集型工作多线程有用) 线程: 概述: 若一个文件从上到下顺序执行,则为串行执行,整个py文件实际上是一个主线程 若多线程,则可以并行执行,同一个时刻可以运行多个代码段 给每个client请求分配一个线程,则这些线程可以同时工作 多线程.多进程: 1.一个应用程序,可以有多进程和多线程:默认是单进程.单线程 2.单进程.多线程 : 多线程:IO操作(输入输出流,文件操作)有用,因为几乎不用cpu来调度,一般用多线程来提高并发 计算型操作,需要用到cpu调度执行,一般用多进程提高并发

python学习第十课续 :线程池

线程分步走 t=threading.Thread(target=fun,args=()) t.start() 执行流程:         threading.Thread(target=fun,args=()) à           self.__target = target          self.__name = str(name or _newname())          self.__args = args         t.start  à           _star

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

我对python线程池的理解

#!/usr/bin/env pythonfrom Queue import Queuefrom threading import Threadimport randomimport time def person(i,q):    while True:  #这个人一直处与可以接活干的状态        q.get()        print "Thread",i,"do_job"        time.sleep(random.randint(1,5))#每

使用线程池多线程爬取链接,检验链接正确性

我们网站大多数链接都是活链接都是运营配置的,而有的时候运营会将链接配置错误使访问出错,有时也会因为程序bug造成访问出错,因此对主站写了个监控脚本,使用python爬取主站设置的链接并访问,统计访问出错的链接,因为链接有上百个,所以使用了多线程进行,因为http访问是io密集型,所以python多线程还是可以很好的完成并发访问的. 首先是index.py 使用了线程池管理线程,做到了配置需要检验的链接,然后爬取配置的链接页面中的所有链接,同时因为可能子页面许多url链接是和主站重复的,也可以做剔

python 线程池使用

传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态. 一个线程的运行时间可以分为3部分:线程的启动时间.线程体的运行时间和线程的销毁时间.在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动.销毁和运行3个过程.这必然会增加系统相应的时间,降低了效率. 使用线程池:由于线程预先被创建并放入线程池中,同时处理完

27 Apr 18 GIL 多进程多线程使用场景 线程互斥锁与GIL对比 基于多线程实现并发的套接字通信 进程池与线程池 同步、异步、阻塞、非阻塞

27 Apr 18 一.全局解释器锁 (GIL) 运行test.py的流程: a.将python解释器的代码从硬盘读入内存 b.将test.py的代码从硬盘读入内存  (一个进程内装有两份代码) c.将test.py中的代码像字符串一样读入python解释器中解析执行 1 .GIL:全局解释器锁 (CPython解释器的特性) In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple na