Python程序中的线程操作(线程池)-concurrent模块

目录

  • Python程序中的线程操作(线程池)-concurrent模块
  • 一、Python标准模块——concurrent.futures
  • 二、介绍
  • 三、基本方法
  • 四、ProcessPoolExecutor
  • 五、ThreadPoolExecutor
  • 六、map的用法
  • 七、回调函数

Python程序中的线程操作(线程池)-concurrent模块

一、Python标准模块——concurrent.futures

官方文档:https://docs.python.org/dev/library/concurrent.futures.html

二、介绍

concurrent.futures模块提供了高度封装的异步调用接口

ThreadPoolExecutor:线程池,提供异步调用

ProcessPoolExecutor:进程池,提供异步调用

Both implement the same interface, which is defined by the abstract Executor class.

三、基本方法

submit(fn, *args, **kwargs):异步提交任务

map(func, *iterables, timeout=None, chunksize=1):取代for循环submit的操作

shutdown(wait=True):相当于进程池的pool.close()+pool.join()操作

  • wait=True,等待池内所有任务执行完毕回收完资源后才继续
  • wait=False,立即返回,并不会等待池内的任务执行完毕
  • 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
  • submit和map必须在shutdown之前

result(timeout=None):取得结果

add_done_callback(fn):回调函数

done():判断某一个线程是否完成

cancle():取消某个任务

四、ProcessPoolExecutor

介绍

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os, time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

五、ThreadPoolExecutor

介绍

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=‘‘)
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
与ProcessPoolExecutor相同
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    # print(f'{currentThread().name} 在执行任务{i}')
    # 进程
    print(f'进程 {current_process().name} 在执行任务 {i}')
    time.sleep(2)
    return i * 2

if __name__ == '__main__':
    # 池子里只有四个线程
    # pool = ThreadPoolExecutor(4) # 池子里面有4个线程

    # 池子里有四个进程
    pool = ProcessPoolExecutor(4)

    fu_list = []

    for i in range(20):
        # task任务要做20次, 4个进程负责做这个事
        future = pool.submit(task, i)  # task任务要做20次,4个进程负责做这个事情
        fu_list.append(future)

    # 关闭池的入口, 会等待所有的任务执行完,结束阻塞
    pool.shutdown()
    for fu in fu_list:
        print(fu.result())

六、map的用法

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import os, time, random

def task(n):
    print('%s is runing' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    res = executor.map(task, range(1, 12))  # map取代了for+submit
    executor.shutdown()
    for r in res:
        print(r)

54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
1
4
9
16
25
36
49
64
81
100
121

七、回调函数

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

原文地址:https://www.cnblogs.com/randysun/p/12258457.html

时间: 2024-10-07 16:19:46

Python程序中的线程操作(线程池)-concurrent模块的相关文章

Python程序中的进程操作-进程池(multiprocess.Pool)

Python程序中的进程操作-进程池(multiprocess.Pool) 一.进程池 为什么要有进程池?进程池的概念. 在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务.那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间.第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率.因此我们不能无限制的根据任务开启或者结束进程.那么我们要怎么做呢? 在这里,要给大家介

Python程序中的进程操作-开启多进程(multiprocess.process)

目录 一.multiprocess模块 二.multiprocess.process模块 三.process模块介绍 3.1 方法介绍 3.2 属性介绍 3.3 在windows中使用process模块的注意事项 四.使用process模块创建进程 4.1 在Python中启动的第一个子进程 4.2 join方法 4.3 查看主进程和子进程的进程号 4.4 多个进程同时运行 4.5 多个进程同时运行,再谈join方法(1) 4.6 多个进程同时运行,再谈join方法(2) 4.7 通过继承Pro

110 python程序中的进程操作-开启多进程

之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程.所有的进程都是通过它的父进程来创建的.因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程.多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快.以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块. 一.multiprocess模块 仔细说来,multipro

112 python程序中的进程操作-进程之间进行通信(mulitiProcessing Queue队列)

一.进程间通信 IPC(Inter-Process Communication) IPC机制:实现进程之间通讯 管道:pipe 基于共享的内存空间 队列:pipe+锁的概念--->queue 二.队列(Queue) 2.1 概念-----multiProcess.Queue 创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. Queue([maxsize])创建共享的进程队列. 参数 :maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制

113 python程序中的进程操作-进程间数据共享(multiProcess.Manger)

一.进程之间的数据共享 展望未来,基于消息传递的并发编程是大势所趋 即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据. 这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中. 但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题. 以后我们会尝试使用数据库来解决现在进程之间的数据共享问题. 1.1 Manager模块介绍 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于

111 python程序中的进程操作-多进程同步(mulitProcessing Lock锁)

通过学习,我们使用各种方法实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制.尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题. 一.锁的基础使用 1.1多个进程抢占数据资源 from multiprocessing import Process import os import time import random def work(n): print

Python程序中的进程操作-进程同步(multiprocess.Lock)

目录 一.锁--multiprocess.Lock 一.锁--multiprocess.Lock 原文地址:https://www.cnblogs.com/nickchen121/p/11130253.html

Python程序中的线程操作-锁

Python程序中的线程操作-锁 一.同步锁 1.1多个线程抢占资源的情况 from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l:

Python程序中的线程操作-concurrent模块

Python程序中的线程操作-concurrent模块 一.Python标准模块--concurrent.futures 官方文档:https://docs.python.org/dev/library/concurrent.futures.html 二.介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor:进程池,提供异步调用 两者都实现相同的接口,该接口由抽象Execut