网络编程进阶:并发编程之协程、IO模型

协程:

基于单线程实现并发,即只用一个主线程(此时可利用的CPU只有一个)情况下实现并发; 并发的本质:切换+保存状态

CPU正在运行一个任务,会在两种情况下切走去执行其他任务(切换有操作系统强制控制),一种情况是该任务发生了阻塞,另一种是该任务计算的时间过长或有一个优先级更高的程序替代了它

在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态

如果多个任务都是纯计算的,上图的情况2并不能提升效率,因为只是让CPU来回切,这样看起来所有任务都被“同时”执行的效果,此时这种切换反而会降低效率;

yield本身就是一种在单线程下可以保存任务运行状态的方法,其特点如下:

  1. yield可以保存状态,yield的状态保存于操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级

  2. send可以把一个函数的结果传递给另外一个函数,以此实现单线程内程序之间的切换;yield并不能实现遇到io切换

在任务1遇到io情况下,切到另外一个任务去执行,这样就可以利用任务1阻塞的时间完成其他任务的计算,效率的提升就在此处

对于单线程下,我们不可避免程序中出现io操作,如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另一个任务去计算,这样就保证了该线程能最大限度的处于就绪状态,即随时都可以被CPU执行的状态,相当于我们在用户程序级别将自己的io操作最大限度的隐藏起来,从而可以“迷惑”操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将CPU的执行权限分配给我们的线程(程序执行效率高就是该软件能够过得更多的CPU执行权限)

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换到另外一个任务去执行,以此提升效率;为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

  1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行

  2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

协程:是单线程下的并发,又称微线程,纤程(Coroutine);协程是一种用户态的轻量级纤程,即协程是用户程序由自己控制调度的

需要注意的是:

  1. python的线程属于内核级别的,即有操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出CPU执行权限,切换到其他线程运行)

  2. 单线程内开启协程,一遇到io,就会从应用程序级别(而非操作系统级别)控制切换,以此来提升效率(注意:非io操作的切换与效率无关)

对比操作系统控制线程的切换,用户在单线程内控制协程的切换:

优点:

  1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

  2. 单线程内就可以实现并发的效果,最大限度的利用CPU

缺点:

  1. 协程的本质还是单线程,无法利用多核优势,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程;

  2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

协程特点总结:

  1. 必须在只有一个单线程里实现并发

  2. 修改共享数据不需要加锁

  3. 用户程序里自己保存多个控制流的上下文栈

  4. 附加: 一个协程遇到io操作自动切换到其他协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

协程greenlet:

from greenlet import greenlet
import time

def eat(name):
    print("%s is eating 1"%name)
    time.sleep(3)
    g2.switch("neo")   # g2 再去 switch
    print("%s is eating 2"%name)
    g2.switch("alex")  # 再 switch 到 play

def play(name):
    print("%s play 1"%name)
    g1.switch("neo")  # 再 switch 到 eat
    print("%s play 2"%name)

g1 = greenlet(eat)   # greenlet() 只能传入函数名,不要传函数参数
g2 = greenlet(play)

g1.switch("neo")  # 利用 g1.switch() 启动程序  # 此时需要传入函数的参数

"""
greenlet 并不能检测到I/O阻塞
"""

运行结果:

gevent协程:

from gevent import monkey;monkey.patch_all()  # 合并成一行,专门用于打标记
import gevent
import time

def eat(name):
    print("%s is eating 1"%name)
    # gevent.sleep(3)  # gevent.sleep()和 time.sleep()效果一样
    time.sleep(3)
    print("%s is eating 2"%name)

def play(name):
    print("%s play 1"%name)
    # gevent.sleep(4)
    time.sleep(4)
    print("%s play 2"%name)

start = time.time()

g1 = gevent.spawn(eat,"neo")   # 提交任务  #  spawn()第一个参数写任务名,后面直接参数就行(位置参数或关键字参数都可以)
g2 = gevent.spawn(play,"alice")  # gevent.spawn()是异步提交任务

