python 中的multiprocessing 模块

multiprocessing.Pipe([duplex])

返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.不同于os.open之处在于os.pipe()返回2个文件描述符(r, w),表示可读的和可写的

实例如下:

#!/usr/bin/python
#coding=utf-8
import os
from multiprocessing import Process, Pipe

def send(pipe):
    pipe.send([‘spam‘] + [42, ‘egg‘])
    pipe.close()

def talk(pipe):
    pipe.send(dict(name = ‘Bob‘, spam = 42))
    reply = pipe.recv()
    print(‘talker got:‘, reply)

if __name__ == ‘__main__‘:
    (con1, con2) = Pipe()
    sender = Process(target = send, name = ‘send‘, args = (con1, ))
    sender.start()
    print "con2 got: %s" % con2.recv()#从send收到消息
    con2.close()

    (parentEnd, childEnd) = Pipe()
    child = Process(target = talk, name = ‘talk‘, args = (childEnd,))
    child.start()
    print(‘parent got:‘, parentEnd.recv())
    parentEnd.send({x * 2 for x in ‘spam‘})
    child.join()
    print(‘parent exit‘)

输出如下:

con2 got: [‘spam‘, 42, ‘egg‘]
(‘parent got:‘, {‘name‘: ‘Bob‘, ‘spam‘: 42})
(‘talker got:‘, set([‘ss‘, ‘aa‘, ‘pp‘, ‘mm‘]))
parent exit

  

multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print ‘Run child process %s (%s)...‘ % (name, os.getpid())

if __name__==‘__main__‘:
    print ‘Parent process %s.‘ % os.getpid()
    p = Process(target=run_proc, args=(‘test‘,))
    print ‘Process will start.‘
    p.start()
    p.join()
    print ‘Process end.‘

  

在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool
from multiprocessing import Pool
import os, time

def long_time_task(name):
    print ‘Run task %s (%s)...‘ % (name, os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print ‘Task %s runs %0.2f seconds.‘ % (name, (end - start))

if __name__==‘__main__‘:
    print ‘Parent process %s.‘ % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print ‘Waiting for all subprocesses done...‘
    p.close()
    p.join()
    print ‘All subprocesses done.‘

  

pool创建子进程的方法与Process不同,是通过

p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始。

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in [‘A‘, ‘B‘, ‘C‘]:
        print ‘Put %s to queue...‘ % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print ‘Get %s from queue.‘ % value
            time.sleep(random.random())
        else:
            break

if __name__==‘__main__‘:
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 等待pw结束:
    pw.join()
    # 启动子进程pr,读取:
    pr.start()
    pr.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    print
    print ‘所有数据都写入并且读完‘

  

关于上面代码的几个有趣的问题

if __name__==‘__main__‘:
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()

    print ‘所有数据都写入并且读完‘

  

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__==‘__main__‘:
    manager = multiprocessing.Manager()
    # 父进程创建Queue,并传给各个子进程:
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    time.sleep(0.5)
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()

    print
    print ‘所有数据都写入并且读完‘

  

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 写数据进程执行的代码:
def write(q,lock):
    lock.acquire() #加上锁
    for value in [‘A‘, ‘B‘, ‘C‘]:
        print ‘Put %s to queue...‘ % value
        q.put(value)
    lock.release() #释放锁  

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(False)
            print ‘Get %s from queue.‘ % value
            time.sleep(random.random())
        else:
            break

if __name__==‘__main__‘:
    manager = multiprocessing.Manager()
    # 父进程创建Queue,并传给各个子进程:
    q = manager.Queue()
    lock = manager.Lock() #初始化一把锁
    p = Pool()
    pw = p.apply_async(write,args=(q,lock))
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()

    print
    print ‘所有数据都写入并且读完‘

  

原文地址:https://www.cnblogs.com/MY0213/p/8997576.html

时间: 2024-11-02 03:34:10

python 中的multiprocessing 模块的相关文章

Python之进程 - multiprocessing模块

? 我们已经了解了,运行中的程序就是一个进程.所有的进程都是通过它的父进程来创建的.因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程.多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快.以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块. ? 仔细说来,multiprocess不是一个模块而是python中一个操作.管理进程的包. 之所以叫multi是取自multipl

Python中的random模块,来自于Capricorn的实验室

Python中的random模块用于生成随机数.下面介绍一下random模块中最常用的几个函数. random.random random.random()用于生成一个0到1的随机符点数: 0 <= n < 1.0 random.uniform random.uniform的函数原型为:random.uniform(a, b),用于生成一个指定范围内的随机符点数,两个参数其中一个是上限,一个是下限.如果a > b,则生成的随机数n: a <= n <= b.如果 a <

python中查看可用模块

1.这种方式的问题是,只列出当前import进上下文的模块. 进入python命令行.输入以下代码: >>>import sys >>>sys.modules 2.在python命令行下输入: >>>help() help>modulespython中查看可用模块,布布扣,bubuko.com

python中动态导入模块

如果导入的模块不存在,Python解释器会报 ImportError 错误: >>> import something Traceback (most recent call last): File "<stdin>", line 1, in <module> ImportError: No module named something 有的时候,两个不同的模块提供了相同的功能,比如 StringIO 和 cStringIO 都提供了Strin

Python中的random模块

Python中的random模块 (转载自http://www.cnblogs.com/yd1227/archive/2011/03/18/1988015.html) Python中的random模块用于生成随机数.下面介绍一下random模块中最常用的几个函数. random.random random.random()用于生成一个0到1的随机符点数: 0 <= n < 1.0 random.uniform random.uniform的函数原型为:random.uniform(a, b),

解决linux系统下python中的matplotlib模块内的pyplot输出图片不能显示中文的问题

问题: 我在ubuntu14.04下用python中的matplotlib模块内的pyplot输出图片不能显示中文,怎么解决呢? 解决: 1.指定默认编码为UTF-8: 在python代码开头加入如下代码 import sys reload(sys) sys.setdefaultencoding('utf-8') 2.确认你ubuntu系统环境下拥有的中文字体文件: 在终端运行命令"fc-list :lang=zh",得到自己系统的中文字体 命令输出如下: /usr/share/fon

(转)Python中的random模块

Python中的random模块用于生成随机数.下面介绍一下random模块中最常用的几个函数. random.random random.random()用于生成一个0到1的随机符点数: 0 <= n < 1.0 random.uniform random.uniform的函数原型为:random.uniform(a, b),用于生成一个指定范围内的随机符点数,两个参数其中一个是上限,一个是下限.如果a > b,则生成的随机数n: a <= n <= b.如果 a <

转载:python中的copy模块(浅复制和深复制)

主要是介绍python中的copy模块. copy模块包括创建复合对象(包括列表.元组.字典和用户定义对象的实例)的深浅复制的函数. ########copy(x)########创建新的复合对象并通过引用复制x的成员来创建x的浅复制.更加深层次说,它复制了对象,但对于对象中的元素,依然使用引用.对于内置类型,此函数并不经常使用.而是使用诸如list(x), dict(x), set(x)等调用方式来创建x的浅复制,要知道像这样直接使用类型名显然比使用copy()快很多.但是它们达到的效果是一样

Python中的logging模块【转】

基本用法 下面的代码展示了logging最基本的用法. 1 # -*- coding: utf-8 -*- 2 3 import logging 4 import sys 5 6 # 获取logger实例,如果参数为空则返回root logger 7 logger = logging.getLogger("AppName") 8 9 # 指定logger输出格式 10 formatter = logging.Formatter('%(asctime)s %(levelname)-8s: