No.36协程

No.36

今日概要

  • 协程
  • gevent模块
  • asyncio模块

内容回顾

1.锁

  • 互斥锁

    • 一把锁不能在一个线程中连续acquire
    • 开销小
  • 递归锁
    • 一把锁可以在一个线程中连续acquire多次,acquire多少次就release多少次。
    • 开销大
  • 死锁现象
    • 在线程中陷入阻塞并且永远无法结束阻塞的情况
    • 形成原因
      • 多把锁 + 交替使用
      • 互斥锁在一个线程中连续acquire
  • 避免死锁
    • 在一个线程中只用一把锁,并且每一次acquire之后都要release
  • 线程中导致数据不安全的情况
    • += 、-= 、*= 、/= 相关的赋值运算
    • 多个线程对同一文件进行写操作
  • 队列
    • 先进先出 Queue
    • 后进先出 LifoQueue
    • 优先级 PriorityQueue

2.池

  • 创建池

    • 导入

      • from concurrent.futures import ProcessPoolExecutor
      • from concurrent.futrues import ThreadPoolExecutor
    • tp = ThreadPoolExecutor(cpu*5)
  • 方法
    • obj = tp.submit(需要在子线程执行的函数名,参数)

      • 获取返回值:obj.result()是一个阻塞方法
      • 绑定回调函数:obj.add_done_callback(子线程任务结束后需要继续执行的函数)
    • ret = tp.map(需要在子线程执行的函数名,iterable)
      • 迭代ret,可以获取所有的返回值。
    • tp.shutdown()
      • 阻塞,直到池中所有任务执行完毕。
  • 什么情况使用池
    • 只需要一个子线程来完成一个任务,不建议使用线程池。
    • 需要多个线程来完成大量任务且要达到一定的并发数,建议使用线程池。
    • 根据程序的IO操作频率判定是否开启线程池
      • 任务IO操作频繁,完成时间长且不确定,不建议使用线程池。

        • socket
      • 任务IO操作较少,完成时间短且确定,建议使用线程池。
        • 爬虫
    • 所有在线程中使用的功能基本都不能在进程中使用
    • 反之所有在进程中使用的功能基本都能在线程中使用
    • 进程锁可以在线程中用,线程锁不可以在进程中用
  • 回调函数
    • 子线程任务执行完后,直接调用对应的回调函数。
    • obj = submit(存在IO操作的函数)
      • cpython解释器中由于GIL锁的原因,多线程无法实现并行;所以只能提高存在IO操作任务的效率。
    • obj.add_done_callback(高计算型的函数)
      • 对于高计算的任务没必要开启多线程,即使开了实际也是串行执行和不开没有区别。
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    def son():
        print(666)
        time.sleep(3)
        return 888
    
    def func(obj):
        print(obj) # obj.result() = 888
    
    t = ThreadPoolExecutor(20)
    obj = t.submit(son)
    print(‘main:‘, obj)
    obj.add_done_callback(func)
    
    # def my_add_done_callback(obj , fn):
    #     ret = obj.result()
    #     fn(obj)
    
    • 爬虫

      • 爬取网页

        • 需要进行数据传输和等待网络响应,存在IO操作。
        • 交给子线程完成
      • 分析网页
        • 没有IO操作
        • 交给回调函数完成

3.子进程中启动子线程

from multiprocessing import Process
from threading import Thread

def pfunc():
    print(‘启动子进程执行任务‘)
    Thread(target=tfunc).start()

def tfunc():
    print(‘子进程中启动子线程执行任务‘)

if __name__ == ‘__main__‘:
    Process(target=pfunc).start()
    print(‘主进程‘)

