一、守护进程
守护进程:一个进程B守护另一个进程A,当被守护的进程A结束,进程B也就结束了。(不一定同生,但会同死)
两个特点:
①守护进程会在主进程代码执行结束后就终止
②守护进程内无法再开启子进程,否则抛出异常。
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
应用场景:如果主进程认为一旦自己结束,子进程也就没有继续运行的必要了,就可以将子进程设置为守护进程。(例如:qq正在调用自身的下载文件功能,但是此时退出了qq,下载进程也就可以直接关闭了)
方法为:process.daemon=True,必须放置在子进程开启前。
from multiprocessing import Processimport time?def task(name): print(‘%s is running‘ % name) time.sleep(3) print(‘%s is running‘ % name) # 等待三秒的时候,被守护进程已经执行完了,所以守护进程会被回收,此句话就不会打印了。??if __name__ == ‘__main__‘: obj = Process(target=task, args=(‘守护进程‘,)) obj.daemon=True # 必须在start前设置,会限制p创建子进程,且父进程结束子进程马上殉葬结束。 obj.start() # 发送信号给操作系统 time.sleep(1) print(‘被守护进程‘)输出结果:守护进程 is running被守护进程
二、互斥锁
互斥锁:将并发变为串行(即一个一个运行)
from multiprocessing import Process,Lockimport time,random?mutex=Lock() # 实例化互斥锁,也可以放到main下面。# 强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire(),否则会形成阻塞,程序卡死。?def task1(lock): lock.acquire() # 得到这把锁,此时其他的同级子进程都只能等待当前进程将锁释放之后才有机会运行。 print(‘task1:名字是egon‘) time.sleep(random.randint(1,3)) print(‘task1:性别是male‘) time.sleep(random.randint(1,3)) print(‘task1:年龄是18‘) lock.release() # 锁用完之后,一定要释放,否则其他子进程无法进行?def task2(lock): lock.acquire() # 是一个阻塞的函数 会等到别的进程释放锁才能继续执行 print(‘task2:名字是alex‘) time.sleep(random.randint(1,3)) print(‘task2:性别是male‘) time.sleep(random.randint(1,3)) print(‘task2:年龄是78‘) lock.release()??def task3(lock): lock.acquire() print(‘task3:名字是lxx‘) time.sleep(random.randint(1,3)) print(‘task3:性别是female‘) time.sleep(random.randint(1,3)) print(‘task3:年龄是30‘) lock.release()??if __name__ == ‘__main__‘: p1=Process(target=task1,args=(mutex,)) # 创建进程并传参,将锁传给各个子进程 p2=Process(target=task2,args=(mutex,)) p3=Process(target=task3,args=(mutex,)) p1.start() # 三个子程序都能启动,但是一旦碰到锁,谁先抢到谁先运行,其他的要等。 p2.start() p3.start()
互斥锁与join的区别:
二者原理都一样,都是将并发变为串行,从而保证数据增删改查的有序性,不至于让数据发生错乱。
二者的区别在于join是按照人为指定顺序执行,进程执行顺序是固定的,而互斥锁是所有进程公平竞争,谁先抢到锁,谁就能先执行;且互斥锁可以指定某一部分代码串行,其余部分代码并发,而join则是整个子进程代码完全串行。
互斥锁的本质是一个布尔类型的数据,在执行代码前,会先判断这个值。
互斥锁在抢票中的应用:
import jsonfrom multiprocessing import Process,Lockimport timeimport random?# 查看剩余票数def check_ticket(usr): time.sleep(random.randint(1,3)) with open("ticket.json","r",encoding="utf-8") as f: # 提前做好json文件,做成字典。 dic = json.load(f) print("%s查看 剩余票数:%s" % (usr,dic["count"]))?def buy_ticket(usr): with open("ticket.json","r",encoding="utf-8") as f: dic = json.load(f) if dic["count"] > 0: time.sleep(random.randint(1,3)) dic["count"] -= 1 with open("ticket.json", "w", encoding="utf-8") as f2: # json.dump(dic,f2) print("%s 购票成功!" % usr) else: print("票被抢走了!%s购票失败" % usr)??def task(usr,lock): check_ticket(usr) # 查票可以并发 lock.acquire() # 排票就要串行了 buy_ticket(usr) lock.release() # 买好票后要释放锁?if __name__ == ‘__main__‘: lock = Lock() for i in range(10): # 启动10个进程去买票,模仿10个人。 p = Process(target=task,args=("用户%s" % i,lock)) p.start() 输出结果: #每次结果可能都不一致用户0查看 剩余票数:1用户2查看 剩余票数:1用户3查看 剩余票数:1用户1查看 剩余票数:1用户0 购票成功!票被抢走了!用户2购票失败票被抢走了!用户3购票失败票被抢走了!用户1购票失败用户5查看 剩余票数:0票被抢走了!用户5购票失败用户4查看 剩余票数:0票被抢走了!用户4购票失败用户7查看 剩余票数:0票被抢走了!用户7购票失败用户6查看 剩余票数:0票被抢走了!用户6购票失败用户9查看 剩余票数:0票被抢走了!用户9购票失败用户8查看 剩余票数:0票被抢走了!用户8购票失败
Rlock
RLock 表示可重入锁,其特点是可以多次执行acquire而不会阻塞。如果在多进程中使用Rlock,并且一个进程a 执行了多次acquire,其他进程b要想获得这个锁,需要进程a把锁解开,并且锁了几次就要解几次。普通锁如果多次执行acquire将会锁死(阻塞)
from multiprocessing import RLock,Processimport timedef task(i,lock): lock.acquire() lock.acquire() print(i) time.sleep(3) lock.release() lock.release() #第一个过来 睡3秒 第二个过来了 睡3秒 第一个打印1 第二个打印2?if __name__ == ‘__main__‘: lock = RLock() # 重入锁,即多次锁 p1 = Process(target=task,args=(1,lock)) p1.start() p2 = Process(target=task,args=(2,lock)) p2.start()
死锁:指锁无法打开,导致程序无法继续运行。即两把锁交给了两个人,这两个人还都需要对方的那把锁,互相谁也无法释放锁,也得不到对方的锁。
程序中尽可能不要使用多把锁,防止发生死锁现象
from multiprocessing import Process,Lockimport timedef task1(l1,l2,i): l1.acquire() # 拿走了l1的锁,但是task2拿走了l2的锁,两个进程后续都无法进行。 print("盘子被%s抢走了" % i) time.sleep(1) l2.acquire() print("筷子被%s抢走了" % i) print("吃饭..") l1.release() l2.release()??def task2(l1,l2,i): l2.acquire() print("筷子被%s抢走了" % i) l1.acquire() print("盘子被%s抢走了" % i) print("吃饭..") l1.release() l2.release()??if __name__ == ‘__main__‘: l1 = Lock() l2 = Lock() Process(target=task1,args=(l1,l2,1)).start() Process(target=task2,args=(l1,l2,2)).start()
三、IPC通信
IPC:进程间通信, 进程间相互独立,所以要解决进程间相互传输数据的问题。 1、使用共享文件,多个进程同时读写一个文件 ? 受限IO,速度慢 2、管道,基于内存速度快,但是是单向的,比较麻烦 3、申请共享内存空间,进程们可以共享这片区域 ? 基于内存速度快,但是数据量不能太大
from multiprocessing import Process,Manager,Lock # 导入managerimport time?mutex=Lock()?def task(dic,lock): lock.acquire() # 如果不加锁,都会读取字典,然后修改,结果就是变成9 temp=dic[‘num‘] time.sleep(0.1) # 读取共享空间之后,沉睡0.1秒,让所有进程都能运行起来 dic[‘num‘]=temp-1 lock.release()?if __name__ == ‘__main__‘: m=Manager() # 实例化manager dic=m.dict({‘num‘:10}) # 建立一个字典对象,这是子进程的共享空间 l=[] for i in range(10): p=Process(target=task,args=(dic,mutex)) l.append(p) p.start()? for p in l: p.join() # 子进程都结束后,再去查看字典 print(dic)输出结果:{‘num‘: 0}如果不加锁,CPU和内存足够快,输出结果可能为{‘num‘:9}
四、队列
一种数据容器,特点是先进先出。(栈是后进先出)
优点:是进程的共享空间,可以自动处理锁的问题(如上小节中如果锁处理不好,就会发生数据错乱)
即便在多进程下,也可以保证数据不会错乱,因为put和get默认阻塞。
注意点:
1、队列用来处理进程间的数据,且在内存中,所以数据量不应过大;
2、限制maxsize的值如果超过了内存就变得毫无意义。
from multiprocessing import Queue?q = Queue(1) # 实例化一个队列,最多可以存一个数据q.put("张三") # 将数据放进队列中print(q.get()) # 将数据从队列中拿出q.put("李四") # 当容器装满时,put默认会阻塞 。如果上一步中没有get,此时就会阻塞q.put("福布斯",False) # False表示不会阻塞 无论容器是满了 都会强行塞 如果满了就抛异常?print(q.get())print(q.get()) # 当容器中数据时,get默认阻塞 print(q.get(timeout=3)) # timeout 仅用于阻塞时,此处表示会等待3秒,三秒后如果还取不到数据,就会报错queue.Empty?print("over") # 因为阻塞,此句不会打印
def put(self, obj, block=True, timeout=None): # block表示阻塞,timeout表示等待时间。 pass?def get(self, block=True, timeout=None): pass
五、生产者消费者模型
生产者:数据的生产者
消费者:处理数据者
生产者消费者模型三要素:生产者、消费者、队列
应用场景:程序中出现明显的两类,一类负责生产数据,另一类负责处理数据时。
该模型优点:
1、实现了数据生产者与数据消费者之间的解耦合
2、平衡了生产力和消费力,即生产者只关心生产即可,消费者只关心处理数据即可,二者通过队列沟通。
import randomfrom multiprocessing import Process,Queueimport time# 爬数据def get_data(q):? for num in range(5): print("正在爬取第%s个数据" % num) time.sleep(random.randint(1,2)) print("第%s个数据 爬取完成" % num) # 把数据装到队列中 q.put("第%s个数据" % num)??def parse_data(q): # 生产者只关心生产即可,队列数最大为5,如果队列满了就进入阻塞状态。 for num in range(5): # 取出数据 data = q.get() print("正在解析%s" % data) time.sleep(random.randint(1, 2)) print("%s 解析完成" % data)?if __name__ == ‘__main__‘: # 共享数据容器 q = Queue(5) #生产者进程 produce = Process(target=get_data,args=(q,)) produce.start() #消费者进程 customer = Process(target=parse_data,args=(q,)) customer.start()
原文地址:https://www.cnblogs.com/realadmin/p/10197747.html