multiprocessing在python中的高级应用-IPC 之 Pipe

作为使用队列的另一种形式,还可以使用管道在进程回见执行消息传递。

Pipe( [ duplex])

在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1和conn2是表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,而conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。

Pipe()方法返回的Connection对象的实例c具有以下方法和属性。

c.close()

关闭连接。如果c被垃圾收集,将自动调用此方法。

c.fileno()

返回连接使用的证书文件描述符。

c.poll( [timeout] )

如果连接上的数据可用,返回True.timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达。

c.recv()

接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常。

c.recv_bytes( [maxlength] )

接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大数,将引发IOEError异常。

c.recv_bytes_into(buffer [,offset] )

接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区放置消息处的字节位移。返回值是接收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferToolShoot异常。

c.send(obj)

通过连接发送对象。obj是与序列化兼容的任意对象。

c.send_bytes(buffer [,offset [,size] ] ) )

通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。

可以通过与队列类似的方式使用管道。下面这个例子说明如何使用管道实现前面的生产者-使用者问题:

import multiprocessing

def consumer(pipe):
    output_p,input_p=pipe
    input_p.close() #关闭管道的输入端
    while True:
        try:
            item=output_p.recv()
        except EOFError:
            break
        #处理项目
        print item #可替换有用的工作
        #关闭
        print "Consumer close"

#生产项目并将其放置到队列上,sequence是代表要处理项目的可迭代对象
def producer(sequence,input_p):
    for item in sequence:
        #将项目放置在队列上
        input_p.send(item)
if __name__=="__main__":
    (output_p,input_p)=multiprocessing.Pipe()
    #启动使用者进程
    cons_p=multiprocessing.Process(target=consumer,args=((output_p,input_p),))
    cons_p.start()

    #关闭生产者中的输出管道
    output_p.close()
    #生产项目
    sequence=[1,2,3,4]
    producer(sequence,input_p)
    #关闭输入管道,表示完成
    input_p.close()
    #等待使用者进程关闭
    cons_p.join()

应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:

import multiprocessing

def adder(pipe):
    server_p,client_p=pipe
    client_p.close()
    while True:
        try:
            x,y=server_p.recv()
        except EOFError:
            break
        result=x+y
        server_p.send(result)
    #关闭
        print "server done"
if __name__=="__main__":
    (server_p,client_p)=multiprocessing.Pipe()
    #启动服务器进程
    adder_p=multiprocessing.Process(target=adder,args=((server_p,client_p),))
    adder_p.start()
    #关闭客户端中的服务器管道
    server_p.close()
    #在服务器上提出一些请求
    client_p.send((3,4))
    print client_p.recv()

    client_p.send(("hello","world"))
    print client_p.recv()
    #完成,关闭管道
    client_p.close()
    #等待消费者进程关闭
    adder_p.join()

在这个例子中,adder()函数以服务器的形式运行,等待消息到达管道的端点。收到之后,它会执行一些处理并将结果发送回给管道。要记住,send()和recv()方法使用pickle模块对对象进行序列化。在本例中,服务器接收到原则(x,y)并将其作为输入,然后返回结果x+y。但对于使用远程调用的高级应用程序而言,应该使用下一博客描述的进程池。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-08 00:17:02

multiprocessing在python中的高级应用-IPC 之 Pipe的相关文章

multiprocessing在python中的高级应用-IPC 之 Queue

multiprocessing模块支持进程间通信的两种主要形式:管道和队列.这两种方法都使用了消息传递实现的,但队列接口有意模仿线程程序中常见的队列用法. 有关Queue编程实例可以查看微博内容. Queue([maxsize]) 创建共享的进程队列.maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制.底层队列使用管道和锁定实现.另外,还需要运行支持线程以便队列中的数据传输到底层管道中. Queue的实例q具有以下方法: q.cancel_join_thread() 不会再进程退

multiprocessing在python中的高级应用-进程

本篇主要讲解multiprocessing中的重要模块-进程. Process([group [,target [,name [,args [,kwargs]]]]]) 这个类表示运行在一个子进程中的任务,应该使用关键字参数来指定构造函数中的参数.target是当前进程启动时执行的可调用对象,args是传递给target的位置参数的元组,而kwargs是传递给target的关键字参数的字典.如果省略args和kwargs参数,将不带参数调用target.name是为进程指定描述性名称额字符串.g

