IO并发

IO分类

IO分类:阻塞IO,非阻塞IO,IO多路复用,异步IO等

阻塞IO

  1. 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
  2. 效率:阻塞IO是效率很低的一种IO。但是由于逻辑简单所以是默认IO行为。
  3. 阻塞情况:
  • 因为某种执行条件没有满足造成的函数阻塞

        如:accept        input        recv等

  • 处理IO的时间较长产生的阻塞状态

        如:网络传输,大文件读写等

非阻塞IO

  • 定义:通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态
  • 设置套接字为非阻塞IO

sockfd.setblocking(bool)

功能:设置套接字为非阻塞IO

参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞

  • 超时检测:设置一个最长阻塞时间,超过该时间后则不再阻塞等待。

sockfd.settimeout(sec)

功能:设置套接字的超时时间

参数:设置的时间

代码示例:

 1 from socket import *
 2 from time import *
 3
 4 # 日志文件
 5 f = open(‘log.txt‘,‘a+‘)
 6
 7 # tcp 服务端
 8 sockfd = socket()
 9 sockfd.bind((‘0.0.0.0‘,8888))
10 sockfd.listen(5)
11
12 # 非阻塞设置
13 # sockfd.setblocking(False)
14
15 # 超时时间
16 sockfd.settimeout(2)
17
18
19 while True:
20     print("Waiting from connect...")
21     try:
22         connfd,addr = sockfd.accept()
23     except (BlockingIOError,timeout) as e:
24         sleep(2)
25         f.write("%s : %s\n"%(ctime(),e))
26         f.flush()
27     else:
28         print("Connect from",addr)
29         data = connfd.recv(1024).decode()
30         print(data)

IO多路复用

  • 定义

同时监控多个IO事件,当那个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。

  • 具体方案

select方法:Windows  Linux  unix

poll方法:Linux  unix

epoll方法:Linux

select方法

rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数:rlist  列表  存放关注的等待发生的IO事件
      wlist  列表  存放关注的要主动处理的IO事件
      xlist  列表  存放关注的出现异常要处理的IO
      timeout  超时时间

返回值: rs 列表  rlist中准备就绪的IO
        ws 列表  wlist中准备就绪的IO
	xs 列表  xlist中准备就绪的IO

代码示例:

 1 from select import select
 2 from socket import *
 3
 4 s = socket()
 5 s.bind((‘0.0.0.0‘,8888))
 6 s.listen(3)
 7
 8 f = open(‘log.txt‘,‘r+‘)
 9
10 print("开始监控IO")
11 rs,ws,xs = select([s],[],[])
12 print(rs)
13 print(ws)
14 print(xs)

select实现TCP服务

【1】 将关注的IO放入对应的监控类别列表

【2】通过select函数进行监控

【3】遍历select返回值列表,确定就绪IO事件

【4】处理发生的IO事件

注意

  1. wlist中如果存在IO事件,则select立即返回给ws
  2. 处理IO过程中不要出现死循环占有服务端的情况
  3. IO多路复用消耗资源较少,效率较高

select tcp代码示例:

 1 from socket import *
 2 from select import select
 3
 4 # 创建监听套接字
 5 s = socket()
 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 7 s.bind((‘0.0.0.0‘,8888))
 8 s.listen(5)
 9
10 # 设置关注的IO列表
11 rlist = [s]  # s 用于等待处理连接
12 wlist = []
13 xlist = []
14
15 # 循环IO监控
16 while True:
17     # print("++++",rlist)
18     rs,ws,xs = select(rlist,wlist,xlist)
19     # print(‘----‘,rs)
20     # 遍历返回值列表,判断哪个IO就绪
21     for r in rs:
22         if r is s:
23             c,addr = r.accept()
24             print("Connect from",addr)
25             rlist.append(c) # 增加新的关注的IO
26         else:
27             # 表明有客户端发送消息
28             data = r.recv(1024).decode()
29             print(data)
30             r.send(b‘OK‘)
31
32     for w in ws:
33         pass
34
35     for x in xs:
36         pass

