基于队列queue实现的线程池

本文通过文章同步功能推送至博客园,显示排版可能会有所错误,请见谅!

写在前文:在Python中给多进程提供了进程池类,对于线程,Python2并没有直接提供线程池类(Python3中提供了线程池功能),而线程池在并行中应用较广泛,因此实现一个进程池的功能十分必要。本文基于队列(queue)功能来实现线程池功能。

在Python3标准库中提供了线程池、进程池功能,推荐使用标准库。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

实现代码:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
__auth__ = "SongWei"
import threading,queue,time

class Threadpool:
    ‘‘‘基于队列queue实现的线程池‘‘‘

    def __init__(self,max_thread=1):
        ‘‘‘创建进程队列‘‘‘
        self.queue = queue.Queue(maxsize=max_thread)

    def apply(self,target=None,args=(),callback=None,calljoin=True,**kwargs):
        ‘‘‘:param callback 回调函数 当子线程函数运行结束后将返回值传入回调函数
            :param calljoin 布尔值  回调函数是否阻塞进程池 默认True 只有当目标函数和回调函数都执行结束后才视为该线程结束
            其他参数同threading.Thread类
            注意:只有当目标函数和回调函数都执行结束后,消息队列才会取回值(即回调函数会阻塞线程池)
        ‘‘‘
        if not callback:
            callback = self._callback
        t = threading.Thread(target=self._decorate(target,callback,calljoin),args=args,**kwargs)
        self.queue.put(t)
        t.start()

    def join(self):
        ‘‘‘
            当线程池中还有未执行结束的子线程时 阻塞主线程
            注意:当calljoin=False时 因回调函数在消息队列取回后才执行 故join不会等待回调函数
        ‘‘‘
        while self.queue.qsize():
            time.sleep(0.05)

    def _decorate(self,target,callback,calljoin):
        ‘‘‘:param target 接收一个目标函数
            :param callback 接受一个回调函数
            :param backjoin 布尔值 若为真 则当回调函数执行结束后才释放队列 否则 当目标函数执行结束后就会释放队列
            本函数本质上是一个装饰器,即运行目标函数后,执行队列取回(self.queque.get()),并将返回值作为参数执行回调函数。
        ‘‘‘
        def wrapper(*args,**kwargs):
            res = target(*args,**kwargs)
            if calljoin:
                callback(res)
                self.queue.get()
            else:
                self.queue.get()
                callback(res)
            return res
        return wrapper

    def _callback(self,*args,**kwargs):
        ‘‘‘没有传入回调函数时 什么也不干‘‘‘
        pass

调用示例:
result_list = []
def func(arg):
    print(‘正在等待执行%s‘ % arg)
    time.sleep(10)
    return arg

def back(res):
    print(‘我已经取回了数据:%s‘ % res)
    result_list.append(res)

pool = Threadpool(max_thread=20)
for i in range(40):
    pool.apply(target=func,args=(i,),callback=back)
pool.join()
print(result_list)

原文地址:https://www.cnblogs.com/lazyfish007/p/11487443.html

时间: 2024-10-18 03:39:14

基于队列queue实现的线程池的相关文章

同一个进程内的队列(多线程) 线程池

一.同一个进程内的队列(多线程) import queue queue.Queue()   先进先出 queue.LifoQueue()  后进先出 queue.PriorityQueue()   优先级队列 优先级队列   q = queue.PriorityQueue() q.put((pri , data))   接收的是一个元祖 元祖中第一个参数是:表示当前数据的优先级 元祖中第二个参数是:需要存放到队列中的数据 优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致) 如果

一个C++基于boost简单实现的线程池

xl_blocking_queue.h ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #ifndef SRC_COMMON_BLOCKING_QUEUE_H_ #define SRC_COMMON_BLOCKING_QUEUE_H_ #

python全栈开发 * 线程队列 线程池 协程 * 180731

一.线程队列 队列:1.Queue 先进先出 自带锁 数据安全 from queue import Queue from multiprocessing import Queue (IPC队列)2.LifoQueue后进先出 后进先出 自带锁 数据安全 from queue import LifoQueue lq=LifoQueue(5) lq.put(123) lq.put(666) lq.put(888) lq.put(999) lq.put("love") print(lq.pu

Python 线程----线程方法,线程事件,线程队列,线程池,GIL锁,协程,Greenlet

主要内容: 线程的一些其他方法 线程事件 线程队列 线程池 GIL锁 协程 Greenlet Gevent 一. 线程(threading)的一些其他方法 from threading import Thread import threading import time def work(): time.sleep(1) print("子线程对象>>>", threading.current_thread()) # 子线程对象 print("子线程名称>

Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具

锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = false; private Thread lockingThread = null; public synchronized void lock() throws InterruptedExpection { while(isLocked){ wait(); } isLocked = true; loc

Callable,阻塞队列,线程池问题

一.说说Java创建多线程的方法 1. 通过继承Thread类实现run方法   2. 通过实现Runnable接口 3. 通过实现Callable接口 4. 通过线程池获取 二. 可以写一个Callable的案例吗?如何调用Callable接口 /*是一个带返回值的多线程类,如果需要有线程返回的结果,就需要使用此类*/ class MyThread implements Callable<Integer> { @Override public Integer call() { return

线程池队列饱和策略

1.当一个有限队列充满后,线程池的饱和策略开始起作用. 2.ThreadPoolExecutor的饱和策略通过调用setRejectedExecutionHandler来修改.不同的饱和策略如下: 1)AbortPolicy:中止,executor抛出未检查RejectedExecutionException,调用者捕获这个异常,然后自己编写能满足自己需求的处理代码. 2)DiscardRunsPolicy:遗弃最旧的,选择丢弃的任务,是本应接下来就执行的任务. 3)DiscardPolicy:

深入理解Java之线程池

原作者:海子 出处:http://www.cnblogs.com/dolphin0520/ 本文归作者海子和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可

java多线程(四)-自定义线程池

当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. 1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new Synchro