g1.join()
g2.join()   # 保证上面提交的两个任务都执行完毕了  # 协程是单线程的,需要再线程结束前等待g1和g2,要不然g1和g2还没起来,“主线程”就结束了,此时g1和g2也就不会再执行了
# g1.join()和g2.join()可以合并成:# gevent.joinall([g1,g2])
"""
执行过程分析:
g1先起来,执行了第一个print,然后遇到了IO阻塞(gevent.sleep(3)),然后立马就切到了 g2 提交的 play任务,
执行 play中的第一个print,接着又遇到了IO阻塞(gevent.sleep(4)),然后就又切到了 g1的eat任务,此时g1的eat还是处于阻塞状态,接着就在两个任务之间来回切
直到 g1的eat 又处于就绪状态,打印 eat的第2个print;执行完 eat之后,g2的play还处于阻塞状态,然后等其阻塞结束后执行 play的第2个print;
通过这种方式 gevent帮你监测了多个任务之间的IO阻塞(遇到IO阻塞就切走)
"""

stop = time.time()
print(stop-start)

"""
执行时间大约是阻塞最长的时间,如果这两个任务改成串行,则执行时间就是 3+4 = 7秒多
"""

"""
gevent.sleep() 不等同于 time.sleep(),因为gevent只能识别它自己模拟的阻塞(所以,如果把gevent.sleep()改成了time.sleep(),则整个程序就还是串行)
如果想实现gevent识别所有的阻塞,就需要用到gevent模块下的monkey,monkey下面有一个功能是 patch_all(),
monkey.patch_all()的作用是把下面凡是涉及到IO操作的都会帮你打一个标记,从而能被gevent识别
但凡需要gevent模块监测一个IO操作,就需要在你整个文件的开头导入 monkey,并且做一个 monkey.patch_all()的操作(一定要在整个文件的开头写,这样文件下面多有的模块也好、功能也好,都会被做上标记)
"""

运行结果:

gevent应用场景:单线程下多个任务是IO密集型(因为计算密集型gevent并不能提高效率)

基于gevent模块实现并发的套接字通信

服务端:

from gevent import monkey,spawn;monkey.patch_all()
from socket import *