poll方法

p = select.poll()
功能 : 创建poll对象
返回值: poll对象

p.register(fd,event)
功能: 注册关注的IO事件
参数:fd  要关注的IO
      event  要关注的IO事件类型
  	     常用类型:POLLIN  读IO事件(rlist)
		      POLLOUT 写IO事件 (wlist)
		      POLLERR 异常IO  (xlist)
		      POLLHUP 断开连接
		  e.g. p.register(sockfd,POLLIN|POLLERR)

p.unregister(fd)
功能:取消对IO的关注
参数:IO对象或者IO对象的fileno

events = p.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IO
        events格式  [(fileno,event),()....]
        每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型

poll步骤

【1】 创建套接字

【2】 将套接字register

【3】 创建查找字典,并维护

【4】 循环监控IO发生

【5】 处理发生的IO

poll代码示例:

 1 from socket import *
 2 from select import *
 3
 4 # 创建监听套接字,作为关注的IO
 5 s = socket()
 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 7 s.bind((‘0.0.0.0‘,8888))
 8 s.listen(3)
 9
10 # 创建poll对象
11 p = poll()
12
13 # 建立查找字典,通过IO的fileno查找io对象
14 # 始终与register的IO保持一直
15 fdmap = {s.fileno():s}
16
17 # 关注 s
18 p.register(s,POLLIN|POLLERR)
19
20 # 循环监控IO发生
21 while True:
22     events = p.poll() # 阻塞等待IO发生
23     # 循环遍历查看哪个IO准备就绪
24     for fd,event in events:
25         print(events)
26         if fd == s.fileno():
27             c,addr = fdmap[fd].accept()
28             print("Connect from",addr)
29             # 关注客户端连接套接字
30             p.register(c,POLLIN|POLLHUP)
31             fdmap[c.fileno()] = c  # 维护字典
32         elif event & POLLIN:
33             data = fdmap[fd].recv(1024).decode()
34             if not data:
35                 p.unregister(fd) # 取消监控
36                 fdmap[fd].close()
37                 del fdmap[fd] # 从字典删除
38                 continue
39             print(data)
40             p.register(fdmap[fd],POLLOUT)
41         elif event & POLLOUT:
42             fdmap[fd].send(b‘OK‘)
43             p.register(fdmap[fd], POLLIN)

epoll方法

使用方法:

  1. 基本与poll相同
  2. 生成对象改为epoll()
  3. 将所有事件类型改为EPOLL类型

epoll特点

  1. epoll效率比select poll要高
  2. epoll监控IO数量方式比poll要多
  3. epoll的触发方式比poll要多(EPOLLET边缘触发)

epoll完成tcp并发代码示例:

 1 from socket import *
 2 from select import *
 3
 4 # 创建监听套接字,作为关注的IO
 5 s = socket()
 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
 7 s.bind((‘0.0.0.0‘,8888))
 8 s.listen(3)
 9
10 # 创建epoll对象
11 ep = epoll()
12
13 # 建立查找字典,通过IO的fileno查找io对象
14 # 始终与register的IO保持一直
15 fdmap = {s.fileno():s}
16
17 # 关注 s
18 ep.register(s,EPOLLIN|EPOLLERR)
19
20 # 循环监控IO发生
21 while True:
22     events = ep.poll() # 阻塞等待IO发生
23     print("你有新的IO需要处理哦")
24     # 循环遍历查看哪个IO准备就绪
25     for fd,event in events:
26         print(events)
27         if fd == s.fileno():
28             c,addr = fdmap[fd].accept()
29             print("Connect from",addr)
30             # 关注客户端连接套接字
31             ep.register(c,EPOLLIN|EPOLLET) # 设置边缘触发
32             fdmap[c.fileno()] = c  # 维护字典
33         # elif event & EPOLLIN:
34         #     data = fdmap[fd].recv(1024).decode()
35         #     if not data:
36         #         ep.unregister(fd) # 取消监控
37         #         fdmap[fd].close()
38         #         del fdmap[fd] # 从字典删除
39         #         continue
40         #     print(data)
41         #     ep.unregister(fd) # 先取消关注再重新添加
42         #     ep.register(fdmap[fd], EPOLLOUT)
43         # elif event & POLLOUT:
44         #     fdmap[fd].send(b‘OK‘)
45         #     ep.unregister(fd)
46         #     ep.register(fdmap[fd], EPOLLIN)

