自定义进程池的方法

一 、比较low的线程池

import queue,time,threading
class My_theading_pool(object):
    def __init__(self,num = 20):
        self.queue = queue.Queue(num) #在类中分装一个队列,队列中最多容纳20
        for i in range(num):
            self.queue.put(threading.Thread) #在队列的20个位置上放置线程

    def get_thead(self):
        return self.queue.get()   #从队列中去取值

    def add_thead(self):  #线程用完以后,再往队列中添加线程
        return self.queue.put(threading.Thread)

def func(pool,a1):
    time.sleep(1)
    print(a1)
    pool.add_thead()            

t = My_theading_pool(10)  #创建一个里面有10个线程的线程池
for i in range(100):      #创建100个任务
    t1 = t.get_thead()    #每一次任务执行的时候都会去队列中去取出来一个线程
    t2 = t1(target = func,args = (t,i))  #取出来的每一个线程都执行一个func函数,并把类对象当做参数传入函数中,函数中再执行add方法  
    t2.start()

注:定义一个类,创建一个类对象,在对象中封装一个队列,定义队列的容量,并利用循环往队列中添加线程,利用循环创建任务,并从队列中获取线程执行函数,并把类对象传入函数中   执行完一个任务后,在利用传进来的类对象执行add函数获取一个线程执行任务,执行完毕后再往队列中添加新的线程

二 、比较高大上的线程池

import queue
import threading
import contextlib
import time
StopEvent = object()
class ThreadPool(object):
    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num
        self.terminal = False
        self.generate_list = []
        self.free_list = []
    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                status = True
            except Exception as e:
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            self.free_list.append(current_thread)
            event = self.q.get()
            self.free_list.remove(current_thread)
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put(StopEvent)
            num -= 1

import time

def work(i):
    print(i)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,))

pool.close()
时间: 2025-01-02 00:33:52

自定义进程池的方法的相关文章

Untiy3D联网插件——Photon的自定义对象池使用方法

本文章由cartzhang编写,转载请注明出处. 所有权利保留. 文章链接:http://blog.csdn.net/cartzhang/article/details/68068178 作者:cartzhang 一. 写在前面 最开始接触Photon的时候,没有怎么理解代码,我们自己的写的对象池与Photon结合使用起来非常不方便. 需要每次从池里取对象,然后手动设置ViewID,这样很烦人,从感觉来说,就是photon的打开方式不对. 直到有天再次耐心去读了Photon的代码才有发现,感觉是

类+进程池的方法爬取喜马拉雅

python是一门面向对象的语言,那么我们在写爬虫的时候自然也可以用到类的封装来实现爬虫. 一.类的使用 首先是对类的封装,可以将一些请求头写入构造函数当中(因为后面有进程池使用,所以构造函数不需要带参数,如果带上在后面的进程池中,也会报错). 二.Ajax数据处理 喜马拉雅的音乐也是采用Ajax的数据交互方式,所以需要用到urllib来构造请求的网址,因此在类的函数中需要传递一个参数,来控制访问的页面的内容. 1 data = { 2 "albumId": 3627097, 3 &q

第36篇 多进程的数据共享,进程池的回调函数,线程 什么是GIL锁,Threading模块记

内容概览: 进程 数据共享 进程池--回调函数 线程 线程的基础理论 什么是线程? 线程与进程的关系 GIL锁 线程的开启: Threading模块1,用多进程开启socket创建聊天 server端写了input函数会报错?因为服务器是高速运行的,自动化的为来访问的客户端提供服务, 不可能停下来等待管理员的输入,然后发送给客户.这就失去了自动化的意义. 2,进程池Pool()方法创建的进程,map()方法是否有返回值? p.map()得到的是迭代对象 import time from mult

python37 1.GIL--全局解释器锁 2.GIL带来的问题 3.为什么需要GIL 4.GIL的加锁解锁时机 5.关于GIL的性能的讨论 6.线程常用方法 7.GIL锁与自定义锁的区别 8.进程池与线程池 9.同步异步 10.异步调用

复习1.JoinableQueue--可以被join的队列2.多线程3线程的使用方法与进程一模一样3.1守护线程3.2线程安全问题3.3解决方案3.3.1互斥锁mutex3.3.2递归锁Rlock3.3.3信号量semaphore3.3.4死锁问题 详解:1.JoinableQueue--可以被join的队列 1.1join是等待任务结束 队列怎么叫结束 调用task_done一次则表示有一个数据被处理完成了,当task_done次数等于put的次数就意味着任务处理完成了 1.2这就是join的

python进程、线程、协程以及几种自定义线程池

Python线程 Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元. #!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time    def show(arg):     time.sleep(1)     print 'thread'+str(arg)    for i in range(10):     t = threading.Thread(target=show, args

使用wait()和notifyAll()方法自定义线程池

声明: 1.该篇只是提供一种自定义线程池的实现方式,可能性能.安全等方面需要优化: 2.该篇自定义线程池使用的是wait()和notifyAll()方法,也可以使用Lock结合Condition来实现: 3.该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢. 概述 自定义线程池三要素包括: 1.存储线程的容器(或叫线程池).该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现. 2.执行线程(或叫执行器).具体执行的线程. 3.执行任务.执行线程需要执行的具体任务. 代码

第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法

内容大纲: 进程之间的通讯 进程队列 管道 进程之间的数据共享 进程池 使用进程池 开启进程 提交任务 获得返回值 回调函数1.进程队列 先进先出 from multiprocessing import Queue import queue q = Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) 1 2 3 from multiprocessing import Queue impor

最方便建立进程池,线程池的方法

建立进程池,线程池: 进程池from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,random,os def task(n): print('%s is running'%os.getpid()) time.sleep(5) return n**2 if __name__ == '__main__': p = ProcessPoolExecutor() l=[] start=time.t

python 进程池的简单使用方法

回到python,用一下python的进程池. 记得之前面试的时候,面试官问:你知道进程池的默认参数吗? 我没有回答上来,后来才知道,是有默认参数的.下面就看看它的默认参数 1. 不加参数 from multiprocessing.pool import Pool from time import sleep def fun(a): sleep(5) print(a) if __name__ == '__main__': p = Pool() # 这里不加参数,但是进程池的默认大小,等于电脑CP