python—day32 异步 + 回调 、Event、gevent 、协程、单线程下实现遇到IO切换

异步 + 回调:就是把下载好的东西回调主进程执行 或者回调给线程,哪个线程闲着就执行

 1 #进程的异步 + 回调
 2 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 3 #
 4 # import requests
 5 # import os,time,random
 6 # def get(url):
 7 #     print(‘%s get %s‘%(os.getpid(),url) )
 8 #
 9 #     response = requests.get(url)
10 #     time.sleep(random.randint(1, 3))
11 #
12 #     if response.status_code == 200 :
13 #         #干解析的活 只要下载完立刻进行解析
14 #         return response.text
15 #
16 # def pasrse(obj):
17 #     res = obj.result()
18 #     print(‘%s 解析结果为:%s‘ %(os.getpid(),len(res)))
19 #
20 # if __name__ == ‘__main__‘:
21 #     urls = [
22 #         ‘https://www.baidu.com/‘,
23 #         ‘https://www.baidu.com/‘,
24 #         ‘https://www.baidu.com/‘,
25 #         ‘https://www.baidu.com/‘,
26 #         ‘https://www.baidu.com/‘,
27 #         ‘http://www.sina.com.cn/‘,
28 #         ‘http://www.sina.com.cn/‘,
29 #         ‘http://www.sina.com.cn/‘
30 #     ]
31 #     pool = ProcessPoolExecutor(4)
32 #     # objs = []
33 #     for url in urls:
34 #         #把get函数和url任务扔进进程池
35 #         obj = pool.submit(get,url)
36 #         #提交完后给obj对象绑定了一个工具pasrse
37 #         #任务有返回值就会自动运行,有结果立即调用解析方法pasrse,完成了解耦
38 #         obj.add_done_callback(pasrse)
39 #
40 #     print(‘主进程 %s‘%os.getpid())
41 #         # objs.append(obj)
42 #         # res = pool.submit(get,url).result() 同步解析
43 #     # pool.shutdown(wait=True)
44 #
45 #     #问题
46 #     #1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能继续统一进行处理
47 #     #2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18秒
48 #
49 #     # 串行了
50 #     # for obj in objs:
51 #     #     res = obj.result()
52 #     #     pasrse(res)
53
54
55
56 #哪个线程闲着就用回调函数
57 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
58 from threading import current_thread
59 import requests
60 import os,time,random
61 def get(url):
62     print(‘%s get %s‘%(current_thread().name,url) )
63
64     response = requests.get(url)
65     time.sleep(random.randint(1, 3))
66
67     if response.status_code == 200 :
68         #干解析的活 只要下载完立刻进行解析
69         return response.text
70
71 def pasrse(obj):
72     res = obj.result()
73     print(‘%s 解析结果为:%s‘ %(current_thread().name,len(res)))
74
75 if __name__ == ‘__main__‘:
76     urls = [
77         ‘https://www.baidu.com/‘,
78         ‘https://www.baidu.com/‘,
79         ‘https://www.baidu.com/‘,
80         ‘https://www.baidu.com/‘,
81         ‘https://www.baidu.com/‘,
82         ‘http://www.sina.com.cn/‘,
83         ‘http://www.sina.com.cn/‘,
84         ‘http://www.sina.com.cn/‘
85     ]
86     pool = ThreadPoolExecutor(4)
87     # objs = []
88     for url in urls:
89         #把get函数和url任务扔进进程池
90         obj = pool.submit(get,url)
91         #提交完后给obj对象绑定了一个工具pasrse
92         #任务有返回值就会自动运行,有结果立即调用解析方法pasrse,完成了解耦
93         obj.add_done_callback(pasrse)
94
95     print(‘主线程 %s‘%current_thread().name)

线程Queue:

 1 import queue
 2
 3 q=queue.Queue(3) #队列:先进先出
 4 q.put(1)
 5 q.put(2)
 6 q.put(3)
 7 # q.put(4)
 8
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12
13
14 q=queue.LifoQueue(3) #堆栈:后进先出
15
16 q.put(‘a‘)
17 q.put(‘b‘)
18 q.put(‘c‘)
19
20 print(q.get())
21 print(q.get())
22 print(q.get())
23
24
25 q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高
26 q.put((10,‘user1‘))
27 q.put((-3,‘user2‘))
28 q.put((-2,‘user3‘))
29
30
31 print(q.get())
32 print(q.get())
33 print(q.get())

