No.36
今日概要
- 协程
- gevent模块
- asyncio模块
内容回顾
1.锁
- 互斥锁
- 一把锁不能在一个线程中连续
acquire
- 开销小
- 一把锁不能在一个线程中连续
- 递归锁
- 一把锁可以在一个线程中连续
acquire
多次,acquire
多少次就release
多少次。 - 开销大
- 一把锁可以在一个线程中连续
- 死锁现象
- 在线程中陷入阻塞并且永远无法结束阻塞的情况
- 形成原因
- 多把锁 + 交替使用
- 互斥锁在一个线程中连续
acquire
- 避免死锁
- 在一个线程中只用一把锁,并且每一次
acquire
之后都要release
。
- 在一个线程中只用一把锁,并且每一次
- 线程中导致数据不安全的情况
+= 、-= 、*= 、/=
相关的赋值运算- 多个线程对同一文件进行写操作
- 队列
- 先进先出 Queue
- 后进先出 LifoQueue
- 优先级 PriorityQueue
2.池
- 创建池
- 导入
from concurrent.futures import ProcessPoolExecutor
from concurrent.futrues import ThreadPoolExecutor
- tp = ThreadPoolExecutor(cpu*5)
- 导入
- 方法
obj = tp.submit(需要在子线程执行的函数名,参数)
- 获取返回值:
obj.result()
是一个阻塞方法 - 绑定回调函数:
obj.add_done_callback(子线程任务结束后需要继续执行的函数)
- 获取返回值:
ret = tp.map(需要在子线程执行的函数名,iterable)
- 迭代ret,可以获取所有的返回值。
tp.shutdown()
- 阻塞,直到池中所有任务执行完毕。
- 什么情况使用池
- 只需要一个子线程来完成一个任务,不建议使用线程池。
- 需要多个线程来完成大量任务且要达到一定的并发数,建议使用线程池。
- 根据程序的IO操作频率判定是否开启线程池
- 任务IO操作频繁,完成时间长且不确定,不建议使用线程池。
- socket
- 任务IO操作较少,完成时间短且确定,建议使用线程池。
- 爬虫
- 任务IO操作频繁,完成时间长且不确定,不建议使用线程池。
- 锁
- 所有在线程中使用的功能基本都不能在进程中使用
- 反之所有在进程中使用的功能基本都能在线程中使用
- 进程锁可以在线程中用,线程锁不可以在进程中用
- 回调函数
- 子线程任务执行完后,直接调用对应的回调函数。
- obj = submit(存在IO操作的函数)
- cpython解释器中由于GIL锁的原因,多线程无法实现并行;所以只能提高存在IO操作任务的效率。
- obj.add_done_callback(高计算型的函数)
- 对于高计算的任务没必要开启多线程,即使开了实际也是串行执行和不开没有区别。
import time from concurrent.futures import ThreadPoolExecutor def son(): print(666) time.sleep(3) return 888 def func(obj): print(obj) # obj.result() = 888 t = ThreadPoolExecutor(20) obj = t.submit(son) print(‘main:‘, obj) obj.add_done_callback(func) # def my_add_done_callback(obj , fn): # ret = obj.result() # fn(obj)
- 爬虫
- 爬取网页
- 需要进行数据传输和等待网络响应,存在IO操作。
- 交给子线程完成
- 分析网页
- 没有IO操作
- 交给回调函数完成
- 爬取网页
3.子进程中启动子线程
from multiprocessing import Process
from threading import Thread
def pfunc():
print(‘启动子进程执行任务‘)
Thread(target=tfunc).start()
def tfunc():
print(‘子进程中启动子线程执行任务‘)
if __name__ == ‘__main__‘:
Process(target=pfunc).start()
print(‘主进程‘)
4.总结
- 操作系统
- 计算机中所有资源都是由操作系统分配的
- 操作系统如何调度任务
- 时间分片
- 多道机制
- 提高CPU利用率是我们努力的目标
- 并发
- 进程
- 特点:开销大、数据隔离、资源分配单位、Cpython下可以并行(可以利用多核)
- 三状态:就绪、运行、阻塞
- multiprocessing模块
- Process
- 开启进程 start
- 等待进程结束 join
- Lock
- 多进程同时操作文件或同时访问数据库都会导致数据不安全现象发生。
- Queue
- IPC机制(Pipe、redis、memcache、rabbitmq、kafka)
- 生产者消费者模型
- Manager
- 提供数据共享机制
- Process
- 线程
- 特点:开销小、数据共享、CPU调度单位、Cpython下不能并行(不能利用多核)
- GIL锁
- 全局解释器锁
- 由Cpython提供
- 导致了一个进程中的多个线程在同一时刻只有一个能访问CPU
- threading模块
- Thread
- 开启线程 start
- 等待线程结束 join
- Lock
- 互斥锁:不能在同一线程中连续acquire,效率相对高。
- Rlock
- 递归锁:可以在同一线程中连续acquire,效率相对低。
- 死锁现象
- 如何发生
- 如何避免
- Thread
- queue模块
- Queue
- LifoQueue
- PriorityQueue
- 进程
- 池
- concurrent.futrues模块
- ProcessPoolExecutor
- ThreadPoolExecutor
- 实例化一个池
- tp = ThreadPoolExecutor(CPU核心数*2)
- 提交任务到池
- obj = tp.submit(fn, arg1, arg2, ...)
- 获取返回值
- obj.result()
- 回调函数
- obj.add_done_callback(fn)
- 阻塞等待池中任务都结束
- tp.shutdown()
- concurrent.futrues模块
- 概念
- IO操作
- 同步异步
- 阻塞非阻塞
内容详细
1.协程
协程的本质就是在单线程下,当一个任务遇到IO阻塞后通过用户控制切换到另一个任务去执行,以此来提升效率。
进程、线程、协程的区别:
- 进程线程:系统级别,通过操作系统控制切换
- 开销大,会增加操作系统压力
- 操作系统对IO操作的感知灵敏
- 协程:用户级别,通过Python代码控制切换
- 开销小,不会增加操作系统压力
- 用户对IO操作的感知不灵敏
- Cpython解释器下,线程和协程都不能利用多核
协程的切换方式:
- 协程:在一个线程中的多个任务能够相互切换,那么每一个任务就是一个协程。
- 原生python完成
asyncio模块
基于yield
实现切换def eat(): print(‘alex is eating‘) yield 1 print(‘alex finished eat‘) yield 2 def sleep(): g = eat() next(g) print(‘yuan is sleeping‘) print(‘yuan finished sleep‘) next(g) sleep()
- C语言完成
gevent模块
基于greenlet模块
实现切换from greenlet import greenlet def eat(): print(‘alex is eating‘) g2.switch() print(‘alex finished eat‘) def sleep(): print(‘yuan is sleeping‘) print(‘yuan finished sleep‘) g1.switch() g1 = greenlet(eat) g2 = greenlet(sleep) g1.switch()
2.gevent模块
import gevent
def eat():
print(‘alex is eating‘)
time.sleep(1)
print(‘alex finished eat‘)
g = gevent.spawn(eat) # 创建一个协程任务,遇到阻塞才会执行。
print(‘没有阻塞则永远不会执行协程任务‘)
import time
import gevent
def eat():
print(‘alex is eating‘)
time.sleep(1)
print(‘alex finished eat‘)
g = gevent.spawn(eat) # 创建协程任务,遇到阻塞执行。
time.sleep(1)
print(‘无法识别sleep是阻塞方法。‘)
import time
import gevent
from gevent import monkey
monkey.patch_all() # 识别所有阻塞
def eat():
print(‘alex is eating‘)
time.sleep(1)
print(‘alex finished eat‘)
g = gevent.spawn(eat) # 创建一个协程任务
time.sleep(1)
print(‘识别所有阻塞‘)
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print(‘alex is eating‘)
time.sleep(1)
print(‘alex finished eat‘)
def sleep():
print(‘yuan is sleeping‘)
time.sleep(1)
print(‘yuan finished sleep‘)
g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
g1.join() # 阻塞直到g1任务完成
g2.join() # 阻塞直到g2任务完成
gevent.joinall([g1, g2]) # 阻塞直到列表中的所有完成
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print(2)
time.sleep(0.5)
print(5)
def sleep():
print(3)
time.sleep(0.5)
print(6)
g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
print(1)
time.sleep(0.5)
print(4)
time.sleep(0.5)
print(7)
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print(‘alex is eating‘)
time.sleep(1)
print(‘alex finished eat‘)
g_l = []
for i in range(10):
g = gevent.spawn(eat) # 循环添加协程任务
g_l.append(g)
print(1)
gevent.joinall(g_l)
import time
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print(‘alex is eating‘)
time.sleep(1)
print(‘alex finished eat‘)
return ‘alex‘
def sleep():
print(‘yuan is sleeping‘)
time.sleep(1)
print(‘yuan finished sleep‘)
return ‘yuan‘
g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
gevent.joinall([g1, g2])
print(g1.value) # 获取返回值
print(g2.value)
3.asyncio模块
# 启动一个任务
import asyncio
async def demo():
print(‘start‘)
await asyncio.sleep(1)
print(‘end‘)
loop = asyncio.get_event_loop() # 创建一个事件循环对象
loop.run_until_complete(demo()) # 把demo任务丢到事件循环中执行
# 启动多个任务,无返回值。
import asyncio
async def demo():
print(‘start‘)
await asyncio.sleep(1)
print(‘end‘)
loop = asyncio.get_event_loop()
wait_obj = asyncio.wait([demo(), demo(), demo()])
loop.run_until_complete(wait_obj)
# 启动多个任务,有返回值。
import asyncio
async def demo():
print(‘start‘)
await asyncio.sleep(1)
print(‘end‘)
return 666
loop = asyncio.get_event_loop()
t1 = loop.create_task(demo())
t2 = loop.create_task(demo())
wait_obj = asyncio.wait([t1, t2])
loop.run_until_complete(wait_obj)
task_l = [t1, t2]
for t in task_l:
print(t.result())
# 谁先回来先取谁的结果
import asyncio
async def demo(i):
print(‘start‘)
await asyncio.sleep(1)
print(‘end‘)
return i, 666
async def main():
lst = []
for i in range(10):
task = asyncio.ensure_future(demo(i))
lst.append(task)
for ret in asyncio.as_completed(lst):
res = await ret
print(ret)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
4.IO多路复用
# 方式一
import requests
key_lst = [‘alex‘, ‘wusir‘, ‘yuan‘]
for item in key_lst:
ret = requests.get(‘https://www.baidu.com/s?wd=%s‘ % item)
print(ret.text)
# 方式二
import socket
def get_data(key):
client = socket.socket()
# 创建连接:和百度创建连接,阻塞。
client.connect((‘www.baidu.com‘, 80))
# 发送请求:告诉百度你要什么。
client.sendall(b‘GET /s?wd=%s HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘ % key)
# 接收数据:等着接收百度的回复。
chunk_list = []
while True:
chunk = client.recv(8096)
if not chunk:
break
chunk_list.append(chunk)
body = b‘‘.join(chunk_list)
print(body.decode(‘utf-8‘))
key_lst = [‘alex‘, ‘wusir‘, ‘yuan‘]
for item in key_lst:
get_data(item)
# 多线程并发
import threading
for item in key_lst:
t = threading.Thread(target=get_data, args=(item,))
t.start()
基于IO多路复用 + socket实现单线程并发请求
- IO多路复用
- 操作系统可以监听所有的IO请求状态
- socket
- 是否已经连接成功 → 可读
- 是否已经获得数据 → 可写
- socket
- 三种实现模式
- select:最多监听1024个socket对象,循环检测(水平触发)。
- poll:不限制监听个数,循环检测(水平触发)。
- epoll:不限制监听个数,回调方式检测(边缘触发)。
- 操作系统可以监听所有的IO请求状态
- socket非阻塞
对象.setblocking(False)
import socket
import select
client1 = socket.socket()
client1.setblocking(False) # 将原来阻塞的位置变成非阻塞
try:
client1.connect((‘www.baidu.com‘, 80))
except BlockingIOError as e:
pass
client2 = socket.socket()
client2.setblocking(False) # 将原来阻塞的位置变成非阻塞
try:
client2.connect((‘www.sogou.com‘, 80))
except BlockingIOError as e:
pass
socket_lst = [client1, client2]
conn_lst = [client1, client2]
while True:
r_lst, w_lst, e_lst = select.select(socket_lst, conn_lst, [], 0.005) # IO多路复用
# w_lst 表示已经连接成功的socket对象
for sk in w_lst:
if sk == client1:
sk.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘)
else:
sk.sendall(b‘GET /web?query=wusir HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n‘)
conn_lst.remove(sk)
# r_lst 表示已经返回数据的socket对象
for sk in r_lst:
chunk_lst = []
while True:
try:
chunk = sk.recv(8096)
if not chunk:
break
chunk_lst.append(chunk)
except BlockingIOError as e:
break
body = b‘‘.join(chunk_lst)
# print(‘>>>>>>>>‘, body.decode(‘utf-8‘))
print(‘>>>>>>>>‘, body)
sk.close()
socket_lst.remove(sk)
if not socket_lst:
break
5.总结
协程
- 协程称之为“微线程”,由开发者控制线程的执行流程。
- 协程自身无法实现并发,只有配合IO切换才能实现并发。
单线程并发方式
- 协程+IO切换
- gevent
- 基于事件循环的异步非阻塞框架
- Twisted
原文地址:https://www.cnblogs.com/elliottwave/p/12656278.html
时间: 2024-11-09 10:41:16