4.总结

  • 操作系统

    • 计算机中所有资源都是由操作系统分配的
    • 操作系统如何调度任务
      • 时间分片
      • 多道机制
    • 提高CPU利用率是我们努力的目标
  • 并发
    • 进程

      • 特点:开销大、数据隔离、资源分配单位、Cpython下可以并行(可以利用多核)
      • 三状态:就绪、运行、阻塞
      • multiprocessing模块
        • Process

          • 开启进程 start
          • 等待进程结束 join
        • Lock
          • 多进程同时操作文件或同时访问数据库都会导致数据不安全现象发生。
        • Queue
          • IPC机制(Pipe、redis、memcache、rabbitmq、kafka)
          • 生产者消费者模型
        • Manager
          • 提供数据共享机制
    • 线程
      • 特点:开销小、数据共享、CPU调度单位、Cpython下不能并行(不能利用多核)
      • GIL锁
        • 全局解释器锁
        • 由Cpython提供
        • 导致了一个进程中的多个线程在同一时刻只有一个能访问CPU
      • threading模块
        • Thread

          • 开启线程 start
          • 等待线程结束 join
        • Lock
          • 互斥锁:不能在同一线程中连续acquire,效率相对高。
        • Rlock
          • 递归锁:可以在同一线程中连续acquire,效率相对低。
        • 死锁现象
          • 如何发生
          • 如何避免
      • queue模块
        • Queue
        • LifoQueue
        • PriorityQueue
    • concurrent.futrues模块

      • ProcessPoolExecutor
      • ThreadPoolExecutor
    • 实例化一个池
      • tp = ThreadPoolExecutor(CPU核心数*2)
    • 提交任务到池
      • obj = tp.submit(fn, arg1, arg2, ...)
    • 获取返回值
      • obj.result()
    • 回调函数
      • obj.add_done_callback(fn)
    • 阻塞等待池中任务都结束
      • tp.shutdown()
  • 概念
    • IO操作
    • 同步异步
    • 阻塞非阻塞

内容详细

1.协程

协程的本质就是在单线程下,当一个任务遇到IO阻塞后通过用户控制切换到另一个任务去执行,以此来提升效率。

进程、线程、协程的区别:

  • 进程线程:系统级别,通过操作系统控制切换

    • 开销大,会增加操作系统压力
    • 操作系统对IO操作的感知灵敏
  • 协程:用户级别,通过Python代码控制切换
    • 开销小,不会增加操作系统压力
    • 用户对IO操作的感知不灵敏
  • Cpython解释器下,线程和协程都不能利用多核

协程的切换方式:

  • 协程:在一个线程中的多个任务能够相互切换,那么每一个任务就是一个协程。
  • 原生python完成
    • asyncio模块基于yield实现切换

      def eat():
          print(‘alex is eating‘)
          yield 1
          print(‘alex finished eat‘)
          yield 2
      
      def sleep():
          g = eat()
          next(g)
          print(‘yuan is sleeping‘)
          print(‘yuan finished sleep‘)
          next(g)
      
      sleep()
      
  • C语言完成
    • gevent模块基于greenlet模块实现切换

      from greenlet import greenlet
      
      def eat():
          print(‘alex is eating‘)
          g2.switch()
          print(‘alex finished eat‘)
      
      def sleep():
          print(‘yuan is sleeping‘)
          print(‘yuan finished sleep‘)
          g1.switch()
      
      g1 = greenlet(eat)
      g2 = greenlet(sleep)
      g1.switch()
      

2.gevent模块

import gevent

def eat():
    print(‘alex is eating‘)
    time.sleep(1)
    print(‘alex finished eat‘)

g = gevent.spawn(eat) # 创建一个协程任务,遇到阻塞才会执行。
print(‘没有阻塞则永远不会执行协程任务‘)
import time
import gevent

def eat():
    print(‘alex is eating‘)
    time.sleep(1)
    print(‘alex finished eat‘)

g = gevent.spawn(eat) # 创建协程任务,遇到阻塞执行。
time.sleep(1)
print(‘无法识别sleep是阻塞方法。‘)
import time
import gevent
from gevent import monkey
monkey.patch_all()  # 识别所有阻塞

