(一)进程锁
抢票的例子:
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {‘count‘: 1} # 仅剩最后一张票 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as f: json.dump(count, f) # 返回剩余票数 def search(): dic = json.load(open(‘db.txt‘)) print(‘剩余票数%s‘ % dic[‘count‘]) return dic def get_ticket(dic): time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open(‘db.txt‘, ‘w‘)) print(‘购票成功,剩余:{}‘.format(dic[‘count‘])) else: print(‘抢票失败,去邀请好友助力!‘) def ticket_purchase(lock, i): print(‘第{}个用户‘.format(i)) # lock.acquire() get_ticket(search()) # lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): # 模拟并发10个客户端抢票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
结果:
第6个用户 剩余票数1 第4个用户 剩余票数1 第7个用户 剩余票数1 第1个用户 剩余票数1 第10个用户 剩余票数1 第3个用户 剩余票数1 第5个用户 剩余票数1 第8个用户 剩余票数1 第2个用户 剩余票数1 第9个用户 剩余票数1 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0 购票成功,剩余:0
十个用户会同时把票抢走,因为每次search同一时间能查到只有一个票
multipleprocessing.Lock
- 非递归的锁定对象,非常类似threading.Lock.一旦进程或线程获得了锁,后续尝试从任何进程或线程获取它,将被阻塞直到被释放; 任何进程或线程都可以释放它。
- Lock支持上下文管理协议,可以在with中使用。
acquire(block=True, timeout=None)
- 获取一个锁,阻塞(block=True)或不阻塞(block=False)
- 当block设置为True的时候(默认设为True),如果锁处于锁定状态,调用该方法会阻塞,直到锁被释放;然后将锁设置为锁定状态,并返回True。
- 当block设置为False的时候,调用该方法不会阻塞。如果锁处于锁定状态,则返回False,否则将锁设置为锁定状态,并返回True。
- 当timeout为正数时,只要无法获取锁,最多阻塞超时指定的秒数。超时值为负值相当于超时值为零。超时值为“None”(默认值)的调用将超时时间设置为无限。请注意,超时的负值或无值的处理方式与threading.lock.acquire()中实现的行为不同。如果block参数设置为false,超时参数没有实际意义,因此会忽略timeout参数,。如果获取了锁,则返回true;如果过了超时时间,则返回false。
release()
- 解锁,可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。
- 大部分行为和threading.lock.release()相同,但在未锁定状态时调用引发ValueError 【后者引发RuntimeError】
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
如果抢票步骤没有加锁,那么可能会有几个人同时把票抢走,因为每次search都能查到有一个票,加了锁以后只能一个一个抢
加锁:
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {‘count‘: 1} # 仅剩最后一张票 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as f: json.dump(count, f) # 返回剩余票数 def search(): dic = json.load(open(‘db.txt‘)) print(‘剩余票数%s‘ % dic[‘count‘]) return dic def get_ticket(dic): time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open(‘db.txt‘, ‘w‘)) print(‘购票成功,剩余:{}‘.format(dic[‘count‘])) else: print(‘抢票失败,去邀请好友助力!‘) def ticket_purchase(lock, i): print(‘第{}个用户‘.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): # 模拟并发10个客户端抢票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
结果:
第2个用户 剩余票数1 第1个用户 第9个用户 第10个用户 第5个用户 第7个用户 第8个用户 第3个用户 第6个用户 第4个用户 购票成功,剩余:0 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力!
从结果可以看出,并不是手速最快的才能抢到
multiprocessing.RLock
- 递归的锁对象,必须由获取它的进程或线程释放递归锁。一旦进程或线程获得了递归锁定,相同的进程或线程就可以再次获取它而不会阻塞; 该进程或线程获取和释放锁的次数必须相等。
RLock
支持上下文管理器协议,因此可以在with
语句中使用。
acquire(block=True, timeout=None)
- 获取一个锁,阻塞(block=True)或不阻塞(block=False)
- 当在block参数设置为True的情况下调用时,除非该锁已由当前进程或线程拥有,否则将一直阻塞到该锁处于未锁定状态(此时它不属于任何进程或线程)。然后,当前进程或线程取得锁的所有权(如果它还没有所有权),锁内的递归级别将增加一,返回true。
- 当block设置为False时,调用时不阻塞。如果锁已经被另一个进程或线程获取,则当前进程或线程不占用所有权,并且锁中的递归级别不会更改,返回值为False; 如果锁处于解锁状态,则当前进程或线程将获得所有权,递归级别将递增,返回True.
release()
- 释放锁,递减递归级别。如果在递减递归级别为零之后,将锁重置为解锁状态(不由任何进程或线程拥有),并且如果任何其他进程或线程正在阻塞,等待解锁,仅允许其中一个继续执行。如果在递减之后递归级别仍然非零,则锁保持锁定并仍由调用进程或线程拥有。
- 只有在调用进程或线程拥有锁时才调用此方法。如果此方法由所有者以外的进程或线程调用,或者锁处于未锁定(无主)状态,则引发
AssertionError
。请注意,在这种情况下引发的异常类型与threading.rlock.release()中实现的行为不同 【后者引发RuntimeError
】。
若你的线程处理中会有一些比较复杂的代码逻辑过程,比如很多层的函数调用,而这些函数其实都需要进行加锁保护数据访问。这样就可能会反复的多次加锁,因而用RLock就可以进行多次加锁,解锁,直到最终锁被释放,而如果用普通的lock,当你一个函数A已经加锁,它内部调用另一个函数B,如果B内部也会对同一个锁加锁,那么这种情况就也会导致死锁。
(二)信号量
multiprocessing.Semaphore
- 信号量对象:近似的类比
threading.Semaphore
此类实现信号量对象。信号量管理表示release()调用数减去acquire()调用数再加上原子计数器的初始值。当计数器为0时,acquire()方法将一直阻塞,直到它可以返回而不使计数器为负为止。如果未给定,则值默认为1。
- 可选参数提供内部计数器的初始值;默认值为1。如果给定的值小于0,则会引发ValueError。
- 其
acquire
方法的第一个参数被命名为block,和multiprocessing.Lock.acquire() 一致
acquire(block=True,timeout=None)
- 获取信号量。
- 在不带参数的情况下调用:
- 如果内部计数器大于零,则将其递减1并立即返回true。
- 如果内部计数器为零,则阻塞直到调用
release()唤醒
。一旦唤醒(计数器大于0),将计数器递减1并返回true。每次调用release()都会唤醒一个进程。不应依赖进程被唤醒的顺序。
- 当block设置为false 时,不阻塞。如果此时调用没有参数acquire()会阻塞(即此时计数器为0),则立即返回false; 否则,执行与不带参数调用时相同的操作,并返回true。
- 当timeout是None以外的值,它将最多阻止timeout秒。如果在该时间间隔内(计数器一直为0),则返回false。否则返回true。
release()
- 释放信号量,将内部计数器递增1。当计数器为零并且另一个进程正在等待它再次大于零时唤醒该进程。
【acquire()和release()不一定一对一,是否阻塞要取决于计数器的值】
# -*- coding:utf-8 -*- from multiprocessing import Semaphore, Process import time import random def enter_room(smp, i): if smp.acquire(block=True, timeout=random.randint(1, 3)): # 超时还未获取,返回false,反之返回True print(‘用户%d进入了房间‘ % i) time.sleep(1) smp.release() print(‘用户%d离开了房间‘ % i) else: print(‘等太久,走人‘) if __name__ == ‘__main__‘: smp = Semaphore(2) for i in range(10): p = Process(target=enter_room, args=(smp, i)) p.start()
结果:
用户5进入了房间 用户8进入了房间 用户5离开了房间 用户0进入了房间 用户8离开了房间用户9进入了房间 等太久,走人 用户0离开了房间 用户2进入了房间 用户9离开了房间 用户4进入了房间 等太久,走人 等太久,走人 用户2离开了房间用户6进入了房间 用户4离开了房间 用户6离开了房间
(三)事件
multiprocessing.Event
- 克隆
threading.Event
- 这是进程之间通信的最简单机制之一:一个进程发出事件信号,其他进程等待它。
- 事件对象管理一个内部flag,该标志可以使用
set()
方法设置为true,并使用clear()方法重置为false 。flag = False,wait()
方法将阻塞,直到该flag为True。flag初始值是Flase。
is_set()
- 当且仅当内部标志为真时返回true。
set()
- 将内部标志设置为true。一旦标志为真,调用wait()的线程将不阻塞。
clear()
- 将内部标志重置为false。随后,调用 wait的进程
将阻塞,直到set()被调用以再次将内部标志设置为true。
wait(timeout=None)
- 阻塞直到内部flag为True: 如果调用时flag就为True,则立即返回, 否则,直到另一个进程调用set()将内部flag设置为True,或者阻塞超时。
- 当timeout不为None时,它指定超时的时间(单位:秒)
- 除非给出超时参数并且阻塞超时返回False,其它情况皆为True
红绿灯:
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time # 红绿灯 def light(e): while 1: if e.is_set(): # 为True,flag为True print(‘红灯‘) e.clear() # 重置为False,调用wait()的进程阻塞 time.sleep(5) else: print(‘绿灯‘) e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先获取锁,确认下一辆通行的车 e.wait() # 红灯停,绿灯行 print(‘奔驰{}以两秒的时间飘过‘.format(i)) time.sleep(2) l.release() if __name__ == ‘__main__‘: e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5辆车 p = Process(target=car, args=(e, i, l)) p.start()
结果:
绿灯 奔驰4以两秒的时间飘过 奔驰2以两秒的时间飘过 奔驰1以两秒的时间飘过 红灯 绿灯 奔驰3以两秒的时间飘过 奔驰0以两秒的时间飘过 奔驰4以两秒的时间飘过 红灯
闯红灯的例子:
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time import random # 红绿灯 def light(e): while 1: if e.is_set(): # 为True,flag为True print(‘红灯‘) e.clear() # 重置为False,调用wait()的进程阻塞 time.sleep(5) else: print(‘绿灯‘) e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先获取锁,确认下一辆通行的车,如果没有锁那么就同时过红绿灯 if e.wait(random.randint(0, 3)): # 红灯停,绿灯行 print(‘奔驰{}以两秒的时间飘过‘.format(i)) else: print(‘奔驰{}闯红灯以两秒的时间飘过‘.format(i)) time.sleep(2) l.release() if __name__ == ‘__main__‘: e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5辆车 p = Process(target=car, args=(e, i, l)) p.start()
结果:
绿灯 奔驰4以两秒的时间飘过 奔驰2以两秒的时间飘过 奔驰1以两秒的时间飘过 红灯 奔驰3闯红灯以两秒的时间飘过 绿灯 奔驰0以两秒的时间飘过 奔驰4以两秒的时间飘过 红灯 奔驰2闯红灯以两秒的时间飘过 奔驰1闯红灯以两秒的时间飘过
(四)管道
multipleprocessing.Pipe([duplex])
- 返回一对的 代表的配管的端部的对象。
(conn1, conn2)
Connection.
- 如果duplex是
True(
默认值),则管道是双向的。如果duplex是False,
管道是单向的:conn1
只能用于接收消息,conn2
只能用于发送消息 - multipleprocessing.connection.Connection
send(obj)
- 将对象发送到应该使用的连接的另一端
recv()
。 - 该对象必须是可选择的。非常大的pickle可能引发
ValueError
异常。(大约32 MiB +,虽然它取决于操作系统)
recv()
- 使用返回从连接另一端发送的对象
send()
。阻塞直到有东西要收到。如果没有什么留下来接收,而另一端被关闭。抛出EOFError
fileno()
- 返回连接使用的文件描述符或句柄。
close()
- 关闭连接。
- 当连接被垃圾收集时,会自动调用此方法。
poll([timeout])
- 查询是否有可供读取的数据。
- 未指定timeout,立即返回,如果timeout是一个数字,则阻塞timeout时间(单位:秒),如果是None,一直阻塞。
【若另一端已关闭,则触发BrokenPipeError异常】
send_bytes(buffer[, offset[, size]])
- 从类似bytes的对象中发送字节数据作为完整的消息。
- 如果给出offset,则从buffer中的该位置读取数据。如果给出size,则将从缓冲区中读取多个字节。非常大的缓冲区可能会引发
ValueError
异常(大约32 MiB +,取决于操作系统)
recv_bytes([maxlength])
- 返回从连接另一端发送的字节数据的完整消息。直到有数据要接收。如果没有要接收的内容,并且另一端已关闭,则引发EOFError
recv_bytes_into(buffer[, offset])
- 将连接另一端发送的字节数据的完整消息读入缓冲区,并返回消息中的字节数。直到有数据要接收。如果没有要接收的内容并且另一端已关闭,则引发EOFError。
- 缓冲区必须是与可写的和bytes类似的对象。如果给出了偏移量,那么消息将从该位置写入缓冲区。偏移量必须是小于缓冲区长度的非负整数(以字节为单位)。
- 如果缓冲区太短,则会引发BufferToSort异常,发送的完整消息可从e.args[0]获取,其中e是异常实例。
生产消费者模型:
传输字符串数据:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:436 生产者1生产第1包子 消费者1消费了一个包子 生产者1生产第2包子 消费者1消费了一个包子 生产者1生产第3包子 消费者1消费了一个包子
多个消费者:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c1 = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 c2 = Process(target=consume, args=(left, ‘消费者2‘)) # 消费 c3 = Process(target=consume, args=(left, ‘消费者3‘)) # 消费 p.start() c1.start() c2.start() c3.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:432 生产者1生产第1包子 消费者2消费了一个包子 生产者1生产第2包子 消费者2消费了一个包子 生产者1生产第3包子 消费者3消费了一个包子
请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
传输字节:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send_bytes(‘包子‘.encode()) time.sleep(1) right.send_bytes(‘包子包子包子‘.encode()) right.close() def consume(left, name): while 1: try: byte_content = bytearray(10) bytes_size = left.recv_bytes_into(byte_content) print(‘{}消费了一个{}‘.format(name, byte_content.decode())) print(‘接收了{}个数据‘.format(bytes_size)) except EOFError: # 关闭另一端,由recv触发此异常 left.close() break except BufferTooShort as e: print(‘数据太长,完整数据为:{}‘.format(e.args[0].decode())) if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:476 生产者1生产第1包子 消费者1消费了一个包子 接收了6个数据 生产者1生产第2包子 消费者1消费了一个包子 接收了6个数据 生产者1生产第3包子 消费者1消费了一个包子 接收了6个数据 数据太长,完整数据为:包子包子包子
奇怪的poll(),分析下面两个代码结果:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) time.sleep(3) right.close() print(‘right已关闭‘) def consume(left, name): while 1: try: print(‘poll阻塞‘) print(‘是否有可供读取的数据:{}‘.format(left.poll(None))) goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 已关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:544 poll阻塞 生产者1生产第1包子 生产者1生产第2包子 生产者1生产第3包子 是否有可供读取的数据:True 消费者1消费了一个包子 poll阻塞 是否有可供读取的数据:True 消费者1消费了一个包子 poll阻塞 是否有可供读取的数据:True 消费者1消费了一个包子 poll阻塞 right已关闭 是否有可供读取的数据:True
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print(‘{}生产第{}包子‘.format(name, i + 1)) right.send(‘包子‘) # time.sleep(3) right.close() print(‘right已关闭‘) def consume(left, name): while 1: try: print(‘poll阻塞‘) print(‘是否有可供读取的数据:{}‘.format(left.poll(None))) goods = left.recv() print(‘{}消费了一个{}‘.format(name, goods)) except EOFError: # 已关闭另一端,由recv触发此异常 left.close() break if __name__ == ‘__main__‘: left, right = Pipe() print(‘文件描述符:{}‘.format(left.fileno())) p = Process(target=produce, args=(right, ‘生产者1‘)) # 生产 c = Process(target=consume, args=(left, ‘消费者1‘)) # 消费 p.start() c.start() right.close() # 关闭多余的两端 left.close()
结果:
文件描述符:440 生产者1生产第1包子 生产者1生产第2包子 生产者1生产第3包子 right已关闭 poll阻塞 是否有可供读取的数据:True 消费者1消费了一个包子 poll阻塞 是否有可供读取的数据:True 消费者1消费了一个包子 poll阻塞 是否有可供读取的数据:True 消费者1消费了一个包子 poll阻塞 Process Process-2: Traceback (most recent call last): ...... BrokenPipeError: [WinError 109] 管道已结束。
第四次循环poll(None)的执行若先于管道的right端关闭代码right.close()的执行,poll(None)返回True,并以recv引发的异常结束。反之,poll(None)引发BrokenPipeError异常
(五)队列
队列是线程和进程安全的。
multiprocessing.Queue
- 返回使用管道和一些锁/信号量实现的进程共享队列。当一个进程第一次将一个项目放入队列时,就会启动一个feeder线程,它将对象从缓冲区传输到管道中。
- 通常的queue.empty和queue.full异常从标准库的queue模块引发,以发出信号超时。
- queue实现queue.queue的所有方法,但task_done()和join()除外。
qsize()
- 返回队列的大致大小。这个数字不可靠.
empty()
- 为空返回True,否则返回False。这个数字不可靠
full()
- 队列已满返回True,否则返回False。这个数字不可靠
put(obj[, block[, timeout]])
- 将obj放入队列。如果可选参数block为“True”(默认值),timeout为“None”(默认值),则根据需要阻塞,直到插槽可用。如果超时为正数,则最多会阻塞timeout秒数,如果在此时间内没有可用插槽,引发queue.Full异常。否则(block为False),如果空闲插槽立即可用,则将项目放入队列,否则引发queue.Full异常(在这种情况下,超时将被忽略)。
put_nowait()
- 顾名思义,等效于put(obj,False)
get([block,[, timeout])
- 从队列中移除并返回项目。如果可选参数block为True(默认值),timeout=None(默认值),则在item可用之前根据需要进行阻塞。如果超时为正数,则最多阻塞timeout秒,如果在该时间内没有可用项,则引发queue.Empty异常。否则(block为False),如果item立即可用,则返回该项,否则引发queue.Empty异常(在这种情况下忽略超时)。
get_nowait()
- 顾名思义,等效于get(False)
close()
- 指示当前进程不会将更多数据放入此队列。后台线程将所有缓冲数据刷新到管道后将退出。当队列被垃圾收集时,将自动调用此函数。
join_thread()
- join()后台线程。这只能在调用close()之后使用。它将一直阻塞,直到后台线程退出,以确保缓冲区中的所有数据都已刷新到管道中。
- 默认情况下,如果进程不是队列的创建者,那么在退出时,它将尝试加入队列的后台线程。进程可以调用cancel_join_thread()使join_thread()不做任何操作。
cancel_join_thread()
- 防止join_thread()阻塞。可防止后台线程在进程退出时自动join()——请参见join_thread()。
- 此方法的更好名称可能是allow_exit_without_flush()。但它很可能会导致排队的数据丢失,您几乎肯定不需要使用它。只有当您需要当前进程立即退出而不等待将排队的数据刷新到底层管道时,它才有存在的意义,而且您不关心丢失的数据。
队列进程安全
生产消费者模型,队列实现:
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, JoinableQueue import os def consumer(q): while True: print(‘消费者进程{}等吃‘.format(os.getpid())) res = q.get() if res is None: print(‘消费者进程{}结束‘.format(os.getpid(), res)) break # 收到结束信号则结束 else: print(‘消费者进程{}吃了{}‘.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print(‘生产者进程{}生产了 第{}个{}‘.format(os.getpid(), i + 1, food)) print(‘生产者进程{}生产完成‘.format(os.getpid())) if __name__ == ‘__main__‘: q = Queue() # 生产者 p1 = Process(target=producer, args=(‘包子‘, q)) p2 = Process(target=producer, args=(‘水果‘, q)) p3 = Process(target=producer, args=(‘米饭‘, q)) # 消费者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 开始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 有几个消费者就put几个None q.put(None) # 必须保证生产者全部生产完毕,才应该发送结束信号 q.put(None) q.put(None)
结果:
消费者进程12108等吃 消费者进程3648等吃 生产者进程19544生产了 第1个包子 生产者进程19544生产了 第2个包子 生产者进程19544生产完成 消费者进程12108吃了包子 消费者进程12108等吃 消费者进程3648吃了包子 消费者进程3648等吃 生产者进程828生产了 第1个米饭 消费者进程12108吃了米饭生产者进程828生产了 第2个米饭 消费者进程12108等吃 生产者进程828生产完成 消费者进程3648吃了米饭 消费者进程3648等吃 生产者进程20244生产了 第1个水果 消费者进程12108吃了水果生产者进程20244生产了 第2个水果 消费者进程12108等吃 生产者进程20244生产完成 消费者进程3648吃了水果 消费者进程3648等吃 消费者进程12108结束 消费者进程3648结束
由于消费者收到None才能结束,因此要注意两个问题,None必须在队列尾部,几个消费者,尾部就应该有几个None
multipleprocessing.JoinableQueue
task_done()
- 指示以前排队的任务已完成。由队列使用者使用。对于用于获取任务的每个get(),对task_done()的后续调用会告诉队列任务的处理已完成。
join()
- 阻塞,直到队列中的所有项目都被获取和处理。
- 每当将项目添加到队列时,未完成任务的计数就会增加。每当使用者调用task_done()以指示已检索到该项,并且对其进行的所有工作都已完成时,计数就会下降。当未完成任务的计数降至零时,join()将取消阻塞。
生产消费者模型,JoinableQueue实现
# -*- coding:utf-8 -*- from multiprocessing import Process,Queue, JoinableQueue import os def consumer(q): while 1: print(‘消费者进程{}等吃‘.format(os.getpid())) res = q.get() q.task_done() # Semaphore - 1 print(‘消费者进程{}吃了{}‘.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print(‘生产者进程{}生产了 第{}个{}‘.format(os.getpid(), i + 1, food)) print(‘生产者进程{}生产完成,等待消费者消费‘.format(os.getpid())) q.join() # 等待消费者进程 if __name__ == ‘__main__‘: q = JoinableQueue() # 生产者 p1 = Process(target=producer, args=(‘包子‘, q)) p2 = Process(target=producer, args=(‘水果‘, q)) p3 = Process(target=producer, args=(‘米饭‘, q)) # 消费者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 开始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()
结果:
消费者进程9952等吃 消费者进程3840等吃 生产者进程10980生产了 第1个包子 生产者进程10980生产了 第2个包子 生产者进程10980生产完成,等待消费者消费 消费者进程9952吃了包子 消费者进程9952等吃 消费者进程3840吃了包子 消费者进程3840等吃 生产者进程7452生产了 第1个水果 生产者进程18556生产了 第1个米饭 消费者进程9952吃了水果 消费者进程9952等吃 生产者进程7452生产了 第2个水果 生产者进程7452生产完成,等待消费者消费 生产者进程18556生产了 第2个米饭 生产者进程18556生产完成,等待消费者消费 消费者进程3840吃了米饭 消费者进程3840等吃 消费者进程9952吃了水果 消费者进程9952等吃 消费者进程3840吃了米饭 消费者进程3840等吃
其思路就是put之后,有个信号量计数器+1 ,每get一下调用一下taskdone,计数器就会-1。如果生产者很快生产完后,调用join,进程会等待,等到计数器为0的时候,所有调用join()的生产者会被唤醒。因此,生产者唤醒了-->意味着消费者已经消费完,消费者由于死循环还在等吃的(get阻塞)。设置消费者线程为守护线程,让主进程随着生产者进程的结束而结束,主进程 结束后,中止守护线程(消费者)
死锁:
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue def f(q): q.put(‘X‘ * 1000000) # q.cancel_join_thread() if __name__ == ‘__main__‘: queue = Queue() p = Process(target=f, args=(queue,)) p.start() print(‘join阻塞‘) p.join() # this deadlocks print(‘get阻塞‘) obj = queue.get() # q.cancel_join_thread()执行后,join()不阻塞,但是get()拿不到数据,数据丢失,导致阻塞 print(obj)
multiprocessing.Queue底层是基于Pipe构建的,但是数据传递时并不是直接写入Pipe,而是写入进程本地buffer,通过一个feeder线程写入底层Pipe,因此一次put数据很大的时候,会一直等待get()取出。没有get()就join该进程,会导致死锁
(六)进程池
你可以创建一个进程池,进程将使用Pool类执行提交给它的任务。
multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild[,context]]]]])
- 一个进程池对象,它控制可向其提交作业的工作进程池。它支持带超时和回调的异步结果,并具有并行映射实现。
- processes是要使用的工作进程数。如果processes为None,则使用os.cpu_count()返回的数字。
- 如果initializer设定项不是“None”,则每个工作进程在启动时都将调用initializer(*initargs) 。
- maxtasksperchild是工作进程在退出并用新的工作进程替换之前可以完成的任务数,以释放未使用的资源。 默认的maxtasksperchild是None,这意味着工作进程将与池一样长。
- context可用于指定用于启动工作进程的上下文。 通常使用函数multiprocessing.Pool()或上下文对象的Pool()方法创建池。 在两种情况下,上下文都是适当的。(如何指定上下文?怎么用?)
- 注意,pool对象的方法只能由创建pool的进程调用。
apply(fun[, args[, kwds]])
- 使用参数args和关键字参数kwds调用func。它会一直阻塞,直到结果准备就绪。apply_async() 更适合并行执行工作。此外,func只在池中的一个进程中执行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- apply()方法的一个变种,它返回一个结果对象o;o.get()获取func返回的结果。
- 如果指定了callback,则它应该接受单个参数,并且可调用。当结果就绪时,调用callback函数,如果调用失败,调用error_callback。
- 如果指定了error_callback,那么它应该是接受单个参数并且可调用。如果目标函数失败,则使用异常实例调用error_callback。
- 回调应该立即完成,否则处理结果的线程将被阻塞。
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def func(n): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n**2 if __name__ == ‘__main__‘: p = Pool(5) for i in range(10): # p.apply(func, (i,)) # 只在一个进程中执行,会阻塞主进程 p.apply_async(func, (i,)) # 适合并行,一下由五个进程处理五个任务,不阻塞主进程 print(‘主进程‘) p.close() p.join()
结果:
主进程 i=0, pid=6540 i=1, pid=1348 i=2, pid=17060 i=3, pid=7632 i=4, pid=7396 i=5, pid=6540 i=6, pid=1348 i=7, pid=7396 i=8, pid=7632 i=9, pid=17060
map(func, [iterable,[chunksize]])
- 相当于内置函数map,它仅支持可迭代的参数,他会阻塞,直到全部结果准备就绪。
- 此方法将iterable切换为多个块,并将其作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。
- 请注意,它可能会导致非常长的iterables的高内存使用率,考虑将imap() 或 imap_unordered() 与显式chunksize选项一起使用,以提高效率
map_async(func, iterable[, chunksize[,callback[,error_callback]]])
- map()方法的一个变种,它返回一个结果对象。
- 如果指定了回调,则它应该接受单个参数并且可调用。当结果就绪时,调用callback,如果调用失败,error_callback被应用。
- 如果指定了error_callback,那么它应该接受单个参数并可调用。如果目标函数调用失败,则使用异常实例调用error_callback。
- 回调应该立即完成,否则处理结果的线程将被阻塞。
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n if __name__ == ‘__main__‘: p = Pool(4) # result = p.map(fun, [(1, 2), (1, 2)], chunksize=1) # map阻塞主进程,结果出来后,再解除阻塞 result = p.map_async(fun, [(1, 2), (1, 2)], chunksize=1) # 异步,不阻塞主线程,任务还在子进程进行; print(‘主进程‘) # print(result) # map返回列表,可直接打印 print(result.get()) # map_async返回结果对象 p.close() p.join()
结果:
主进程 i=(1, 2), pid=2004 i=(1, 2), pid=5328 [(1, 2), (1, 2)]
imap(func, iterable[, chunksize])
- 惰性版本的map
- chunksize参数与map()方法使用的参数相同。 对于非常长的迭代,使用较大的chunksize值可以使作业比使用默认值1更快地完成。
- 此外,如果chunksize为1,则imap()方法返回的迭代器的next()方法具有可选的超时参数:如果在超时秒内无法返回结果,则next(timeout)将引发multiprocessing.TimeoutError。
imap_unorderable(func, iterable[,chunksize])
- 除了返回的结果是无序的(除非池中只有一个进程),其它跟imap一样
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n if __name__ == ‘__main__‘: p = Pool(4) # result = p.imap(fun, [(1, 2), (3, 4)], chunksize=1) # 异步 result = p.imap_unordered(fun, [(1, 2), (3, 4)], chunksize=1) # 异步,不阻塞主线程,任务还在子进程进行,结果无序; print(‘主进程‘) for i in result: # imap返回迭代器 print(i) p.close() p.join()
结果:
主进程 i=(1, 2), pid=17396 i=(3, 4), pid=12496 (1, 2) (3, 4)
startmap(func, iterable[, chunksize])
- 与map()类似,只是迭代的元素应该作为参数解包的迭代。因此,iterable=[(1,2),(3,4)]的结果是[func(1,2),func(3,4)]。
startmap_async(func, iterable[, chunksize[,callback[,error_callback]]])
- starmap()和map_async()的组合
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n, k): print(‘i={}, pid={}‘.format(n, os.getpid())) time.sleep(1) return n, k if __name__ == ‘__main__‘: p = Pool(4) # result = p.starmap(fun, [(1, 2), (3, 4)], chunksize=1) # 阻塞,直到全部结果处理完 result = p.starmap_async(fun, [(1, 2), (3, 4)], chunksize=1) # 异步,不阻塞主线程,任务还在子进程进行; print(‘主进程‘) # print(result) # starmap返回列表,直接打印 print(result.get()) p.close() p.join()
结果:
主进程 i=1, pid=14660 i=3, pid=10564 [(1, 2), (3, 4)]
close()
- 防止将任何其他任务提交到池中。 完成所有任务后,工作进程将退出。
terminate()
- 立即停止工作进程而不完成未完成的工作。 当池对象被垃圾收集时,将立即调用terminate()。
join()
- 等待工作进程退出。 必须在使用join()之前调用close()或terminate()。
池对象在3.3版本支持上下文管理协议
使用进程池实现抢票:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool, Manager import time import json count = {‘count‘: 1} # 仅剩最后一张票 with open(‘db.txt‘, ‘w‘, encoding=‘utf-8‘) as f: json.dump(count, f) # 返回剩余票数 def search(): dic = json.load(open(‘db.txt‘)) print(‘剩余票数%s‘ % dic[‘count‘]) return dic def get_ticket(dic): time.sleep(0.1) # 模拟读数据的网络延迟 if dic[‘count‘] > 0: dic[‘count‘] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open(‘db.txt‘, ‘w‘)) print(‘购票成功,剩余:{}‘.format(dic[‘count‘])) else: print(‘抢票失败,去邀请好友助力!‘) def ticket_purchase(lock, i): print(‘第{}个用户‘.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == ‘__main__‘: lock = Manager().Lock() # 要使用Manager().Lock() p = Pool(5) for i in range(10): # 模拟并发10个客户端抢票 p.apply_async(ticket_purchase, (lock, i + 1)) p.close() p.join()
结果:
第1个用户 剩余票数1 第2个用户 第3个用户 第4个用户 第5个用户 购票成功,剩余:0 剩余票数0 第6个用户 抢票失败,去邀请好友助力! 剩余票数0 第7个用户 抢票失败,去邀请好友助力! 剩余票数0 第8个用户 抢票失败,去邀请好友助力! 剩余票数0 第9个用户 抢票失败,去邀请好友助力! 剩余票数0 第10个用户 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力! 剩余票数0 抢票失败,去邀请好友助力!
maxtasksperchild和chunksize具体效果:
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import sys def func(x): print("pid: ", os.getpid(), " got: ", x) sys.stdout.flush() return [x, x+1] def got(r): print("got result: ", r) if __name__ == ‘__main__‘: pool = Pool(processes=1, maxtasksperchild=9) # 进程执行了九个任务就会退出,换新的进程执行 keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] result = pool.map_async(func, keys, chunksize=1, callback=got) # chunksize指定每chuncksize个元素为一个任务 # result = pool.map_async(func, keys, chunksize=2, callback=got) # chunksize为2说明此时只有五个任务,没有换新的进程执行 pool.close() pool.join()
结果:
pid: 8188 got: 1 pid: 8188 got: 2 pid: 8188 got: 3 pid: 8188 got: 4 pid: 8188 got: 5 pid: 8188 got: 6 pid: 8188 got: 7 pid: 8188 got: 8 pid: 8188 got: 9 pid: 10860 got: 10 got result: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10], [10, 11]]
参考:
原文地址:https://www.cnblogs.com/Magic-Dev/p/11432588.html