def comm(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            conn.close()
            break

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,client_addr = server.accept()
        spawn(comm,conn)
        # 这个地方就不需要再写 .join(),因为程序能走到这一步说明server这个协程肯定已经起来了,这时线程就已经进入了while True死循环,那么comm这个协程就肯定也能起来

    server.close()

if __name__ == "__main__":
    g = spawn(server,"127.0.0.1",8080)
    g.join()  # 这个地方需要写 join(),从而保证server函数能真正起来

服务端:

from socket import *
from threading import Thread,current_thread

def client():
    client = socket(AF_INET,SOCK_STREAM)
    client.connect(("127.0.0.1",8080))
    while True:
        client.send(("hello %s"%current_thread().getName()).encode("utf-8"))

        data = client.recv(1024)
        print(data.decode("utf-8"))

    conn.close()

if __name__ == "__main__":
    for i in range(500):  # 模拟并发的效果
        t = Thread(target=client)
        t.start()  # 500个线程能很快起来

IO模型:

同步调用不等于阻塞(同步是:提交完后不管有没有阻塞,不管是IO密集还是计算密集,我都在原地等着)

IO模型介绍:

为了更好的了解IO模型,需要先了解:同步(synchronous)IO、异步(asynchronous)IO、阻塞(blocking)IO、非阻塞(non-blocking)IO

以下讨论的是linux环境下的netword IO,共5中IO Model:

1. blocking IO

2. nonblocking IO

3. IO multiplexing

4. signal driven IO

5. asynchronous IO

附:由于signal driven IO(信号驱动IO)在实际中并不常见,所以主要介绍其余四种IO Model

再说一下IO发生时涉及的对象和步骤。对于一个network IO(这里我们以read为例),它会涉及到两个系统对象,一个是调用这个IO 的process(or thread),另一个是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

1. 等待数据准备;

2. 将数据从内核拷贝到进程(线程)中

上述IO模型的区别就是在这两个阶段上各有不同

阻塞IO(blocking IO)

在Linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程如下:

blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了

所谓阻塞型IO是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞;只有当该系统调用获得结果或者超时出错时才会返回

除非特别指定,几乎所有的IO接口(包括socket接口)都是阻塞型的

解决方法之多线程、多进程:

  在服务端使用多进程(多线程),让每个链接都拥有独立的进程(线程),这样任何一个链接的阻塞都不会影响其他的链接;但该方法的问题是:在遇到要同时相应成百上千的链接请求时会严重占据系统资源,降低系统对外界响应效率,进程线程本身也容易进入假死状态

改进方案之线程池、进程池:

  线程池(进程池)旨在减少创建和销毁线程的频率,维持一定合理数量的线程,并让空闲的线程重新担任起新的执行任务。“连接池”维持链接的缓存池,尽量重用自己的链接、减少创建和关闭链接的频率。虽然“线程池”和连接池在一定程度上缓解了频繁调用IO接口带来的资源占用,但“池”使用有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的效果好;所以“线程池”或“链接池”可以缓解部分压力,但不能解决所有问题。

多线程模型可以高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题

非阻塞IO

Linux下,可以通过设置socket使其变成non-blocking。 当对一个non-blocking socket执行读操作时,流程如下:

可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果;用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作;一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段任然是阻塞的),然后返回。(注意:拷贝数据整个过程,进程仍然是属于阻塞的状态)

所以,在非阻塞IO中,用户进程其实是需要不断的主动询问kernel数据准备好没有

非阻塞IO示例:

# 服务端

# socket套接字中的 accept,recv和send是阻塞IO,非阻塞IO模式中需要对这三个地方做处理
from socket import *

server = socket(AF_INET,SOCK_STREAM)
server.bind(("127.0.0.1",8080))
server.listen(5)
server.setblocking(False)   # Set the socket to blocking (flag is true) or non-blocking (false). # 设置socket是阻塞型还是非阻塞型;默认flag是True(阻塞型

conn_list = []
del_conn = []

while True:
    try:
        # 建链接
        conn,client_addr = server.accept()
        conn_list.append(conn)
        print(conn_list)

    except BlockingIOError:  # 如果程序在accept处遇到了IO阻塞,就执行 except
        try:
            # 接收数据、并准备发送数据的任务
            send_list = []
            for conn in conn_list:
                try:
                    data = conn.recv(1024)

                    # send也是IO,也有发生阻塞的可能性(例如发送数据过大但内存已不足),所以也需要解决这个阻塞问题
                    if not data:  # linux 系统
                        del_conn.append(conn)
                        continue
                    send_list.append((conn,data.upper()))  # 把要发送的数据及其相应的conn打包成元祖的形式添加到 send_list

                except BlockingIOError:  # 如果某个conn在recv处遇到了IO阻塞,就跳过它继续执行后面的conn
                    continue

                except ConnectionResetError:  # 如果某个conn当方面断开了链接,需要把这个conn回收,并且把它从conn_list中remove掉
                    # conn.close()
                    # 迭代对象在循环的过程中不能改变迭代对象的结构,所以此时不能把conn从conn_list中直接remove
                    del_conn.append(conn) # 把这个conn添加到要删除的列表中

            # 发消息的任务
            del_sent_list = []
            for item in send_list:  # 解决send的IO阻塞
                try:
                    conn = item[0]
                    content = item[1]
                    conn.send(content)
                    del_sent_list.append(item)  # 发送成功后,需要把这个元祖(conn,content)从 send_list中删除
                except BlockingIOError:  # 如果没发送成功
                    pass  # 循环的最后一行代码,可用pass代替continue

            # 删除已发送的信息
            for item in del_sent_list:
                send_list.remove(item)

            # 删除当方面断掉的客户端连接
            for conn in del_conn:  # 此时从connlist中删除这个conn
                conn_list.remove(conn)
        except:
            pass