def eat():
    print(‘alex is eating‘)
    time.sleep(1)
    print(‘alex finished eat‘)

g = gevent.spawn(eat) # 创建一个协程任务
time.sleep(1)
print(‘识别所有阻塞‘)
import time
import gevent
from gevent import monkey
monkey.patch_all() 

def eat():
    print(‘alex is eating‘)
    time.sleep(1)
    print(‘alex finished eat‘)

def sleep():
    print(‘yuan is sleeping‘)
    time.sleep(1)
    print(‘yuan finished sleep‘)

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
g1.join() # 阻塞直到g1任务完成
g2.join() # 阻塞直到g2任务完成
gevent.joinall([g1, g2]) # 阻塞直到列表中的所有完成
import time
import gevent
from gevent import monkey
monkey.patch_all()

def eat():
    print(2)
    time.sleep(0.5)
    print(5)

def sleep():
    print(3)
    time.sleep(0.5)
    print(6)

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
print(1)
time.sleep(0.5)
print(4)
time.sleep(0.5)
print(7)
import time
import gevent
from gevent import monkey
monkey.patch_all()

def eat():
    print(‘alex is eating‘)
    time.sleep(1)
    print(‘alex finished eat‘)

g_l = []
for i in range(10):
    g = gevent.spawn(eat) # 循环添加协程任务
    g_l.append(g)
print(1)
gevent.joinall(g_l)
import time
import gevent
from gevent import monkey
monkey.patch_all()

def eat():
    print(‘alex is eating‘)
    time.sleep(1)
    print(‘alex finished eat‘)
    return ‘alex‘

def sleep():
    print(‘yuan is sleeping‘)
    time.sleep(1)
    print(‘yuan finished sleep‘)
    return ‘yuan‘

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
gevent.joinall([g1, g2])
print(g1.value) # 获取返回值
print(g2.value)

3.asyncio模块

# 启动一个任务
import asyncio

async def demo():
    print(‘start‘)
    await asyncio.sleep(1)
    print(‘end‘)

loop = asyncio.get_event_loop() # 创建一个事件循环对象
loop.run_until_complete(demo()) # 把demo任务丢到事件循环中执行
# 启动多个任务,无返回值。
import asyncio

async def demo():
    print(‘start‘)
    await asyncio.sleep(1)
    print(‘end‘)

loop = asyncio.get_event_loop()
wait_obj = asyncio.wait([demo(), demo(), demo()])
loop.run_until_complete(wait_obj)
# 启动多个任务,有返回值。
import asyncio

async def demo():
    print(‘start‘)
    await asyncio.sleep(1)
    print(‘end‘)
    return 666

loop = asyncio.get_event_loop()
t1 = loop.create_task(demo())
t2 = loop.create_task(demo())
wait_obj = asyncio.wait([t1, t2])
loop.run_until_complete(wait_obj)

task_l = [t1, t2]
for t in task_l:
    print(t.result())
# 谁先回来先取谁的结果
import asyncio
async def demo(i):
    print(‘start‘)
    await asyncio.sleep(1)
    print(‘end‘)
    return i, 666

async def main():
    lst = []
    for i in range(10):
        task = asyncio.ensure_future(demo(i))
        lst.append(task)
    for ret in asyncio.as_completed(lst):
    	res = await ret
        print(ret)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

4.IO多路复用

# 方式一
import requests

