异步 + 回调：任务（如下载）完成后，自动把结果交给绑定的回调函数处理。用进程池时，回调在主进程中执行；用线程池时，回调通常由刚完成该任务的那个工作线程执行。
# Async + callback with a process pool (kept for comparison; uncomment to run).
# With ProcessPoolExecutor the done-callback runs in the parent process, so
# the pid printed by pasrse() is the main process's pid.
#
# from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
#
# import requests
# import os, time, random
#
# def get(url):
#     print('%s get %s' % (os.getpid(), url))
#
#     response = requests.get(url, timeout=10)
#     time.sleep(random.randint(1, 3))
#
#     if response.status_code == 200:
#         # Return the page as soon as it is downloaded; parsing is done
#         # by the callback.
#         return response.text
#
# def pasrse(obj):
#     res = obj.result()
#     print('%s 解析结果为:%s' % (os.getpid(), len(res)))
#
# if __name__ == '__main__':
#     urls = [
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'http://www.sina.com.cn/',
#         'http://www.sina.com.cn/',
#         'http://www.sina.com.cn/',
#     ]
#     pool = ProcessPoolExecutor(4)
#     for url in urls:
#         # Submit get(url) to the process pool.
#         obj = pool.submit(get, url)
#         # Bind pasrse to the future: it runs as soon as this task has a
#         # result, decoupling "download" from "parse".
#         obj.add_done_callback(pasrse)
#
#     print('主进程 %s' % os.getpid())
#
# The synchronous alternative -- collect every future, wait, then parse:
#
#     objs = [pool.submit(get, url) for url in urls]
#     pool.shutdown(wait=True)
#     for obj in objs:  # parsing is serial here
#         pasrse(obj)
#
# has two problems:
#   1. Results are not handled promptly: nothing is parsed until every task
#      has finished.
#   2. Parsing runs serially: if one parse takes 2 s, 9 parses take 18 s.


# Async + callback with a thread pool. Here the done-callback runs in a
# thread of this same process -- in practice the worker thread that finished
# the task (or the submitting thread, if the future is already done when
# add_done_callback is called).
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import requests
import os, time, random


def get(url):
    """Download *url* and return its body text, or None on a non-200 status.

    The random sleep simulates tasks of different duration, so the callbacks
    visibly fire in completion order rather than submission order.
    """
    print('%s get %s' % (current_thread().name, url))

    # Without a timeout a stalled server would hang this worker forever.
    response = requests.get(url, timeout=10)
    time.sleep(random.randint(1, 3))

    if response.status_code == 200:
        # Return as soon as the download is done; parsing happens in the
        # callback bound with add_done_callback.
        return response.text
    return None


def pasrse(obj):
    """Done-callback: "parse" the finished future *obj* (prints the text length).

    Exceptions raised inside a done-callback are only logged by
    concurrent.futures, so a failed download or a non-200 result (None) is
    reported here explicitly instead of crashing on len(None).
    """
    exc = obj.exception()
    if exc is not None:
        print('%s 下载失败:%r' % (current_thread().name, exc))
        return
    res = obj.result()
    if res is None:
        print('%s 下载失败:状态码非200' % current_thread().name)
        return
    print('%s 解析结果为:%s' % (current_thread().name, len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'http://www.sina.com.cn/',
        'http://www.sina.com.cn/',
        'http://www.sina.com.cn/',
    ]
    # Leaving the `with` block shuts the pool down and waits for all tasks.
    with ThreadPoolExecutor(4) as pool:
        for url in urls:
            # Submit get(url) to the thread pool.
            obj = pool.submit(get, url)
            # Bind pasrse to the future: it runs as soon as this task has a
            # result, decoupling "download" from "parse".
            obj.add_done_callback(pasrse)

        # Printed right after submission, before the downloads finish:
        # submit() does not block.
        print('主线程 %s' % current_thread().name)
线程Queue:
import queue

# Queue: first in, first out (FIFO). maxsize is 3, so a 4th put() on the full
# queue would block until a slot is freed -- that is why q.put(4) is
# commented out.
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
# q.put(4)

print(q.get())
print(q.get())
print(q.get())


# LifoQueue: last in, first out (a stack).
q = queue.LifoQueue(3)

q.put('a')
q.put('b')
q.put('c')

print(q.get())
print(q.get())
print(q.get())


# PriorityQueue: store (priority, value) tuples; the entry with the smallest
# first element comes out first, i.e. a lower number means higher priority.
q = queue.PriorityQueue(3)
q.put((10, 'user1'))
q.put((-3, 'user2'))
q.put((-2, 'user3'))


print(q.get())
print(q.get())
print(q.get())
线程Event:event.wait()
from threading import Event, current_thread, Thread
import time

# Shared flag: check() sets it once the service looks healthy; connect()
# polls it.
event = Event()


def check(delay=5):
    """Pretend to probe the service for *delay* seconds, then set the event."""
    print('%s 正在检测服务是否正常....' % current_thread().name)
    time.sleep(delay)
    event.set()


def connect(max_tries=3):
    """Wait for the event, making at most *max_tries* 1-second attempts.

    Returns True (after printing the connect message) once the event is set,
    or False after *max_tries* attempts without the event being set.
    With the defaults (3 x 1 s of waiting vs. a 5 s check) the connect
    threads give up before check() sets the event.
    """
    count = 1
    while not event.is_set():
        if count > max_tries:
            print('尝试的次数过多,请稍后重试')
            return False
        print('%s 尝试第%s次连接...' % (current_thread().name, count))
        # wait(1) returns early if the event gets set during the wait.
        event.wait(1)
        count += 1
    print('%s 开始连接...' % current_thread().name)
    return True


if __name__ == '__main__':
    t1 = Thread(target=connect)
    t2 = Thread(target=connect)
    t3 = Thread(target=connect)

    c1 = Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()
gevent:
# Patch first, before threading/time are imported below, so that blocking
# calls such as time.sleep become cooperative: a sleeping greenlet yields to
# the others instead of blocking the whole thread.
from gevent import monkey
monkey.patch_all()
from threading import current_thread
import gevent
import time


def eat():
    print('%s eat 1' % current_thread().name)
    time.sleep(5)  # patched sleep: play() runs while this one waits
    print('%s eat 2' % current_thread().name)


def play():
    print('%s play 1' % current_thread().name)
    time.sleep(3)
    print('%s play 2' % current_thread().name)


if __name__ == '__main__':
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)

    # gevent.sleep(100)
    # g1.join()
    # g2.join()
    print(current_thread().name)
    # Wait for both greenlets. Because the sleeps overlap, this takes roughly
    # as long as the longest task (~5 s) rather than the sum (~8 s).
    gevent.joinall([g1, g2])
协程:
1、单线程下实现并发:协程
并发指的是多个任务看起来是同时运行的
并发实现的本质:切换 + 保存状态
并发、并行、串行
并发:看起来是同时运行,切换 + 保存状态
并行：真正地同时运行，需要多个CPU才能实现，例如4个CPU能够并行执行4个任务
串行：一个任务完完整整地执行完毕后，才能运行下一个任务
import time


def consumer():
    """Task 1: receive data (delivered via send()) and process it."""
    while True:
        x = yield


def producer(n=10000000):
    """Task 2: produce *n* items and hand each one to consumer() via send()."""
    g = consumer()
    next(g)  # prime the generator: run it up to its first yield
    for i in range(n):
        g.send(i)


if __name__ == '__main__':
    start = time.perf_counter()
    # yield saves each generator's state, so control switches back and forth
    # between the two tasks -- concurrency inside a single thread.
    # PS: with a print in each task you would see their output interleave.
    producer()  # ~1.02 s in the original author's run
    stop = time.perf_counter()
    print(stop - start)
import time


def consumer(res):
    """Task 1: receive the finished data and process it (a no-op here)."""
    pass


def producer(n=10000000):
    """Task 2: build a list of *n* items, then hand it to consumer() in one go."""
    res = []
    # The explicit append loop is the work being timed; keep it as is.
    for i in range(n):
        res.append(i)

    consumer(res)
    # return res


if __name__ == '__main__':
    start = time.perf_counter()
    # Serial execution: consumer only starts after producer built all the data.
    producer()
    stop = time.perf_counter()
    print(stop - start)
# Pure computation, run serially.
import time


def task1(n=1000000):
    res = 1
    for i in range(n):
        res += i


def task2(n=1000000):
    res = 1
    for i in range(n):
        res *= i


if __name__ == '__main__':
    start = time.perf_counter()
    # Serial: task2 starts only after task1 has finished.
    task1()
    task2()
    stop = time.perf_counter()
    print(stop - start)


# Pure computation, run "concurrently" by switching with yield.
import time


def task1(n=1000000):
    res = 1
    for i in range(n):
        res += i
        yield
    # Never reached: task2 calls next() exactly n times, which leaves task1
    # suspended at its last yield. Had it run, this blocking sleep would
    # freeze the whole thread -- yield-based switching cannot skip over
    # blocking I/O.
    time.sleep(10000)
    print('task1')


def task2(n=1000000):
    g = task1(n)
    res = 1
    for i in range(n):
        res *= i
        next(g)  # switch to task1 for one step, then come back here
    print('task2')


if __name__ == '__main__':
    start = time.perf_counter()
    # Switching on every step only adds overhead for pure computation, so
    # this is typically slower than the serial version above.
    task2()
    stop = time.perf_counter()
    print(stop - start)
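单线程下手动切换任务（greenlet）：greenlet 只能通过 switch() 显式切换，遇到IO（如 time.sleep）并不会自动切换，整个线程会被阻塞；要实现遇到IO自动切换，需要使用 gevent 并调用 monkey.patch_all()：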
from greenlet import greenlet
import time


def eat(name):
    print('%s eat 1' % name)
    # greenlet never switches on I/O by itself: this sleep blocks the whole
    # thread for 30 s before play() gets a chance to run.
    time.sleep(30)
    g2.switch('alex')  # the first switch into g2 starts play('alex')
    print('%s eat 2' % name)
    g2.switch()  # resume play() right after its g1.switch()


def play(name):
    print('%s play 1' % name)
    g1.switch()  # resume eat() right after its g2.switch('alex')
    print('%s play 2' % name)


if __name__ == '__main__':
    g1 = greenlet(eat)
    g2 = greenlet(play)

    # The first switch() into a greenlet passes its arguments to the function.
    g1.switch('egon')
原文地址:https://www.cnblogs.com/kermitjam/p/8977843.html
时间: 2024-11-10 07:35:35