协程技术

基础概念

  1. 定义:纤程,微线程。是允许在不同入口点不同位置暂停的计算机程序,简单来说,协程就是可以暂停执行的函数。
  2. 协程原理:记录一个函数的上下文,协程调度切换时会将记录的上下文保存,在切换回来时进行调取,恢复原有的执行内容,以便从上一次执行位置继续执行
  3. 协程优缺点:

优点:

  1.   协程完成多任务占用计算资源很少
  2.   由于协程的多任务切换在应用层完成,因此切换开销少
  3.   协程为单线程程序,无需进行共享资源同步互斥处理

缺点:

  1.   协程的本质是一个单线程,无法利用计算机多核资源

扩展延申@标准库协程的实现

python3.5以后,使用标准库asyncio和async/await 语法来编写并发代码。asyncio库通过对异步IO行为的支持完成python的协程。虽然官方说asyncio是未来的开发方向,但是由于其生态不够丰富,大量的客户端不支持awaitable需要自己去封装,所以在使用上存在缺陷。更多时候只能使用已有的异步库(asyncio等),功能有限。

第三方协程模块

greenlet模块

  • 安装:sudo pip3 install greenlet
  • 函数
greenlet.greenlet(func)
功能:创建协程对象
参数:协程函数

g.switch()
功能:选择要执行的协程函数

协程行为代码示例:

 1 from greenlet import greenlet
 2
 3 def fun1():
 4     print("执行 fun1")
 5     gr2.switch()
 6     print("结束 fun1")
 7     gr2.switch()
 8
 9 def fun2():
10     print("执行 fun2")
11     gr1.switch()
12     print("结束 fun2")
13
14 # 将函数变为协程
15 gr1 = greenlet(fun1)
16 gr2 = greenlet(fun2)
17
18 gr1.switch() # 选择执行的协程函数

gevent模块

安装:sudo pip3 install gevent

函数:

gevent.spawn(func,argv)
功能: 生成协程对象
参数:func  协程函数
     argv  给协程函数传参(不定参)
返回值: 协程对象

gevent.joinall(list,[timeout])
功能: 阻塞等待协程执行完毕
参数:list  协程对象列表
     timeout 超时时间

gevent.sleep(sec)
功能: gevent睡眠阻塞
参数:睡眠时间

* gevent协程只有在遇到gevent指定的阻塞行为时才会自动在协程之间进行跳转
如gevent.joinall(),gevent.sleep()带来的阻塞

gevent生成协程示例:

 1 import gevent
 2 from gevent import monkey
 3 monkey.patch_time() # 修改对time模块中阻塞的解释行为
 4 from time import sleep
 5
 6 # 协程函数
 7 def foo(a,b):
 8     print("Running foo ...",a,b)
 9     # gevent.sleep(3)
10     sleep(3)
11     print("Foo again..")
12
13 def bar():
14     print("Running bar ...")
15     # gevent.sleep(2)
16     sleep(2)
17     print("Bar again..")
18
19 # 生成协程对象
20 f = gevent.spawn(foo,1,2)
21 g = gevent.spawn(bar)
22
23 gevent.joinall([f,g]) #阻塞等待f,g代表的协程执行完毕

基于协程的TCP开发代码示例:

 1 import gevent
 2 from gevent import monkey
 3 monkey.patch_all() # 执行脚本,修改socket
 4 from socket import *
 5
 6 def handle(c):
 7     while True:
 8         data = c.recv(1024).decode()
 9         if not data:
