线程
- 队列:先进先出
- 堆栈:后进先出
- 优先级:数字越小优先级越大,越先输出
import queue q = queue.Queue(3) # 先进先出-->队列 q.put(‘first‘) q.put(2) # q.put(‘third‘) # q.put(4) #由于没有人取走,就会卡主 q.put(4,block=False) #等同于q.get_nowait(), Ture 阻塞,Flase不阻塞,报异常满了 # # q.put(4,block=True,timeout=3) print(q.get()) print(q.get()) print(q.get()) print(q.get(block=True,timeout=3)) # 阻塞等待3秒 没有取走数据就报异常 # print(q.get(block=False)) #等同于q.get_nowait() # print(q.get_nowait()) q = queue.LifoQueue(3) #后进先出-->堆栈 q.put(‘first‘) q.put(2) q.put(‘third‘) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 打印结果: third 2 first ‘‘‘ q = queue.PriorityQueue(3) #优先级队列 q.put((10,‘one‘)) q.put((40,‘two‘)) q.put((30,‘three‘)) print(q.get()) print(q.get()) print(q.get()) ‘‘‘ 数字越小优先级越高 打印结果 (10, ‘one‘) (30, ‘three‘) (40, ‘two‘) ‘‘‘
线程queue
进程池线程池
- 池:是用来对进程(线程)的数量加以限制
- 进程池:计算密集型,用多进程
- 线程池:IO密集型,用多线程,例如:sockect网络通信就应该用多线程
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random ‘‘‘ sockect网络通信是IO操作,所以用多线程 计算密集型:用多进程 ‘‘‘ def task(name): print(‘name:%s pid:%s run‘ %(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task,‘yang%s‘ %i) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口关掉 计数器-1 print(‘主‘) ‘‘‘ 打印结果: name:yang0 pid:11120 run name:yang1 pid:11120 run name:yang2 pid:11120 run name:yang3 pid:11120 run name:yang4 pid:11120 run name:yang5 pid:11120 run name:yang6 pid:11120 run name:yang7 pid:11120 run name:yang8 pid:11120 run name:yang9 pid:11120 run 主 ‘‘‘ from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread import os,time,random def task(): print(‘name:%s pid:%s run‘ %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口关掉 计数器-1 print(‘主‘) ‘‘‘ 打印结果: name:ThreadPoolExecutor-0_0 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_0 pid:14052 run 主 ‘‘‘
进程池|线程池
同步调用和异步调用
提交任务的两种方式:
- 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
- 异步调用:提交完任务后,不在原地等待任务执行完。回调机制:自动触发
#1.同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print(‘%s is laing‘ %name) time.sleep(random.randint(3,5)) res = random.randint(7,13)*‘#‘ return {‘name‘:name,‘res‘:res} def weigh(shit): name = shit[‘name‘] size = len(shit[‘res‘]) print(‘%s 拉了 <%s>kg‘ %(name,size)) if __name__ == ‘__main__‘: pool = ThreadPoolExecutor(10) shit1 = pool.submit(la,‘alex‘).result() weigh(shit1) shit2 = pool.submit(la,‘yang‘).result() weigh(shit2) shit3 = pool.submit(la,‘hang‘).result() weigh(shit3) ‘‘‘ 打印结果: alex is laing alex 拉了 <8>kg yang is laing yang 拉了 <8>kg hang is laing hang 拉了 <7>kg ‘‘‘
同步调用
#2.异步调用:提交完任务后,不在原地等待任务执行完 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print(‘%s is laing‘ %name) time.sleep(random.randint(3,5)) res = random.randint(7,13)*‘#‘ return {‘name‘:name,‘res‘:res} # weigh({‘name‘:name,‘res‘:res}) # 这样写,所有功能 不能体现出解耦合 def weigh(shit): shit = shit.result() # 拿到是一个对象,需要进行result() name = shit[‘name‘] size = len(shit[‘res‘]) print(‘%s 拉了 <%s>kg‘ %(name,size)) if __name__ == ‘__main__‘: pool = ThreadPoolExecutor(10) shit1 = pool.submit(la,‘alex‘).add_done_callback(weigh) shit2 = pool.submit(la,‘yang‘).add_done_callback(weigh) shit3 = pool.submit(la,‘hang‘).add_done_callback(weigh) ‘‘‘ 打印结果: alex is laing yang is laing hang is laing hang 拉了 <10>kg alex 拉了 <7>kg yang 拉了 <12>kg ‘‘‘
异步调用
异步调用的应用
from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): print(‘GET %s‘%url) response = requests.get(url) time.sleep(3) return {‘url‘:url,‘content‘:response.text} def parse(res): res = res.result() print(‘%s parse res is %s‘ %(res[‘url‘],len(res[‘content‘]))) if __name__ == ‘__main__‘: urls = [ ‘http://www.cnblogs.com/linhaifeng‘, ‘https://www.python.org‘, ‘https://www.openstack.org‘, ] pool = ThreadPoolExecutor(2) for url in urls: pool.submit(get,url).add_done_callback(parse) ‘‘‘ 打印结果: GET http://www.cnblogs.com/linhaifeng GET https://www.python.org http://www.cnblogs.com/linhaifeng parse res is 16320 GET https://www.openstack.org https://www.python.org parse res is 49273 https://www.openstack.org parse res is 64040 ‘‘‘
应用
原文地址:https://www.cnblogs.com/Mryang123/p/8921962.html
时间: 2024-11-05 16:35:08