socket编程
什么是socket
Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。
所以,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规定去编程,写出的程序自然就是遵循tcp/udp标准的。
基于文件类型的套接字家族
套接字家族的名字:AF_UNIX
unix一切皆文件,基于文件的套接字调用的就是底层的文件系统来取数据,两个套接字进程运行在同一机器,可以通过访问同一个文件系统间接完成通信
基于网络类型的套接字家族
套接字家族的名字:AF_INET
(还有AF_INET6被用于ipv6,还有一些其他的地址家族,不过,他们要么是只用于某个平台,要么就是已经被废弃,或者是很少被使用,或者是根本没有实现,所有地址家族中,AF_INET是使用最广泛的一个,python支持很多种地址家族,但是由于我们只关心网络编程,所以大部分时候我么只使用AF_INET)
套接字函数用法
服务端套接字函数 s.bind() 绑定(主机,端口号)到套接字 s.listen() 开始TCP监听 s.accept() 被动接受TCP客户的连接,(阻塞式)等待连接的到来 客户端套接字函数 s.connect() 主动初始化TCP服务器连接 s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常 公共用途的套接字函数 s.recv() 接收TCP数据 s.send() 发送TCP数据(send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完) s.sendall() 发送完整的TCP数据(本质就是循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时数据不丢失,循环调用send直到发完) s.recvfrom() 接收UDP数据 s.sendto() 发送UDP数据 s.getpeername() 连接到当前套接字的远端的地址 s.getsockname() 当前套接字的地址 s.getsockopt() 返回指定套接字的参数 s.setsockopt() 设置指定套接字的参数 s.close() 关闭套接字 面向锁的套接字方法 s.setblocking() 设置套接字的阻塞与非阻塞模式 s.settimeout() 设置阻塞套接字操作的超时时间 s.gettimeout() 得到阻塞套接字操作的超时时间 面向文件的套接字的函数 s.fileno() 套接字的文件描述符 s.makefile() 创建一个与该套接字相关的文件
重启服务端可能出现的错误
#加入一条socket配置,重用ip和端口 phone=socket(AF_INET,SOCK_STREAM) phone.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加 phone.bind((‘127.0.0.1‘,8080))
基于tcp的套接字模拟打电话
服务端
import socket # 1.先买手机 phone = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # socket.SOCK_STREAM指的是TCP协议 # 2.绑定电话卡 # phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 就是它,在bind前加,防止操作系统未及时清理链接,导致端口被占用的问题 phone.bind((‘127.0.0.1‘,8080)) # 端口范围0-65535,1-1024系统占用 # 3.开机 phone.listen(5) # 4.等电话 print(‘starting....‘) while True: # 链接循环 conn,addr = phone.accept() # (conn,client_addr) # print(‘=========>‘) # print(conn,addr) # 5.收发消息 while True: # 通信循环 try: # print(‘========>ready recv‘) data = conn.recv(1024) # 1024最大接收的字节数 conn.send(data.upper()) except ConnectionResetError: break # 6.挂电话 conn.close() # 7.关机 phone.close()
客户端
import socket # 1.先买手机 phone = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # socket.SOCK_STREAM指的是TCP协议 # 2.打电话 phone.connect((‘127.0.0.1‘,8080)) # 3.发收消息 while True: msg = input(‘>>:‘).strip() if not msg:continue phone.send(msg.encode(‘utf-8‘)) data = phone.recv(1024) print(data.decode(‘utf-8‘)) # 4.关闭 phone.close()
第三十天
远程执行命令
服务端
from socket import * import subprocess server = socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8080)) server.listen(5) while True: conn,addr = server.accept() while True: try: cmd = conn.recv(1024) if not cmd:break # linux系统中客户端断开不会抛异常,而是会不停收空 # 执行命令 cmd = cmd.decode(‘utf-8‘) # 调用模块,执行命令,并且收集命令的执行结果,而不是打印 obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout = obj.stdout.read() stderr = obj.stderr.read() # 发送命令的结果 conn.send(stdout+stderr) except ConnectionResetError: break conn.close() server.close()
客户端
from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) while True: cmd = input(‘>>:‘).strip() if not cmd:continue client.send(cmd.encode(‘utf-8‘)) data = client.recv(1024) print(data.decode(‘gbk‘)) client.close()
粘包问题
TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的
两种情况下会产生粘包
1.发送端需要等缓冲区满才发送出去,造成粘包(发送数据时间间隔很短,数据了很小,会合到一起,产生粘包)
2.接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)
远程执行命令粘包基本解决方法
服务端
from socket import * import subprocess import struct server = socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8080)) server.listen(5) while True: conn,addr = server.accept() while True: try: cmd = conn.recv(1024) if not cmd:break # linux系统中客户端断开不会抛异常,而是会不停收空 # 执行命令 cmd = cmd.decode(‘utf-8‘) # 调用模块,执行命令,并且收集命令的执行结果,而不是打印 obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout = obj.stdout.read() stderr = obj.stderr.read() # 第一步:制作报头 total_size = len(stdout) + len(stderr) header = struct.pack(‘i‘,total_size) # 第二步:先发报头(固定长度) conn.send(header) # 第三步:发送命令的结果 conn.send(stdout) conn.send(stderr) except ConnectionResetError: break conn.close() server.close()
客户端
from socket import * import struct client = socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) while True: cmd = input(‘>>:‘).strip() if not cmd:continue client.send(cmd.encode(‘utf-8‘)) # 第一步:收到报头:长度 header = client.recv(4) total_size = struct.unpack(‘i‘,header)[0] # 第二部:收完整真实的数据 recv_size = 0 res = b‘‘ while recv_size < total_size: recv_data = client.recv(1024) res += recv_data recv_size += len(recv_data) print(recv_data.decode(‘gbk‘)) client.close()
解决粘包问题终极版
服务端
import subprocess import struct import json from socket import * server=socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8086)) # print(server) server.listen(5) while True: conn,addr = server.accept() # print(conn) print(addr) while True: try: cmd = conn.recv(8096) if not cmd:break #针对linux #执行命令 cmd = cmd.decode(‘utf-8‘) #调用模块,执行命令,并且收集命令的执行结果,而不是打印 obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout = obj.stdout.read() stderr = obj.stderr.read() # 1:先制作报头,报头里放:数据大小, md5, 文件 header_dic = { ‘total_size‘:len(stdout)+len(stderr), ‘md5‘: ‘xxxxxxxxxxxxxxxxxxx‘, ‘filename‘: ‘xxxxx‘, ‘xxxxx‘:‘123123‘ } header_json = json.dumps(header_dic) header_bytes = header_json.encode(‘utf-8‘) header_size = struct.pack(‘i‘, len(header_bytes)) # 2: 先发报头的长度 conn.send(header_size) # 3:先发报头 conn.send(header_bytes) # 4:再发送真实数据 conn.send(stdout) conn.send(stderr) except ConnectionResetError: break conn.close() server.close()
客户端
import struct import json from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8086)) while True: cmd = input(‘>>: ‘).strip() if not cmd:continue client.send(cmd.encode(‘utf-8‘)) # 1:先收报头长度 obj = client.recv(4) header_size = struct.unpack(‘i‘, obj)[0] # 2:先收报头,解出报头内容 header_bytes = client.recv(header_size) header_json = header_bytes.decode(‘utf-8‘) header_dic = json.loads(header_json) print(header_dic) total_size = header_dic[‘total_size‘] # 3:循环收完整数据 recv_size = 0 res = b‘‘ while recv_size < total_size: recv_data = client.recv(1024) res += recv_data recv_size += len(recv_data) print(res.decode(‘gbk‘)) client.close()
第三十一天
udp协议得套接字通信
服务端
from socket import * server=socket(AF_INET,SOCK_DGRAM) server.bind((‘127.0.0.1‘,8083)) while True: data,client_addr=server.recvfrom(1024) print(‘客户端的数据: ‘,data) server.sendto(data.upper(),client_addr)
客户端
from socket import * client=socket(AF_INET,SOCK_DGRAM) while True: msg=input(‘>>: ‘).strip() client.sendto(msg.encode(‘utf-8‘),(‘127.0.0.1‘,8083)) data,server_addr=client.recvfrom(1024) print(data.decode(‘utf-8‘))
开启进程
from multiprocessing import Process import time def task(name): print(‘%s is running‘ %name) time.sleep(2) print(‘%s is done‘ %name) if __name__ == ‘__main__‘: # Process(target=task,kwargs={"name":‘egon‘}) p=Process(target=task,args=(‘egon‘,)) p.start() print(‘主‘)
subprocess和struct模块
subprocess
import subprocess obj = subprocess.Popen(‘dir‘,shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) # 开启子进程执行命令,默认会将结果直接输出,stdout标准正确输出,PIPE管道,stderr错误输出 print(obj.stdout.read().decode(‘gbk‘)) # 打印正确的内容,为bytes类型,只能取一次 print(obj.stderr.read().decode(‘gbk‘)) # 读错误内容,没出错就没有内容
struct
import struct # 帮我们把数字转成固定长度的bytes类型 res = struct.pack(‘i‘,123123) print(struct.pack(‘i‘,123123)) # b‘\xf3\xe0\x01\x00‘ i表示int类型 转换为4位的bytes类型 res1 = struct.unpack(‘i‘,res) print(res1) # (123123,) res = struct.pack(‘q‘,12312344444) # q格式支持的数字范围更大,转换为8个bytes类型
操作系统简介
什么是操作系统
精简的说的话,操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序
操作系统位于计算机硬件与应用软件之间,本质也是一个软件。操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序员写的应用程序提供系统调用接口)两部分组成,所以,单纯的说操作系统是运行于内核态的,是不准确的
操作系统应该分成两部分功能
#一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。 例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节), #二:将应用程序对硬件资源的竞态请求变得有序化 例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,打印机可能打印一段a的内容然后又去打印c...,操作系统的一个功能就是将这种无序变得有序。
操作系统发展史
第一代计算机(1940~1955):真空管和穿孔卡片
第一代计算机的产生背景:
第一代之前人类是想用机械取代人力,第一代计算机的产生是计算机由机械时代进入电子时代的标志,从Babbage失败之后一直到第二次世界大战,数字计算机的建造几乎没有什么进展,第二次世界大战刺激了有关计算机研究的爆炸性进展。
lowa州立大学的john Atanasoff教授和他的学生Clifford Berry建造了据认为是第一台可工作的数字计算机。该机器使用300个真空管。大约在同时,Konrad Zuse在柏林用继电器构建了Z3计算机,英格兰布莱切利园的一个小组在1944年构建了Colossus,Howard Aiken在哈佛大学建造了Mark 1,宾夕法尼亚大学的William Mauchley和他的学生J.Presper Eckert建造了ENIAC。这些机器有的是二进制的,有的使用真空管,有的是可编程的,但都非常原始,设置需要花费数秒钟时间才能完成最简单的运算。
在这个时期,同一个小组里的工程师们,设计、建造、编程、操作及维护同一台机器,所有的程序设计是用纯粹的机器语言编写的,甚至更糟糕,需要通过成千上万根电缆接到插件板上连成电路来控制机器的基本功能。没有程序设计语言(汇编也没有),操作系统则是从来都没听说过。使用机器的过程更加原始,详见下‘工作过程’
特点:
没有操作系统的概念
所有的程序设计都是直接操控硬件
工作过程:
程序员在墙上的机时表预约一段时间,然后程序员拿着他的插件版到机房里,将自己的插件板街道计算机里,这几个小时内他独享整个计算机资源,后面的一批人都得等着(两万多个真空管经常会有被烧坏的情况出现)。
后来出现了穿孔卡片,可以将程序写在卡片上,然后读入机而不用插件板
优点:
程序员在申请的时间段内独享整个资源,可以即时地调试自己的程序(有bug可以立刻处理)
缺点:
浪费计算机资源,一个时间段内只有一个人用。
注意:同一时刻只有一个程序在内存中,被cpu调用执行,比方说10个程序的执行,是串行的
第二代计算机(1955~1965):晶体管和批处理系统
第二代计算机的产生背景:
由于当时的计算机非常昂贵,自认很自然的想办法较少机时的浪费。通常采用的方法就是批处理系统。
特点:
设计人员、生产人员、操作人员、程序人员和维护人员直接有了明确的分工,计算机被锁在专用空调房间中,由专业操作人员运行,这便是‘大型机’。
有了操作系统的概念
有了程序设计语言:FORTRAN语言或汇编语言,写到纸上,然后穿孔打成卡片,再讲卡片盒带到输入室,交给操作员,然后喝着咖啡等待输出接口
第二代如何解决第一代的问题/缺点:
1.把一堆人的输入攒成一大波输入,
2.然后顺序计算(这是有问题的,但是第二代计算也没有解决)
3.把一堆人的输出攒成一大波输出
现代操作系统的前身:(见图)
优点:批处理,节省了机时
缺点:
1.整个流程需要人参与控制,将磁带搬来搬去(中间俩小人)
2.计算的过程仍然是顺序计算-》串行
3.程序员原来独享一段时间的计算机,现在必须被统一规划到一批作业中,等待结果和重新调试的过程都需要等同批次的其他程序都运作完才可以(这极大的影响了程序的开发效率,无法及时调试程序)
第三代计算机(1965~1980):集成电路芯片和多道程序设计
第三代计算机的产生背景:
20世纪60年代初期,大多数计算机厂商都有两条完全不兼容的生产线。
一条是面向字的:大型的科学计算机,如IBM 7094,见上图,主要用于科学计算和工程计算
另外一条是面向字符的:商用计算机,如IBM 1401,见上图,主要用于银行和保险公司从事磁带归档和打印服务
开发和维护完全不同的产品是昂贵的,同时不同的用户对计算机的用途不同。
IBM公司试图通过引入system/360系列来同时满足科学计算和商业计算,360系列低档机与1401相当,高档机比7094功能强很多,不同的性能卖不同的价格
360是第一个采用了(小规模)芯片(集成电路)的主流机型,与采用晶体管的第二代计算机相比,性价比有了很大的提高。这些计算机的后代仍在大型的计算机中心里使用,此乃现在服务器的前身,这些服务器每秒处理不小于千次的请求。
如何解决第二代计算机的问题1:
卡片被拿到机房后能够很快的将作业从卡片读入磁盘,于是任何时刻当一个作业结束时,操作系统就能将一个作业从磁带读出,装进空出来的内存区域运行,这种技术叫做
同时的外部设备联机操作:SPOOLING,该技术同时用于输出。当采用了这种技术后,就不在需要IBM1401机了,也不必将磁带搬来搬去了(中间俩小人不再需要)
如何解决第二代计算机的问题2:
第三代计算机的操作系统广泛应用了第二代计算机的操作系统没有的关键技术:多道技术
cpu在执行一个任务的过程中,若需要操作硬盘,则发送操作硬盘的指令,指令一旦发出,硬盘上的机械手臂滑动读取数据到内存中,这一段时间,cpu需要等待,时间可能很短,但对于cpu来说已经很长很长,长到可以让cpu做很多其他的任务,如果我们让cpu在这段时间内切换到去做其他的任务,这样cpu不就充分利用了吗。这正是多道技术产生的技术背景
多道技术:
多道技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用。
空间上的复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序。
时间上的复用:当一个程序在等待I/O时,另一个程序可以使用cpu,如果内存中可以同时存放足够多的作业,则cpu的利用率可以接近100%,类似于我们小学数学所学的统筹方法。(操作系统采用了多道技术后,可以控制进程的切换,或者说进程之间去争抢cpu的执行权限。这种切换不仅会在一个进程遇到io时进行,一个进程占用cpu时间过长也会切换,或者说被操作系统夺走cpu的执行权限)
现代计算机或者网络都是多用户的,多个用户不仅共享硬件,而且共享文件,数据库等信息,共享意味着冲突和无序。 操作系统主要使用来 1.记录哪个程序使用什么资源 2.对资源请求进行分配 3.为不同的程序和用户调解互相冲突的资源请求。 我们可将上述操作系统的功能总结为:处理来自多个程序发起的多个(多个即多路)共享(共享即复用)资源的请求,简称多路复用 多路复用有两种实现方式 1.时间上的复用 当一个资源在时间上复用时,不同的程序或用户轮流使用它,第一个程序获取该资源使用结束后,在轮到第二个。。。第三个。。。 例如:只有一个cpu,多个程序需要在该cpu上运行,操作系统先把cpu分给第一个程序,在这个程序运行的足够长的时间(时间长短由操作系统的算法说了算)或者遇到了I/O阻塞,操作系统则把cpu分配给下一个程序,以此类推,直到第一个程序重新被分配到了cpu然后再次运行,由于cpu的切换速度很快,给用户的感觉就是这些程序是同时运行的,或者说是并发的,或者说是伪并行的。至于资源如何实现时间复用,或者说谁应该是下一个要运行的程序,以及一个任务需要运行多长时间,这些都是操作系统的工作。 2.空间上的复用 每个客户都获取了一个大的资源中的一小部分资源,从而减少了排队等待资源的时间。 例如:多个运行的程序同时进入内存,硬件层面提供保护机制来确保各自的内存是分割开的,且由操作系统控制,这比一个程序独占内存一个一个排队进入内存效率要高的多。 有关空间复用的其他资源还有磁盘,在许多系统中,一个磁盘同时为许多用户保存文件。分配磁盘空间并且记录谁正在使用哪个磁盘块是操作系统资源管理的典型任务。 这两种方式合起来便是多道技术
空间上的复用最大的问题是:程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制。如果内存彼此不分割,则一个程序可以访问另外一个程序的内存,
首先丧失的是安全性,比如你的qq程序可以访问操作系统的内存,这意味着你的qq可以拿到操作系统的所有权限。
其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操作系统的内存给回收了,则操作系统崩溃。
第三代计算机的操作系统仍然是批处理
许多程序员怀念第一代独享的计算机,可以即时调试自己的程序。为了满足程序员们很快可以得到响应,出现了分时操作系统
如何解决第二代计算机的问题3:
分时操作系统:
多个联机终端+多道技术
20个客户端同时加载到内存,有17在思考,3个在运行,cpu就采用多道的方式处理内存中的这3个程序,由于客户提交的一般都是简短的指令而且很少有耗时长的,索引计算机能够为许多用户提供快速的交互式服务,所有的用户都以为自己独享了计算机资源
CTTS:麻省理工(MIT)在一台改装过的7094机上开发成功的,CTSS兼容分时系统,第三代计算机广泛采用了必须的保护硬件(程序之间的内存彼此隔离)之后,分时系统才开始流行
MIT,贝尔实验室和通用电气在CTTS成功研制后决定开发能够同时支持上百终端的MULTICS(其设计者着眼于建造满足波士顿地区所有用户计算需求的一台机器),很明显真是要上天啊,最后摔死了。
后来一位参加过MULTICS研制的贝尔实验室计算机科学家Ken Thompson开发了一个简易的,单用户版本的MULTICS,这就是后来的UNIX系统。基于它衍生了很多其他的Unix版本,为了使程序能在任何版本的unix上运行,IEEE提出了一个unix标准,即posix(可移植的操作系统接口Portable Operating System Interface)
后来,在1987年,出现了一个UNIX的小型克隆,即minix,用于教学使用。芬兰学生Linus Torvalds基于它编写了Linux
第四代计算机(1980~至今):个人计算机
略
第三十二天
并发编程
进程
顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。
进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的
必备的理论基础
#一 操作系统的作用: 1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口 2:管理、调度进程,并且将多个进程对硬件的竞争变得有序 #二 多道技术: 1.产生背景:针对单核,实现并发 ps: 现在的主机一般是多核,那么每个核都会利用多道技术 有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个 cpu中的任意一个,具体由操作系统调度算法决定。 2.空间上的复用:如内存中同时有多道程序 3.时间上的复用:复用一个cpu的时间片 强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样 才能保证下次切换回来时,能基于上次切走的位置继续运行
并行与并发
并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)
并行:同时运行,只有具备多个cpu才能实现并行
IO阻塞
从磁盘读取数据是有延迟的,磁头寻道的平均延时为5毫秒,在磁道上找到数据的平均延时为4毫秒,在IO阻塞的时间里,
cpu就不会继续工作,这是操作系统就会把cpu分配给其它程序
使用multiprocessing模块开启进程
from multiprocessing import Process import time # win 创建进程接口 CreateProcess linux为fork # linux系统子进程的初始状态和父进程完全一样,win上不完全一样 def task(name): print(‘%s is running‘%name) time.sleep(2) print(‘%s is done‘ % name) if __name__ == ‘__main__‘: # Process(target=task,kwargs={‘name‘:‘egon‘}) p = Process(target=task,args=(‘egon‘,)) p.start() print(‘主‘)
开启进程的两种方式
# 方式一 from multiprocessing import Process import time def task(name): print(‘%s is running‘%name) time.sleep(2) print(‘%s is done‘ % name) if __name__ == ‘__main__‘: p = Process(target=task,args=(‘egon‘,)) p.start() print(‘主‘) # 方式二 from multiprocessing import Process import time class Myprocess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(‘%s is running‘ % self.name) time.sleep(2) print(‘%s is done‘ % self.name) if __name__ == ‘__main__‘: p = Myprocess(‘egon‘) p.start() print(‘主‘)
Process对象的属性和方法
# 运行一个python文件其实是用python解释器去执行 # import time,os # print(os.getpid(),os.getppid()) # pid当前进程的进程号,ppid父进程的进程号 # time.sleep() from multiprocessing import Process import time def task(name): print(‘%s is running‘%name) time.sleep(2) print(‘%s is done‘ % name) if __name__ == ‘__main__‘: p = Process(target=task,args=(‘egon‘,),name=‘xxx‘) p.start() # print(p.name) # 打印进程的名字 # print(p.pid) # 进程号 # print(p.is_alive()) # 是否存活 # p.terminate() # 杀死进程,向操作系统发出指令,不要用 # p.join() # join(p) 等待p执行完 p.join(timeout=2) 可以指定超时时间,一般不用 print(‘主‘)
进程池
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。
多进程是实现并发的手段之一,需要注意的问题是:
- 很明显需要并发执行的任务通常要远大于核数
- 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
- 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,
上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,
不会开启其他进程
‘‘‘ 提交/调用任务的方式有两种: 同步调用:提交/调用一个任务,然后就在原地等着,等到该任务执行完毕拿到结果,再执行下一行代码 异步调用:提交/调用一个任务,不在原地等着,直接执行下一行代码,结果 ‘‘‘ # from multiprocessing import Process,Pool from concurrent.futures import ProcessPoolExecutor import time,random,os def piao(name): print(‘%s is piaoing %s‘%(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == ‘__main__‘: p = ProcessPoolExecutor(4) # 从始至终只有4个进程干活,进程号不变 objs = [] for i in range(10): obj = p.submit(piao,‘alex%s‘%i) # obj.result() # 进程运行的结果 # res = p.submit(piao,‘alex%s‘%i).result() # 同步调用,要等待任务的执行结果 # obj = p.submit(piao, ‘alex%s‘%i) # 异步调用 objs.append(obj) for i in objs: print(i.result()) p.shutdown(wait=True) # 等待进程池中的任务都干完,同时禁止往池中添加任务了 # 使用Pool时,分两步 pool.close() pool.join() print(‘主‘,os.getpid())
第三十三天
线程
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程
车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
流水线的工作需要电源,电源就相当于cpu
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。
例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。
开启线程的两种方式
from threading import Thread import time,os # 第一种方式 def task(): print(‘%s is running‘ %os.getpid()) time.sleep(2) print(‘%s is done‘ %os.getpid()) if __name__ == ‘__main__‘: t = Thread(target=task,) t.start() print(‘主‘) ‘‘‘ 1.一个子进程内不开进程也不开子线程:主线程结束,该进程就结束 2.当一个进程内开启子进程时: 主线程结束,主进程要等,等所有子进程运行完毕给儿子收尸 3.当一个进程内开启多个线程时: 主线程结束并不意味着进程结束, 进程的结束指的是该进程内所有的线程都运行完毕,才应该回收进程 ‘‘‘ # 第二种方式 class Mythread(Thread): def __init__(self): super().__init__() def run(self): print(‘%s is running‘ % os.getpid()) time.sleep(2) print(‘%s is done‘ % os.getpid()) if __name__ == ‘__main__‘: t = Mythread() t.start() print(‘主‘)
多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。详细的讲分为4点:
1. 多线程共享一个进程的地址空间
2. 线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用
3. 若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。
4. 在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开进程开销要小的多。(这一条并不适用于python)
线程对象的其它属性和方法
from threading import Thread,current_thread,enumerate,active_count import time,os def task(): print(‘%s is running‘ %current_thread().getName()) # 线程名 current_thread()当前线程 time.sleep(2) print(‘%s is done‘ %os.getpid()) if __name__ == ‘__main__‘: # t = Thread(target=task,name=‘xxx‘) t = Thread(target=task) t.start() # t.join() # print(t.name) # 线程名 Thread-1 print(enumerate()) # 当前或者的线程对象 print(active_count()) # 当前活着线程的线程数 print(‘主‘,current_thread().getName()) # MainThread
线程与进程内存空间占用
# 进程之间内存空间隔离 from multiprocessing import Process n = 100 def task(): global n n = 0 if __name__ == ‘__main__‘: t = Process(target=task,) t.start() t.join() print(‘主‘,n) # 100 # 线程之间内存空间共享 from threading import Thread n = 100 def task(): global n n = 0 if __name__ == ‘__main__‘: t = Thread(target=task,) t.start() t.join() print(‘主‘,n) # 0
线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import time,random,os def task(n): print(‘%s is running‘%current_thread().getName()) time.sleep(random.randint(1,3)) return n**2 if __name__ == ‘__main__‘: # t = ProcessPoolExecutor() # 默认是cpu的核数 # print(os.cpu_count()) # 查看cpu核数 t = ThreadPoolExecutor(3) # 默认为cpu的核数*5 objs = [] for i in range(10): obj = t.submit(task,i) objs.append(obj) t.shutdown(wait=True) for obj in objs: print(obj.result()) print(‘主‘,current_thread().getName())
异步调用和回调函数
import requests from threading import Thread,current_thread from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time def get(url): print(‘%s GET %s‘%(current_thread().getName(),url)) response = requests.get(url) if response.status_code == 200: return {‘url‘:url,‘text‘:response.text} # print(type(response.text)) # <class ‘str‘> def parse(obj): res = obj.result() print(‘[%s] <%s> (%s)‘ % (current_thread().getName(), res[‘url‘],len(res[‘text‘]))) # print(‘[%s] parse res [%s]‘%(res[‘url‘],len(res[‘text‘]))) if __name__ == ‘__main__‘: urls = [ ‘https://www.python.org‘, ‘https://www.baidu.com‘, ‘https://www.jd.com‘ ] t = ThreadPoolExecutor(2) # t = ProcessPoolExecutor(2) for url in urls: t.submit(get,url).add_done_callback(parse) # parse(obj) t.shutdown(wait=True) print(‘主‘) ‘‘‘ 异步调用: 提交完任务(为该任务绑定一个回调函数),不用在原地等任务执行完毕拿到结果,可以直接提交下一个任务 一个任务一旦执行完毕就会自动触发回调函数的运行 回调函数的参数是单一的: 回调函数的参数就是它所绑定任务的返回值 ‘‘‘ # 进程池,回调的活由主进程干 # 线程池,回调的活都有可能干
第三十四天
守护进程
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
# 守护进程 from multiprocessing import Process import os,time,random def task(): print(‘%s is running‘%os.getpid()) time.sleep(2) print(‘%s is done‘%os.getpid()) if __name__ == ‘__main__‘: p = Process(target=task) p.daemon = True # 必须在p.start()之前,守护进程不能开启子进程 p.start() print(‘主‘) ‘‘‘ 举例说明守护进程的应用场景: 假设有两个任务要干,要玩出并发的效果,使用进程的话可以让主进程执行一个任务 然后开启一个子进程执行一个任务。 如果这两个任务毫无关系,那么就直接开启一个子进程 如果主进程的任务在执行完毕后,子进程的任务没有存在的意义了,那么该子进程应该 在开启之前就被设置成守护进程 ‘‘‘ # 迷惑人的例子 #主进程代码运行完毕,守护进程就会结束 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == ‘__main__‘: p1 = Process(target=foo) p2 = Process(target=bar) p1.daemon = True p1.start() p2.start() print("main-------") # 打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
守护线程
对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
# 守护线程:等到该进程内所有非守护线程都运行完才死掉 from threading import Thread import os,time,random def task(): print(‘%s is running‘%os.getpid()) time.sleep(2) print(‘%s is done‘%os.getpid()) if __name__ == ‘__main__‘: t = Thread(target=task) t.daemon = True # 必须在p.start()之前 t.start() print(‘主‘) # 迷惑人的例子 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == ‘__main__‘: t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("main-------")
互斥锁
# 进程 from multiprocessing import Process,Lock import os,time,random def task(mutex): mutex.acquire() print(‘%s print 1‘%os.getpid()) time.sleep(random.randint(1,3)) print(‘%s print 2‘%os.getpid()) mutex.release() if __name__ == ‘__main__‘: mutex = Lock() p1 = Process(target=task,args=(mutex,)) p2 = Process(target=task,args=(mutex,)) p3 = Process(target=task,args=(mutex,)) p1.start() p2.start() p3.start() # 线程 from threading import Thread,Lock import time n = 100 def task(): # global n # mutex.acquire() # temp = n # time.sleep(0.1) # n = temp - 1 # mutex.release() global n with mutex: temp = n time.sleep(0.1) n = temp - 1 if __name__ == ‘__main__‘: mutex = Lock() t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join() print(n)
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性
模拟抢票
# 创建db.txt文件 # {"count": 1} from multiprocessing import Process,Lock import json,os,time,random def search(): with open(‘db.txt‘,encoding=‘utf-8‘) as f: dic = json.load(f) print(‘%s 剩余票数 %s‘%(os.getpid(),dic[‘count‘])) def get(): with open(‘db.txt‘,encoding=‘utf-8‘) as read_f: dic = json.load(read_f) if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(random.randint(1,3)) # 模拟手速和网速 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as write_f: json.dump(dic,write_f) print(‘%s 抢票成功‘%os.getpid()) def task(mutex): search() mutex.acquire() get() mutex.release() if __name__ == ‘__main__‘: mutex = Lock() for i in range(20): p = Process(target=task,args=(mutex,)) p.start()
信号量
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁,信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念 from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() print(‘%s 占到一个茅坑‘ %user) time.sleep(random.randint(0,3)) #模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了 sem.release() if __name__ == ‘__main__‘: sem=Semaphore(5) p_l=[] for i in range(13): p=Process(target=go_wc,args=(sem,‘user%s‘ %i,)) p.start() p_l.append(p) for i in p_l: i.join() print(‘============》‘)
进程队列和线程队列
from multiprocessing import Queue # 进程队列 import queue # 线程队列 q = Queue(3) q.put({‘a‘:1}) q.put(‘xxxx‘) q.put(3) print(q.get()) print(q.get()) print(q.get()) # 队列 q = queue.Queue(3) q.put({‘a‘:1}) q.put(‘xxxx‘) q.put(3) print(q.get()) print(q.get()) print(q.get()) # 优先级队列,数字越小,优先级越高 q = queue.PriorityQueue(3) q.put((10,{‘a‘:1})) q.put((-1,‘xxxx‘)) q.put((0,3)) print(q.get()) print(q.get()) print(q.get()) # 堆栈 q = queue.LifoQueue(3) q.put({‘a‘:1}) q.put(‘xxxx‘) q.put(3) print(q.get()) print(q.get()) print(q.get())
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Queue,Process import time,random,os def producer(q): for i in range(10): res = ‘包子%s‘%i time.sleep(0.5) q.put(res) print(‘%s 生产了 %s‘%(os.getpid(),res)) def consumer(q): while True: res = q.get() if res is None:break print(‘%s 吃 %s‘%(os.getpid(),res)) time.sleep(random.randint(2,3)) if __name__ == ‘__main__‘: q = Queue() p = Process(target=producer,args=(q,)) c = Process(target=consumer,args=(q,)) p.start() c.start() p.join() q.put(None) print(‘主‘)
第三十五天
生产者消费者模型高级一点的方法
#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 #参数介绍: maxsize是队列中允许最大项数,省略则无大小限制。 #方法介绍: JoinableQueue的实例p除了与Queue对象相同的方法之外还具有: q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常 q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
GIL解释器锁
在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念,所以这里要先明确一点:
GIL并不是Python的特性,Python完全可以不依赖于GIL
GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,
以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。
可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
每一个cpython进程内都有一个GIL
GIL导致同一进程内的多个线程同一时间只能有一个运行
之所以有GIL,是因为cpython的内存管理不是线程安全的
对于计算密集型用多进程,对于IO密集型用多线程
死锁和递归锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁
from threading import Thread,Lock,RLock import time # mutexA = Lock() # mutexB = Lock() mutexA = mutexB = RLock() # 一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止 class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘%s 拿到A锁‘%self.name) mutexB.acquire() print(‘%s 拿到A锁‘ % self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘%s 拿到B锁‘%self.name) time.sleep(0.1) mutexA.acquire() print(‘%s 拿到A锁‘ % self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t = MyThread() t.start()
协程
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的
利用yield实现单线程下的并发
import time def consumer(): ‘‘‘任务1:接收数据,处理数据‘‘‘ while True: x=yield def producer(): ‘‘‘任务2:生产数据‘‘‘ g = consumer() next(g) for i in range(10000000): g.send(i) start = time.time() # 基于yield保存状态,实现两个任务直接来回切换,即并发的效果 # PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的. producer() stop = time.time() print(stop-start) # 2.0272178649902344
gevent模块
time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前
或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
# pip3 install gevent # 1.切换+保存状态 # 2.检测IO,实现遇到IO切换 from gevent import monkey monkey.patch_all() # 将程序中的所有IO操作打标记,使gevent能识别 import gevent import time def eat(name): print(‘%s eat 1‘%name) time.sleep(2) print(‘%s eat 2‘ % name) def play(name): print(‘%s play 1‘%name) time.sleep(3) print(‘%s play 2‘ % name) g1 = gevent.spawn(eat,‘alex‘) g2 = gevent.spawn(play,‘egon‘) g1.join() g2.join()
第三十六天
IO阻塞模型(blocking IO)
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
服务端
客户端
非阻塞IO模型
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
服务端
客户端
IO多路复用
IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
强调:
1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
服务端
客户端
socketserver模块
TCP
服务端
客户端
UDP
服务端
客户端
paramiko模块
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实
下载安装
pip3 install paramiko #在python3中
SSHClient
用于连接远程服务器并执行基本命令
基于用户名密码连接:
基于公钥密钥连接:
客户端文件名:id_rsa
服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)
SFTPClient
用于连接远程服务器并执行上传下载
基于用户名密码上传下载