10             break
11         print(data)
12         c.send(b‘OK‘)
13     c.close()
14
15 # 创建tcp套接字
16 s = socket()
17 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
18 s.bind((‘0.0.0.0‘,8888))
19 s.listen(5)
20
21 # 循环接收来自客户端连接
22 while True:
23     c,addr = s.accept()
24     print("Connect from",addr)
25     # handle(c) # 处理具体客户端请求
26     gevent.spawn(handle,c) # 协程方案

  • monkey脚本

作用:在gevent协程中,协程只有遇到gevent指定类型的阻塞才能跳转到其他协程,因此,我们希望将普通IO阻塞行为转换为可以触发gevent协程跳转的阻塞,以提高执行效率。

转换方法:gevent提供了一个脚本程序monkey,可以修改底层解释IO阻塞的行为,将很多普通阻塞转换为gevent阻塞。

使用方法

【1】 导入monkey
		from gevent  import monkey
【2】 运行相应的脚本,例如转换socket中所有阻塞
		monkey.patch_socket()
【3】 如果将所有可转换的IO阻塞全部转换则运行all
		monkey.patch_all()
【4】 注意:脚本运行函数需要在对应模块导入前执行

基于协程的TCP开发代码示例:

 1 """
 2 思路 : 1. 每个客户函数端设置为协成
 3       2. 将socket模块下的阻塞变为可以触发协程跳转
 4 """
 5 import gevent
 6 from gevent import monkey
 7 monkey.patch_all() # 执行脚本,修改socket
 8 from socket import *
 9
10 def handle(c):
11     while True:
12         data = c.recv(1024).decode()
13         if not data:
14             break
15         print(data)
16         c.send(b‘OK‘)
17     c.close()
18
19 # 创建tcp套接字
20 s = socket()
21 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
22 s.bind((‘0.0.0.0‘,8888))
23 s.listen(5)
24
25 # 循环接收来自客户端连接
26 while True:
27     c,addr = s.accept()
28     print("Connect from",addr)
29     # handle(c) # 处理具体客户端请求
30     gevent.spawn(handle,c) # 协程方案

扩展:位运算

定义:将整数转换为二进制,按二进制位进行运算

运算符号:

  &  按位与
  |  按位或
  ^  按位异或
  << 左移
  >> 右移

 

      14 --> 01110
      19 --> 10011

14 & 19 = 00010 = 2  一0则0
14 | 19 = 11111 = 31 一1则1
14 ^ 19 = 11101 = 29 相同为0不同为1
14 << 2 = 111000 = 56 向左移动低位补0
14 >> 2 = 11 = 3  向右移动去掉低位

 

原文地址:https://www.cnblogs.com/-xiaolong/p/11371839.html

时间: 2024-08-29 23:14:56

IO并发的相关文章

Java IO: 并发IO

原文链接 作者: Jakob Jenkov 译者: 李璟 有时候你可能需要并发地处理输入和输出.换句话说,你可能有超过一个线程处理输入和产生输出.比如,你有一个程序需要处理磁盘上的大量文件,这个任务可以通过并发操作提高性能.又比如,你有一个web服务器或者聊天服务器,接收许多连接和请求,这些任务都可以通过并发获得性能的提升. 如果你需要并发处理IO,这里有几个问题可能需要注意一下: 在同一时刻不能有多个线程同时从InputStream或者Reader中读取数据,也不能同时往OutputStrea

Python高级编程和异步IO并发编程

Python高级编程和异步IO并发编程网盘地址:https://pan.baidu.com/s/1eB-BsUacBRhKxh7qXwndMQ 密码: tgba备用地址(腾讯微云):https://share.weiyun.com/5Z3x9V0 密码:7cdnb2 针对Python高级编程和异步IO并发编程,把每个Python高级知识点从起因到原理讲透的课程全网难寻 第1章 课程简介第2章 python中一切皆对象第3章 魔法函数第4章 深入类和对象第5章 自定义序列类第6章 深入python

IO并发原理