线程Event:event.wait()

 1 from threading import Event,current_thread,Thread
 2 import time
 3
 4 event=Event()
 5
 6 def check():
 7     print(‘%s 正在检测服务是否正常....‘ %current_thread().name)
 8     time.sleep(5)
 9     event.set()
10
11
12 def connect():
13     count=1
14     while not event.is_set():
15         if count ==  4:
16             print(‘尝试的次数过多,请稍后重试‘)
17             return
18         print(‘%s 尝试第%s次连接...‘ %(current_thread().name,count))
19         event.wait(1)
20         count+=1
21     print(‘%s 开始连接...‘ % current_thread().name)
22
23 if __name__ == ‘__main__‘:
24     t1=Thread(target=connect)
25     t2=Thread(target=connect)
26     t3=Thread(target=connect)
27
28     c1=Thread(target=check)
29
30     t1.start()
31     t2.start()
32     t3.start()
33     c1.start()

gevent:

 1 from gevent import monkey;monkey.patch_all()
 2 from threading import current_thread
 3 import gevent
 4 import time
 5
 6 def eat():
 7     print(‘%s eat 1‘ %current_thread().name)
 8     time.sleep(5)
 9     print(‘%s eat 2‘ %current_thread().name)
10 def play():
11     print(‘%s play 1‘ %current_thread().name)
12     time.sleep(3)
13     print(‘%s play 2‘ %current_thread().name)
14
15 g1=gevent.spawn(eat)
16 g2=gevent.spawn(play)
17
18 # gevent.sleep(100)
19 # g1.join()
20 # g2.join()
21 print(current_thread().name)
22 gevent.joinall([g1,g2])

协程:

1、单线程下实现并发:协程

  并发指的是多个任务看起来是同时运行的

  并发实现的本质:切换 + 保存状态

  并发、并行、串行

  并发:看起来是同时运行,切换 + 保存状态

    实现并行,4个cpu能够并行4个任务

  串行:一个人完完整整地执行完毕才能运行下一个任务

 1 import time
 2 def consumer():
 3     ‘‘‘任务1:接收数据,处理数据‘‘‘
 4     while True:
 5         x=yield
 6
 7
 8 def producer():
 9     ‘‘‘任务2:生产数据‘‘‘
10     g=consumer()
11     next(g)
12     for i in range(10000000):
13         g.send(i)
14
15 start=time.time()
16 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
17 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
18 producer() #1.0202116966247559
19
20
21 stop=time.time()
22 print(stop-start)
 1 import time
 2 def consumer(res):
 3     ‘‘‘任务1:接收数据,处理数据‘‘‘
 4     pass
 5
 6 def producer():
 7     ‘‘‘任务2:生产数据‘‘‘
 8     res=[]
 9     for i in range(10000000):
10         res.append(i)
11
12     consumer(res)
13     # return res
14
15 start=time.time()
16 #串行执行
17 res=producer()
18 stop=time.time()
19 print(stop-start)
 1 # 纯计算的任务串行执行
 2 import time
 3 def task1():
 4     res=1
 5     for i in range(1000000):
 6         res+=i
 7
 8 def task2():
 9     res=1
10     for i in range(1000000):
11         res*=i
12
13 start=time.time()
14 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
15 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
16 task1()
17 task2()
18 stop=time.time()
19 print(stop-start)
20
21
22
23 # 纯计算的任务并发执行
24 import time
25 def task1():
26     res=1
27     for i in range(1000000):
28         res+=i
29         yield
30         time.sleep(10000)
31         print(‘task1‘)
32
33 def task2():
34     g=task1()
35     res=1
36     for i in range(1000000):
37         res*=i
38         next(g)
39         print(‘task2‘)
40
41 start=time.time()
42 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
43 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
44 task2()
45 stop=time.time()
46 print(stop-start)

单线程下实现遇到IO切换:

 1 from greenlet import greenlet
 2 import time
 3
 4 def eat(name):
 5     print(‘%s eat 1‘ %name)
 6     time.sleep(30)
 7     g2.switch(‘alex‘)
 8     print(‘%s eat 2‘ %name)
 9     g2.switch()
10 def play(name):
11     print(‘%s play 1‘ %name)
12     g1.switch()
13     print(‘%s play 2‘ %name)
14
15 g1=greenlet(eat)
16 g2=greenlet(play)
17
18 g1.switch(‘egon‘)