key_lst = [‘alex‘, ‘wusir‘, ‘yuan‘]
for item in key_lst:
    ret = requests.get(‘https://www.baidu.com/s?wd=%s‘ % item)
    print(ret.text)

# 方式二
import socket

def get_data(key):
    client = socket.socket()

    # 创建连接:和百度创建连接,阻塞。
    client.connect((‘www.baidu.com‘, 80))
    # 发送请求:告诉百度你要什么。
    client.sendall(b‘GET /s?wd=%s HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘ % key)
    # 接收数据:等着接收百度的回复。
    chunk_list = []
    while True:
        chunk = client.recv(8096)
        if not chunk:
            break
        chunk_list.append(chunk)

    body = b‘‘.join(chunk_list)
    print(body.decode(‘utf-8‘))

key_lst = [‘alex‘, ‘wusir‘, ‘yuan‘]
for item in key_lst:
    get_data(item)

# 多线程并发
import threading

for item in key_lst:
    t = threading.Thread(target=get_data, args=(item,))
    t.start()

基于IO多路复用 + socket实现单线程并发请求

  • IO多路复用

    • 操作系统可以监听所有的IO请求状态

      • socket

        • 是否已经连接成功 → 可读
        • 是否已经获得数据 → 可写
    • 三种实现模式
      • select:最多监听1024个socket对象,循环检测(水平触发)。
      • poll:不限制监听个数,循环检测(水平触发)。
      • epoll:不限制监听个数,回调方式检测(边缘触发)。
  • socket非阻塞
    • 对象.setblocking(False)
import socket
import select

client1 = socket.socket()
client1.setblocking(False)  # 将原来阻塞的位置变成非阻塞
try:
    client1.connect((‘www.baidu.com‘, 80))
except BlockingIOError as e:
    pass

client2 = socket.socket()
client2.setblocking(False)  # 将原来阻塞的位置变成非阻塞
try:
    client2.connect((‘www.sogou.com‘, 80))
except BlockingIOError as e:
    pass

socket_lst = [client1, client2]
conn_lst = [client1, client2]

while True:
    r_lst, w_lst, e_lst = select.select(socket_lst, conn_lst, [], 0.005)  # IO多路复用

    # w_lst 表示已经连接成功的socket对象
    for sk in w_lst:
        if sk == client1:
            sk.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘)
        else:
            sk.sendall(b‘GET /web?query=wusir HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n‘)
        conn_lst.remove(sk)

    # r_lst 表示已经返回数据的socket对象
    for sk in r_lst:
        chunk_lst = []
        while True:
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_lst.append(chunk)
            except BlockingIOError as e:
                break
        body = b‘‘.join(chunk_lst)
        # print(‘>>>>>>>>‘, body.decode(‘utf-8‘))
        print(‘>>>>>>>>‘, body)
        sk.close()
        socket_lst.remove(sk)

    if not socket_lst:
        break

5.总结

协程

  • 协程称之为“微线程”,由开发者控制线程的执行流程。
  • 协程自身无法实现并发,只有配合IO切换才能实现并发。

单线程并发方式

  • 协程+IO切换

    • gevent
  • 基于事件循环的异步非阻塞框架
    • Twisted

原文地址:https://www.cnblogs.com/elliottwave/p/12656278.html

时间: 2024-11-09 10:41:16

No.36协程的相关文章

python协程有多厉害?

爬一个××网站上的东西,测算了一下协程的速度提升到底有多大,网站链接就不放了... import requests from bs4 import BeautifulSoup as sb import lxml import time url = 'http://www.××××.com/html/part/index27_' url_list = [] start = time.time() for i in range(2,47): print('get page '+str(i)) hea

写个百万级别full-stack小型协程库——原理介绍

