作为使用队列的另一种形式,还可以使用管道在进程回见执行消息传递。
Pipe( [ duplex])
在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1和conn2是表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,而conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。
Pipe()方法返回的Connection对象的实例c具有以下方法和属性。
c.close()
关闭连接。如果c被垃圾收集,将自动调用此方法。
c.fileno()
返回连接使用的证书文件描述符。
c.poll( [timeout] )
如果连接上的数据可用,返回True.timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达。
c.recv()
接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常。
c.recv_bytes( [maxlength] )
接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大数,将引发IOEError异常。
c.recv_bytes_into(buffer [,offset] )
接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区放置消息处的字节位移。返回值是接收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferToolShoot异常。
c.send(obj)
通过连接发送对象。obj是与序列化兼容的任意对象。
c.send_bytes(buffer [,offset [,size] ] ) )
通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。
可以通过与队列类似的方式使用管道。下面这个例子说明如何使用管道实现前面的生产者-使用者问题:
import multiprocessing
def consumer(pipe):
output_p,input_p=pipe
input_p.close() #关闭管道的输入端
while True:
try:
item=output_p.recv()
except EOFError:
break
#处理项目
print item #可替换有用的工作
#关闭
print "Consumer close"
#生产项目并将其放置到队列上,sequence是代表要处理项目的可迭代对象
def producer(sequence,input_p):
for item in sequence:
#将项目放置在队列上
input_p.send(item)
if __name__=="__main__":
(output_p,input_p)=multiprocessing.Pipe()
#启动使用者进程
cons_p=multiprocessing.Process(target=consumer,args=((output_p,input_p),))
cons_p.start()
#关闭生产者中的输出管道
output_p.close()
#生产项目
sequence=[1,2,3,4]
producer(sequence,input_p)
#关闭输入管道,表示完成
input_p.close()
#等待使用者进程关闭
cons_p.join()
应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。
管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:
import multiprocessing
def adder(pipe):
server_p,client_p=pipe
client_p.close()
while True:
try:
x,y=server_p.recv()
except EOFError:
break
result=x+y
server_p.send(result)
#关闭
print "server done"
if __name__=="__main__":
(server_p,client_p)=multiprocessing.Pipe()
#启动服务器进程
adder_p=multiprocessing.Process(target=adder,args=((server_p,client_p),))
adder_p.start()
#关闭客户端中的服务器管道
server_p.close()
#在服务器上提出一些请求
client_p.send((3,4))
print client_p.recv()
client_p.send(("hello","world"))
print client_p.recv()
#完成,关闭管道
client_p.close()
#等待消费者进程关闭
adder_p.join()
在这个例子中,adder()函数以服务器的形式运行,等待消息到达管道的端点。收到之后,它会执行一些处理并将结果发送回给管道。要记住,send()和recv()方法使用pickle模块对对象进行序列化。在本例中,服务器接收到原则(x,y)并将其作为输入,然后返回结果x+y。但对于使用远程调用的高级应用程序而言,应该使用下一博客描述的进程池。
版权声明:本文为博主原创文章,未经博主允许不得转载。