原文地址:https://www.cnblogs.com/kermitjam/p/8977843.html

时间: 2024-11-10 07:35:35

python—day32 异步 + 回调 、Event、gevent 、协程、单线程下实现遇到IO切换的相关文章

day 32异步+回调、线程queue、线程Event、协程、单线程下实现遇到IO切换

一.异步+回调:线程是谁空谁调,进程是主进程调用 from concurrent.futures import ProcessPoolExcutor,ThreadPoolExecutor from threading import current_thread import requests,os,time,random def get(url): print('%s GET %s'%(current_thread().name,url)) response=requests.get(url)

异步调用与回调机制,协程

1.异步调用与回调机制 上一篇我们已经了解到了两组比较容易混淆的概念问题,1.同步与异步调用 2.阻塞与非阻塞状态.在说到异步调用的时候,说到提交任务后,就直接执行下一行代码,而不去拿结果,这样明显存在缺陷,结果是肯定要拿的,这辈子都肯定是要拿到这个结果的,没有这个结果后面的活又不会干,没办法,只能去拿结果的,那么问题是异步调用提交任务后,如何实现既要拿到结果又不需要原地等的理想状态呢?专门为异步调用配备了一个方法--回调机制 先来想想我们之前是怎么拿到一个函数的结果,就传给另外一个函数取执行,

python gevent 协程

简介 没有切换开销.因为子程序切换不是线程切换,而是由程序自身控制,没有线程切换的开销,因此执行效率高, 不需要锁机制.因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多 Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程. yield 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁. 如果改用协程,生产者生产消息后,直接通过y

python的gevent协程

gevent Python通过yield提供了对协程的基本支持,但是不完全.而第三方的gevent为Python提供了比较完善的协程支持 gevent是第三方库,通过greenlet实现协程,其基本思想是: 当一个greenlet遇到IO操作时(比如访问网络),就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行.由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO. import ge

python38 1.线程一堆队列 2.事件Event 3.协程 4.断点续传

复习 1.GIL锁 2.如何避免GIL锁给程序带来的效率影响 3.与自定义锁的区别 4. 线程池进程池 5 同步  异步 6.异步回调 1.GIL锁 ? 全局解释器锁,   用来锁住解释器的互斥锁 ? 为啥加: CPython 中内存管理是非线程安全的,  GIL是为了   保护解释器的数据不被并发修改 ? 加锁后的问题:导致多个线程无法并行执行, 降低了效率 ? 当然 是可以并发的 ? 2.如何避免GIL锁给程序带来的效率影响 ? 什么时候会影响效率 ?  如果是计算密集型任务,开多线程不能提

python2.0_s12_day9_协程&Gevent协程

Python之路,Day9 - 异步IO\数据库\队列\缓存 本节内容 Gevent协程 Select\Poll\Epoll异步IO与事件驱动 Python连接Mysql数据库操作 协程 1.协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是协程:协程是一种用户态的轻量级线程.(操作系统跟不知道它存在),那你指定协程的实现原理是什么吗? 我们来聊聊协程的实现原理: 首先我们知道多个线程在一个单核CPU上进行并发,它的操作过程是,操作系统能调动的最小单位是线程,当操作系统触发多个线

线程queue、事件event及协程

线程queue.事件event及协程 线程queue 多线程抢占资源,让其保持串行的两种方式: ? 1.互斥锁 ? 2.队列 线程队列分为以下三种: 1.Queue(先进先出) import queue q = queue.Queue(3) q.put(1) q.put(2) q.put(3) # q.put(4,block=False) # 若不设置block参数,默认为True,大于队列长度进入阻塞状态,若设置block为False,大于对列长度直接报错 print(q.get()) pri

Python:线程、进程与协程(4)——multiprocessing模块(1)

multiprocessing模块是Python提供的用于多进程开发的包,multiprocessing包提供本地和远程两种并发,通过使用子进程而非线程有效地回避了全局解释器锁. (一)创建进程Process 类 创建进程的类,其源码在multiprocessing包的process.py里,有兴趣的可以对照着源码边理解边学习.它的用法同threading.Thread差不多,从它的类定义上就可以看的出来,如下: class Process(object):     '''     Proces

Python:线程、进程与协程(2)——threading模块(1)

上一篇博文介绍了Python中线程.进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块.首先来看看threading模块有哪些方法和类吧. 主要有: Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能. Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类. Lock :原锁,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用. RLock :可重入锁