并发原理: 几乎所有的IO接口都是阻塞型的,处理过程中线程将被阻塞,无法进行任何操作直到返回调用结果,或超时. IO模型:系统内核   和   一个调用这个IO的线程 第一步  等待数据准备 第二部  将数据从内核拷贝到进程中 传统阻塞IO 用户线程发送IO请求(read操作)到系统内核,系统内核首先进行数据准备,然后进行数据拷贝.这两个过程中用户线程是完全阻塞的状态,啥也干不了. 非阻塞IO 用户线程发出IO请求,系统内核会开始准备数据并且直接返回一个error,然后用户线程接收到返回值可以非

IO复用(Reactor模式和Preactor模式)——用epoll来提高服务器并发能力

上篇线程/进程并发服务器中提到,提高服务器性能在IO层需要关注两个地方,一个是文件描述符处理,一个是线程调度. IO复用是什么?IO即Input/Output,在网络编程中,文件描述符就是一种IO操作. 为什么要IO复用? 1.网络编程中非常多函数是阻塞的,如connect,利用IO复用可以以非阻塞形式执行代码. 2.之前提到listen维护两个队列,完成握手的队列可能有多个就绪的描述符,IO复用可以批处理描述符. 3.有时候可能要同时处理TCP和UDP,同时监听多个端口,同时处理读写和连接等.

集群瓶颈:磁盘IO必读

首先需要知道什么是IO: IO是输入输出接口阅读本文章可以带着下面问题1.集群的瓶颈为什么IO?2.你对IO了解多少? 这里面只说个人观点:当我们面临集群作战的时候,我们所希望的是即读即得.可是面对大数据,读取数据需要经过IO,这里可以把IO理解为水的管道.管道越大越强,我们对于T级的数据读取就越快.所以IO的好坏,直接影响了集群对于数据的处理. 下面详细介绍IO 读/写IO磁盘控制器向磁盘发出一次读/写指令,给出开始扇区的地址和向后连续读/写的扇区的个数.读/写IO是一次IO,操作的扇区编号必

Java IO 学习总结 学习手册总结

Java IO 是一套Java用来读写数据(输入和输出)的API.大部分程序都要处理一些输入,并由输入产生一些输出.Java为此提供了java.io包. 代码 github地址:https://github.com/loveincode/StudyTest/tree/master/src/IO Java.io 包的范围 java.io 包并没有涵盖所有输入输出类型.例如,并不包含GUI或者网页上的输入输出,这些输入和输出在其它地方都涉及,比如Swing工程中的JFC (Java Foundati

go语言】Goroutines 并发模式

并发模式 让我们先来回顾一下boring函数的例子. func boring(msg string, c chan string) {    for i := 0; ; i++ {         c <- fmt.Sprintf("%s %d", msg, i)         time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond) } }             func main() { c := make(

异步陷阱之IO篇

很多教程和资料都强调流畅的用户体验需要异步来辅助,核心思想就是保证用户前端的交互永远有最高的优先级,让一切费时的逻辑通通放到后台,等到诸事完备,通知一下前端给个提示或者继续下一步.随着.NET发展,async和await关键字的推广,Task Parallel Library (TPL)的稳步发展, 异步编程也越来越多的被重视和采用,很多时候非常便利的解决各种性能问题,但同时也带来了很多的陷阱.?? 这里我抛出一个实际项目中遇到的陷阱,先简单交代一下故事背景:SpreadJS产品有一个Excel

大并发server架构 &amp;amp;&amp;amp; 大型站点架构演变

server的三条要求: 高性能:对于大量请求,及时高速的响应 高可用:7*24 不间断,出现问题自己主动转移.这叫fail over(故障转移) 伸缩性:使用跨机器的通信(TCP) 另外不论什么网络系统结构都能够抽象成C/S架构.我们常说的B/S模式本质上也是C/S架构(浏览器看作client). 一个典型的server架构: 注: epoll是linux下最高效的网络I/O 因为server须要高效处理大并发连接.因此多个位置均可能出现性能瓶颈,以下我们分析不同位置产生瓶颈的原因及其处理方法