"""
服务端此时就一个线程,
实现了自己去监测程序中的IO,遇到IO就切到其他任务去运行,这与gevent实现的原理类似
"""

# 客户端
from socket import *

client = socket(AF_INET,SOCK_STREAM)client.connect(("127.0.0.1",8080))

while True:    msg = input(">>>").strip()    if not msg:continue    client.send(msg.encode("utf-8"))

data = client.recv(1024)    print(data.decode("utf-8"))
 

但是非阻塞IO绝不推荐使用

原因: 1. 请求无法立即响应

    2.  客户端没有阻塞但却在一直运行着,这就是一个死循环,程序一直处于就绪状态,CPU占用率高,但程序却不是一直在工作,而是在做大量无用的“询问”,问操作系统“数据有没有准备好”

多路复用IO(IO multiplexing)

IO multiplexing 也称为 事件驱动IO(event driven IO);select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理是select/epoll这个function会不断的轮询所负责的所有的socket,当某个socket有数据到大了,就通知用户进程,如下图所示:

当用户进程调用了select,那么整个进程会被block,同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。这个图和blocking IO的图其实没有太大的不同,事实上还更差,因为这里需要使用两个系统调用(select 和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

在多路复用模型中,对于每一个socket,一般都设置成为 non-blocking,但是,如上图所示,整个用户的process其实一直被block的,只不过process是被select这个函数block,而不是被socket IO给block

结论: select的优势在于处理多个连接,不适用于单个连接

select网络IO模型示例(还没完全理解):

import select
from socket import *

server = socket(AF_INET,SOCK_STREAM)
server.bind(("127.0.0.1",8080))
server.listen(5)
server.setblocking(False)

rlist = [server,]  # 用于存放收消息的套接字
w1ist = []        # 用于存放发消息的套接字
wdata = {}  # 用于存放将要发送的消息

while True:
    rl,wl,xl = select.select(rlist,w1ist,[],3)  # 监测select代理的套接字
    print("rl",rl)
    print("wl",wl)

    for sock in rl:
        if sock == server:
            conn,addr = sock.accept()
            rlist.append(conn)   # 把conn这个套接字添加到 rlist这个收消息的套接字列表中
        else:
            try: # 捕捉客户端单方面断开连接的异常
                data = sock.recv(1024)
                if not data:  # linux系统
                    sock.close()
                    rlist.remove(sock)
                    continue
                w1ist.append(sock)
                wdata[sock] = data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in w1ist:
        data = wdata[sock]
        sock.send(data)
        wdata.pop(sock)  # 发完之后要把消息从wdata中删除
        w1ist.remove(sock)

server.close()

异步IO(Asynchronous IO):

Linux下的 asynchronous IO其实用的不多;它的流程如下:

用户进程发起read操作之后,立刻就开始去做其他的事。另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block,然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发一个signal,告诉它read操作已经完成了

原文地址:https://www.cnblogs.com/neozheng/p/8683263.html

时间: 2024-11-05 23:24:04

网络编程进阶:并发编程之协程、IO模型的相关文章

python_day10 多线程 协程 IO模型

多线程协程IO模型 多线程 #线程的PID与主进程PID一致 from threading import Thread from multiprocessing import Process import os def task(): print('%s is running' %os.getpid()) if __name__ == '__main__': t1=Thread(target=task,) t2=Thread(target=task,) # t1=Process(target=t

Python3 网络编程和并发编程总结

目录 网络编程 开发架构 OSI七层模型 socket subprocess 粘包问题 socketserver TCP UDP 并发编程 多道技术 并发和并行 进程 僵尸进程和孤儿进程 守护进程 互斥锁 队列 IPC进程间通信 生产者与消费者模型 线程 GIL 多线程与多进程的选择 死锁 递归锁 信号量 线程队列 event事件 进程池与线程池 协程 gevent IO模型 网络编程 开发架构 B/S: browser/server C/S: client/server OSI七层模型 应用层

Java网络编程和NIO详解3:IO模型与Java网络编程模型

Java网络编程和NIO详解3:IO模型与Java网络编程模型 基本概念说明 用户空间与内核空间 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方).操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限.为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间.针对linux操作系统而言,将最高的1G字节(从虚拟地址

【Java并发编程】并发编程大合集

转载自:http://blog.csdn.net/ns_code/article/details/17539599 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Java并发编程]Volatile关键字(上)

Http协议和IO模型

HTTP 协议和IO模型 一:HTTP协议 http协议:HyperText Transfer Procotol超文本传输协议,http协议是无状态的,监听在80端口,TCP协议上.HTTP协议的特点有以下几点: 1.支持客户/服务器模式.2.简单快速:客户向服务器请求服务时,只需传送请求方法和路径.请求方法常用的有GET.HEAD.POST.每种方法规定了客户与服务器联系的类型不同.由于HTTP协议简单,使得HTTP服务器的程序规模小,因而通信速度很快.3.灵活:HTTP允许传输任意类型的数据

Cpython解释器下实现并发编程——多进程、多线程、协程、IO模型

一.背景知识 进程即正在执行的一个过程.进程是对正在运行的程序的一个抽象. 进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一.操作系统的其他所有内容都是围绕进程的概念展开的.   一.操作系统相关的知识 详情见链接:http://www.cnblogs.com/linhaifeng/p/6295875.html 即使可以利用的CPU只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力.将一个单独的CPU变成多个虚拟的CPU(多道技术:时

Python并发编程理解yield from、协程

一.首页,认识一下可迭代,迭代器和生成器 可迭代,如:list,dict,tuple,deque等都是可迭代对象: 验证,需要借助collections.abc这个模块(python2中没有)使用isinstance()来类别一个对象是否是可迭代的(Iterable),是否是迭代器(Iterator),是否是生成器(Generator) 代码如下:import collections   from collections.abc import Iterable.Iterator.Generato

python 网络编程和并发编程题

1.答: 应用层与其它计算机进行通讯的一个应用,它是对应应用程序的通信服务的.例如,一个没有通信功能的字处理程序就不能执行通信的代码,从事字处理工作的程序员也不关心OSI的第7层.但是,如果添加了一个传输文件的选项,那么字处理器的程序员就需要实现OSI的第7层.示例:TELNET,HTTP,FTP,NFS,SMTP等.表示层这一层的主要功能是定义数据格式及加密.例如,FTP允许你选择以二进制或ASCII格式传输.如果选择二进制,那么发送方和接收方不改变文件的内容.如果选择ASCII格式,发送方将

网络编程与并发编程总结

目录 软件开发架构: C/S架构: B/S架构: 一.网络编程: 1.互联网协议OSI七层协议 2.socket 3.手撸socket套接字模板 4.subprocess(了解) 5.粘包问题 6.struct解决粘包问题 7.上传大文件数据 8.socketserver(现阶段,了解) 二.并发编程 1.并发与并行: 2.进程调度: 3.进程的三个状态: 4.同步与异步: 5.阻塞与非阻塞 7.回收进程的两种条件: 9.互斥锁: 10.队列 11.线程 12.全局解释器锁 14.递归锁 15.

网络编程和并发编程

进程 进程:正在进行的一个过程或者说一个任务.而负责执行任务则是cpu 举例(单核+多道) egon在一个时间段内有很多任务要做:python备课的任务,写书的任务,交女朋友的任务,王者荣耀上分的任务 进程与程序的区别 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程 并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务 并发:是伪并行,即看起来是同时运行.单个cpu