其实说什么百万千万级别都是虚的,下面给出实现原理和测试结果,原理很简单,我就不上图了: 原理:为了简单明了,只支持单线程,每个协程共享一个4K的空间(你可以用堆,用匿名内存映射或者直接开个数组也都是可以的,总之得保证4K页对齐的空间),每个协程自己有私有栈空间指针privatestackptr,每个时刻只有一个协程在运行,此时栈空间在这个4K共享空间中(当然除了main以外),当切换协程时,动态分配一个堆内存,大小为此时协程栈实际大小(一般都很小,小的只有几十个Bytes, 大的有几百个Byte

Python基础—线程、进程和协程

今天已是学习Python的第十一天,来干一碗鸡汤继续今天的内容,今天的鸡汤是:超越别人对你的期望.本篇博客主要介绍以下几点内容: 线程的基本使用: 线程的锁机制: 生产者消费之模型(队列): 如何自定义线程池: 进程的基本使用: 进程的锁机制: 进程之间如何实现数据共享: 进程池: 协程的基本使用. 一.线程 1.创建线程 上篇博客已经介绍过如何创建多线程的程序,在这里在复习一下如何创建线程过程以及线程的一些方法: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 1

C#中的yield return与Unity中的Coroutine(协程)(下)

Unity中的Coroutine(协程) 估计熟悉Unity的人看过或者用过StartCoroutine() 假设我们在场景中有一个UGUI组件, Image: 将以下代码绑定到Image 1 using UnityEngine; 2 using System.Collections; 3 using System.Threading; 4 using UnityEngine.UI; 5 6 public class CoroutineDemo : MonoBehaviour { 7 8 //

Python并发实践_02_协程

python中实现并发的方式有很多种,通过多进程并发可以真正利用多核资源,而多线程并发则实现了进程内资源的共享,然而Python中由于GIL的存在,多线程是没有办法真正实现多核资源的. 对于计算密集型程序,应该使用多进程并发充分利用多核资源,而在IO密集型程序中,多核优势并不明显,甚至由于大多数时间都是在IO堵塞状态,多进程的切换消耗反而让程序效率更加低下. 而当需要并发处理IO密集型任务时,就需要用到协程(Coroutine).协程并没有系统级的调度,而是用户级的调度方式,避免了系统调用的开销

进程线程协程那些事儿

一.进程与线程 1.进程 我们电脑的应用程序,都是进程,假设我们用的电脑是单核的,cpu同时只能执行一个进程.当程序出于I/O阻塞的时候,CPU如果和程序一起等待,那就太浪费了,cpu会去执行其他的程序,此时就涉及到切换,切换前要保存上一个程序运行的状态,才能恢复,所以就需要有个东西来记录这个东西,就可以引出进程的概念了. 进程就是一个程序在一个数据集上的一次动态执行过程.进程由程序,数据集,进程控制块三部分组成.程序用来描述进程哪些功能以及如何完成:数据集是程序执行过程中所使用的资源:进程控制

python第五十三天--进程,协程.select.异步I/O...

进程: 1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import multiprocessing,threading,time 5 6 def run(name): 7 t=threading.Thread(target=run2)#创建新线程 8 t.start() 9 print('进程[%s],打印中...'%name) 10 time.sleep(1) 11 12 def run2(): 13 pri

python协程函数、递归、匿名函数与内置函数使用、模块与包

目录: 协程函数(yield生成器用法二) 面向过程编程 递归 匿名函数与内置函数的使用 模块 包 常用标准模块之re(正则表达式) 一.协程函数(yield生成器用法二) 1.生成器的语句形式 a.生成器相关python函数.装饰器.迭代器.生成器,我们是如何使用生成器的.一个生成器能暂停执行并返回一个中间的结果这就是 yield 语句的功能 : 返回一个中间值给调用者并暂停执行. 我们的调用方式为yeild 1的方式,此方式又称为生成器的语句形式. 而使用生成器的场景:使用生成器最好的场景就

python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)

1.协程 1 #协程 又称微线程 是一种用户的轻量级线程 程序级别代码控制 就不用加机器 2 #不同函数 = 不同任务 A函数切到B函数没有进行cpu级别的切换,而是程序级别的切换就是协程 yelied 3 4 #单线程下多个任务流用协程,比如打电话可以切换,nginx 5 #爽妹给你打电话的时候,她不说话,刘征电话过来时候你可以切过去,这时候要是爽妹说话,就会bibi响 6 ''' 7 8 协程的好处: 9 无需线程上下文切换的开销 10 无需原子操作锁定及同步的开销 11 "原子操作(ato