前情回顾:waitpid 创建二级子进程 注意事项:先根据功能计划模块,确定技术点做好整体架构的设计模块的编写 ----》 模块的测试发现bug或者进行优化积累调试经验 multiprocessing Process(name,target,args,kwargs) ---》 进程对象pp.pid p.name p.start() p.join() p.is_alive()p.daemon 文件拷贝 cookie :size = os.path.getsize(‘file‘)功能:获取一个文件的大小 ************************************************创建自己的进程类 class Parent(object): #固定方法,希望父类给实现好 def __call__(): run(1,2,3) def run(a,b,c): pass class Child(Parent): def __call__(): run(1,2,3) def run(a,b,c): .... .... 多进程 优点 : 并行多个任务,提高运行效率 空间独立,数据安全,创建方便 缺点 : 进程创建销毁的过程中消耗较多的计算机资源 进程池 在需要频繁的创建删除较多进程的情况下,导致计算机资源消耗过多 进程池如何处理 1. 创建进程池,在池内放入适量的进程2. 将事件加入进程池等待队列3. 使用进程池中的进程不断处理事件4. 所有事件处理后,回收关闭进程池 from multiprocessing import Pool Pool()功能 : 创建进程池参数 : processes : 指定进程池中进程数量返回 : 得到进程池对象 pool.apply_async() 功能 : 异步方式将事件放入进程池执行参数 : func : 要执行的事件函数 args : 同Process中args 给函数传参 kwds : 同Process中kwargs 给函数传参返回值 : 返回一个对象 该对象可以通过get()方法得到 func函数的返回值 pool.close() 功能 : 关闭进程池,使其无法加入新的事件pool.join()功能 : 阻塞等待进程池退出 (当所有事件处理完毕后) pool.apply() 用法和apply_async一样,只是需要顺序执行,一个事件结束在执行另一个事件 pool.map(func,iter)功能 : 类似于内建函数map 将第二个参数的迭代数传递个第一个参数的函数执行。同时兼容了使用进程池执行返回值: 返回func的返回值列表 r = pool.map(fun,test)===> r = []for i in test: res = pool.apply_async(fun,(i,)) r.append(res.get()) 进程间通信 磁盘交互 : 1. 速度慢 2. 不安全 socket 本地套接字 管道 消息队列 共享内存 信号 信号量 套接字 管道通信 pipe 在内存中开辟一块空间,对多个进程可见,通过管道,多进程进行通信 multiprocessing --- 》 Pipe fd1,fd2 = Pipe(duplex = True)功能 : 创建一个管道参数 : duplex 默认为True 表示双向管道 设置为False 则表示单向管道返回值 : 返回两个管道流对象,表示管道的两端 如果是双向管道则连个均可读写 如果为单向管道则,fd1只能读,fd2只能写 fd1.recv()功能 : 接收消息 (每次接收一条)参数 : 无返回值: 接收到的消息 * 如果管道没有消息会阻塞 fd2.send(data) 功能 : 发送消息 可以是字符串或其他类型 参数:要发送的内容 * 如果没有接收端则管道破裂 消息队列 队列 : 先进先出 在内存中开辟队列模型,用来存放消息。认可拥有队列的进程都可以存取消息 创建队列 q = Queue(maxsize = 0)功能 : 创建一个消息队列参数 : maxsize 默认为0 表示队列可存放消息由内存而定 > 0 表示队列最多存放多少条消息返回值 : 返回消息队列对象 q.put() 功能 : 向队列中存放消息 参数 : 要存的消息 (字符串 整数 列表)* 当队列满时会阻塞 q.full()判断队列是否为满 满返回True q.get()功能 : 向队列中取出消息 返回值 :取出的消息* 当队列空时会阻塞 q.empty()判断队列是否为空 空返回True q.qsize()得到当前队列中消息的个数 q.close() 关闭队列 * put get中均有可选参数 block 和timeoutblock 默认为True 表示阻塞函数 如果设置为False则不阻塞timeout block 为True 时 设置超时时间 共享内存 在内存中开辟一段空间,存储数据,对多个进程可见。每次写入共享内存的数据会覆盖之前的内容。由于对内存格式化较少,所以存取速度快 from multiprocessing import Value,Array obj = Value(ctype,obj)功能 : 开辟共享内存空间参数 : ctype str 要转变的c类型 (对照ctype表) obj 写入共享内存的初始值返回值 : 返回一个共享内存对象 obj.value 即可得到共享内存中的值 obj = Array(ctype,obj)功能 : 开辟共享内存空间参数 : ctype 要转换的类型 obj 存入到共享内存中的数据 是一个列表,要求列表中数类型一致 正整数,则表示开辟一个多大的序列空间 返回值: 返回一个共享内存对象 管道 消息队列 共享内存开辟空间 内存 内存 内存 读写方式 双向/单向 先进先出 操作覆盖内存 效率 一般 一般 快 应用 多用于亲 方便灵活 较复杂 缘进程 广泛 是否需要 否 否 需要互斥机制 信号 一个进程向另一个进程通过信号传递某种讯息 kill -l 查看信号kill -signame PID 给PID的进程发送一个信号 关于信号: 信号名称 : 系统定义,信号的名字 信号的含义 : 系统定义 ,信号的作用 信号的默认处理方法 : 系统定义,信号给接收进程带来的行为 一般有 终止 暂停 忽略 python 如何操作信号 发送os.kill(pid,sig)功能:向一个进程发送一个信号参数 : pid : 要发送信号的进程PID sig : 要发送的信号 signal.alarm(sec)功能 : 向自身发送一个时钟信号 SIGALRM参数 : sec 时钟秒数 * 信号属于异步通信方式,信号的发送不会影响进程的持续执行 *一个进程中只能同时有一个时钟,后面的时钟时间会覆盖前面的 处理 : signal.pause() 功能: 阻塞等待一个信号的发生 作业: 1. 对进程间通信方式进行描述连接2. 熟悉进程间通信代码 **********************************************************import signal import time #3秒后向自己发送个SIGALRM信号signal.alarm(3)time.sleep(2) signal.alarm(8) #阻塞等待接收一个信号signal.pause() while True: time.sleep(1) print("等待时钟.....")**********************************************************from multiprocessing import Array,Process import time def fun(shm): for i in shm: print(i) shm[2] = 1000 #开辟共享内存空间,可容纳6个整数#初始值是[1,2,3,4,5,6]# shm = Array(‘i‘,[1,2,3,4,5,6])#表示在共享内存中开辟一个包含6个整形的空间shm = Array(‘i‘,6) p = Process(target = fun,args = (shm,))p.start()p.join()for i in shm: print(i)**********************************************************from multiprocessing import Process import time class ClockProcess(Process): def __init__(self,value): self.value = value Process.__init__(self) # 在自定义的进程类中 重写父类的这个方法 def run(self): n = 5 while n > 0: print("The time is {}".\ format(time.ctime())) time.sleep(self.value) n -= 1 #用自己的进程类创建进程p = ClockProcess(2) # 自动执行run方法p.start()p.join()***********************************************************import os from multiprocessing import Process #获取文件大小size = os.path.getsize(‘file.jpg‘) # 在创建进程前获取文件对象,父子进程操作同一个文件流# 会造成操作混乱# f = open(‘file.jpg‘,‘rb‘) #复制前半部分def copy1(): f = open(‘file.jpg‘,‘rb‘) n = size // 2 fw = open(‘copy1.jpg‘,‘wb‘) while True: if n < 64: data = f.read(n) fw.write(data) break data = f.read(64) fw.write(data) n -= 64 fw.close() #复制后半部分def copy2(): f = open(‘file.jpg‘,‘rb‘) fw = open("copy2.jpg",‘wb‘) f.seek(size //2,0) while True: data = f.read(64) if not data: break fw.write(data) fw.close() p1 = Process(target = copy1)p2 = Process(target = copy2) p1.start()p2.start() p1.join()p2.join()*****************************************************import os import signal #向24051进程发送SIGKILL信号os.kill(24051,signal.SIGKILL)*****************************************************from multiprocessing import Process,Pipe import os,time #如果参数为False则fd1只能recv fd2只能send# fd1,fd2 = Pipe(False) #创建一个双向管道fd1,fd2 = Pipe()# fd1.close() def fun(name): time.sleep(3) #发字符串到管道 fd2.send("hello " + str(name)) print(os.getppid(),"----",os.getpid()) jobs = [] for i in range(5): p = Process(target = fun,args = (i,)) jobs.append(p) p.start() #接收子进程发送的消息for i in range(5): data = fd1.recv() print(data) for i in jobs: i.join()*******************************************************from multiprocessing import Pool from time import sleep import os def worker(msg): sleep(2) print(msg) return msg + " over" # 创建进程池pool = Pool(processes = 4) result = []# 放入事件for i in range(10): msg = "hello %d"%i #加入事件后进程就会立即操作事件 #apply_async 的返回值对象,该对象可以获取worker返回结果 r = pool.apply_async(worker,(msg,)) # pool.apply(worker,(msg,)) result.append(r) # sleep(3)# print("++++++++")#关闭进程池 不能再加入事件pool.close() # sleep(3)# print("*********") #阻塞等待回收pool.join() print("===========")#通过apply_async()返回对象get()方法获取返回值for res in result: print(res.get())********************************************************from multiprocessing import Pool import time def fun(fn): time.sleep(1) return fn * fn test = [1,2,3,4,5,6] pool = Pool(processes = 4)#使用map迭代r = pool.map(fun,test)print(r)pool.close()pool.join()***********************************************************from multiprocessing import Queue from time import sleep#创建队列q = Queue(3) q.put(1)print(q.full())q.put(2)q.put(3)print(q.full()) # 设置超时事件为3sec# q.put(4,True,3) print(q.get())print("队列中还有%d条消息"%q.qsize())print(q.empty())q.close() #关闭队列*************************************************************from multiprocessing import Process,Queue import time #创建消息队列q = Queue() def fun1(): time.sleep(1) q.put("我是进程1") def fun2(): time.sleep(2) print("取消息:",q.get()) p1 = Process(target = fun1)p2 = Process(target = fun2)p1.start()p2.start() p1.join()p2.join()***************************************************************from multiprocessing import Value,Process import time import random #向共享内存存钱def deposite(money): for i in range(100): time.sleep(0.03) money.value += random.randint(1,200)#从共享内存取钱def withdraw(money): for i in range(100): time.sleep(0.02) money.value -= random.randint(1,150) #创建共享内存对象money = Value(‘i‘,2000) d = Process(target = deposite,args = (money,))w = Process(target = withdraw,args = (money,))d.start()w.start()d.join()w.join() print(money.value)***************************************************************import os import time while True: time.sleep(2) print(os.getpid()) ***************************************************************
原文地址:https://www.cnblogs.com/wcin/p/9119054.html
时间: 2024-10-16 17:03:15