multiprocessing在python中的高级应用-共享数据与同步

通常,进程之间彼此是完全孤立的,唯一的通信方式是队列或管道.但可以使用两个对象来表示共享数据.其实,这些对象使用了共享内存(通过mmap模块)使访问多个进程成为可能. Value( typecode, arg1, - argN, lock ) 在共享内容中常见ctypes对象.typecode要么是包含array模块使用的相同类型代码(如'i','d'等)的字符串,要么是来自ctypes模块的类型对象(如ctypes.c_int.ctypes.c_double等).所有额外的位置参数arg1,

multiprocessing在python中的高级应用-进程池

下面的类可以创建进程池,可以吧各种数据处理任务都提交给进程池.进程池提供的功能有点类似于列表解析和功能性编程操作(如映射-规约)提供的功能. Pool( [ numprocess [, initializer [, initargs] ] ] ) 创建工作进程池. numprocess是要创建的进程数.如果省略此参数,将使用cpu_count()的值.[这里简单介绍一下: from multiprocessing import cpu_count print(cpu_count()) #获得电脑

Python中的高级数据结构(转)

add by zhj: Python中的高级数据结构 数据结构 数据结构的概念很好理解,就是用来将数据组织在一起的结构.换句话说,数据结构是用来存储一系列关联数据的东西.在Python中有四种内建的数据 结构,分别是List.Tuple.Dictionary以及Set.大部分的应用程序不需要其他类型的数据结构,但若是真需要也有很多高级数据结构可供 选择,例如Collection.Array.Heapq.Bisect.Weakref.Copy以及Pprint.本文将介绍这些数据结构的用法,看 看它

Python中的高级数据结构详解

这篇文章主要介绍了Python中的高级数据结构详解,本文讲解了Collection.Array.Heapq.Bisect.Weakref.Copy以及Pprint这些数据结构的用法,需要的朋友可以参考下 数据结构 数据结构的概念很好理解,就是用来将数据组织在一起的结构.换句话说,数据结构是用来存储一系列关联数据的东西.在Python中有四种内建的数据结构,分别是List.Tuple.Dictionary以及Set.大部分的应用程序不需要其他类型的数据结构,但若是真需要也有很多高级数据结构可供选择

Python中的高级数据结构

数据结构 数据结构的概念很好理解,就是用来将数据组织在一起的结构.换句话说,数据结构是用来存储一系列关联数据的东西.在Python中有四种内建的数据结构,分别是List.Tuple.Dictionary以及Set.大部分的应用程序不需要其他类型的数据结构,但若是真需要也有很多高级数据结构可供选择,例如Collection.Array.Heapq.Bisect.Weakref.Copy以及Pprint.本文将介绍这些数据结构的用法,看看它们是如何帮助我们的应用程序的. 关于四种内建数据结构的使用方

XPath在python中的高级应用

XPath在python的爬虫学习中,起着举足轻重的地位,对比正则表达式 re两者可以完成同样的工作,实现的功能也差不多,但XPath明显比re具有优势,在网页分析上使re退居二线. XPath介绍: 是什么? 全称为XML Path Language 一种小型的查询语言 说道XPath是门语言,不得不说它所具备的优点: 1) 可在XML中查找信息 2) 支持HTML的查找 3) 通过元素和属性进行导航 python开发使用XPath条件: 由于XPath属于lxml库模块,所以首先要安装库lx

python 中的高级函数sorted()

Python内置的 sorted()函数可对list进行排序: >>>sorted([36, 5, 12, 9, 21]) [5, 9, 12, 21, 36] 但 sorted()也是一个高阶函数,它可以接收一个比较函数来实现自定义排序,比较函数的定义是,传入两个待比较的元素 x, y,如果 x 应该排在 y 的前面,返回 -1,如果 x 应该排在 y 的后面,返回 1.如果 x 和 y 相等,返回 0. 因此,如果我们要实现倒序排序,只需要编写一个reversed_cmp函数: de