python中的Queue与多进程(multiprocessing)

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

一、先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put(‘yang’)

q.put(4)

q.put([‘yan’,’xing’])

在队列中取值get()

默认的队列是先进先出的

>>> q.get()
‘yang‘
>>> q.get()
4
>>> q.get()
[‘yan‘, ‘xing‘]
>>>

 

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小 
Queue.empty() 如果队列为空,返回True,反之False 
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间 
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间 
Queue.put_nowait(item) 相当Queue.put(item, False)

 

二、multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print ‘Run child process %s (%s)...‘ % (name, os.getpid())

if __name__==‘__main__‘:
    print ‘Parent process %s.‘ % os.getpid()
    p = Process(target=run_proc, args=(‘test‘,))
    print ‘Process will start.‘
    p.start()
    p.join()
    print ‘Process end.‘

三、在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool

 

from multiprocessing import Pool
import os, time

def long_time_task(name):
    print ‘Run task %s (%s)...‘ % (name, os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print ‘Task %s runs %0.2f seconds.‘ % (name, (end - start))

if __name__==‘__main__‘:
    print ‘Parent process %s.‘ % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print ‘Waiting for all subprocesses done...‘
    p.close()
    p.join()
    print ‘All subprocesses done.‘

pool创建子进程的方法与Process不同,是通过

p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

三、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in [‘A‘, ‘B‘, ‘C‘]:
        print ‘Put %s to queue...‘ % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print ‘Get %s from queue.‘ % value
            time.sleep(random.random())
        else:
            break

if __name__==‘__main__‘:
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 等待pw结束:
    pw.join()
    # 启动子进程pr,读取:
    pr.start()
    pr.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    print
    print ‘所有数据都写入并且读完‘

四、关于上面代码的几个有趣的问题

if __name__==‘__main__‘:
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()

    print
    print ‘所有数据都写入并且读完‘ 

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__==‘__main__‘:
    manager = multiprocessing.Manager()
    # 父进程创建Queue,并传给各个子进程:
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    time.sleep(0.5)
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()

    print
    print ‘所有数据都写入并且读完‘

 

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 写数据进程执行的代码:
def write(q,lock):
    lock.acquire() #加上锁
    for value in [‘A‘, ‘B‘, ‘C‘]:
        print ‘Put %s to queue...‘ % value
        q.put(value)
    lock.release() #释放锁  

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(False)
            print ‘Get %s from queue.‘ % value
            time.sleep(random.random())
        else:
            break

if __name__==‘__main__‘:
    manager = multiprocessing.Manager()
    # 父进程创建Queue,并传给各个子进程:
    q = manager.Queue()
    lock = manager.Lock() #初始化一把锁
    p = Pool()
    pw = p.apply_async(write,args=(q,lock))
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()

    print
    print ‘所有数据都写入并且读完‘

参考文章:

http://blog.csdn.net/yatere/article/details/6668006

http://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/0013868323401155ceb3db1e2044f80b974b469eb06cb43000

python中的Queue与多进程(multiprocessing)

时间: 2024-10-15 13:49:03

python中的Queue与多进程(multiprocessing)的相关文章

练习--python中的Queue与多进程(multiprocessing)

按官方说法: This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS! Use modern alternatives like the multiprocessing module in the standard library or even an asynchroneous app

Python的多线程threading和多进程multiprocessing

python中的多线程就是在一个进程中存在着多个线程,在线程中,所有的线程都是共享资源的,线程之间的数据通信很简单.但是python仅支持一个线程的运行,因为python中存在一个全局解释器锁GIL(global interpreter lock),正是这个锁能保证同一时刻只有一个线程在运行,所以多线程依旧像是单线程的运行. GIL无疑就是一把对多线程有影响的全局锁,解决它对多线程的影响,不单单是释放GIL这么简单.GIL使得对象模型都是可以并发访问.GIL全局解释器锁解决多线程之间数据完整性和

python(32):多进程 multiprocessing

python 多线程:多线程 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心. Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,可以如下: import multiprocessing import time def func(msg): for i in xrange(3): pri

python中的多线程和多进程编程

注意:多线程和多线程编程是不同的!!! 第一点:一个进程相当于一个要执行的程序,它会开启一个主线程,多线程的话就会再开启多个子线程:而多进程的话就是一个进程同时在多个核上进行: 第二点:多线程是一种并发操作(伪并行),它相当于把CPU的时间片分成一段一段很小的片段,然后分给各个线程交替进行,由于每个片段都很短,所以看上去像平行操作: (1)多线程操作案例: import threading class MyThread(threading.Thread): def __init__(self ,

python 中的queue, deque

创建双向队列 import collections d = collections.deque() append(往右边添加一个元素) import collections d = collections.deque() d.append(1) d.append(2) print(d) #输出:deque([1, 2]) appendleft(往左边添加一个元素) import collections d = collections.deque() d.append(1) d.appendlef

Python多进程multiprocessing(二)

紧接上文 在上文Python多进程multiprocessing(一)中我们介绍了多进程multiprocessing的部分基础操作,在本文中,我们将继续介绍关于多进程的一些知识,比如进程池Pool这个有用的东东.马上开始吧! 使用实例 实例1 import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool(processes=2) res = pool.map(job,range(10))

Python中的GIL锁

在Python中,可以通过多进程.多线程和多协程来实现多任务. 在多线程的实现过程中,为了避免出现资源竞争问题,可以使用互斥锁来使线程同步(按顺序)执行. 但是,其实Python的CPython(C语言实现的)解释器上有一把GIL锁,也就是说Python的程序是处于一个解释器锁的环境中的. 一.GIL介绍 GIL (Global Interperter Lock) 称作全局解释器锁. GIL并不是Python语言的特性,它是在实现Python解释器时引用的一个概念.GIL只在CPython解释器

关于进程、线程、协程在python中的使用问题

描述 最近在python中开发一个人工智能调度平台,因为计算侧使用python+tensorflow,调度侧为了语言的异构安全性,也选择了python,就涉及到了一个调度并发性能问题,因为业务需要,需要能达到1000+个qps的业务量需求,对python调度服务的性能有很大挑战.具体的架构如下面所示: 补充:架构中使用的python为cpython,解释执行的语言,并非jpython或者pypython,cpython的社区环境比较活跃,很多开发包都是现在cpython下实现的,比如项目中计算模

python多进程multiprocessing模块中Queue的妙用

最近的部门RPA项目中,小爬为了提升爬虫性能,使用了Python中的多进程(multiprocessing)技术,里面需要用到进程锁Lock,用到进程池Pool,同时利用map方法一次构造多个process.Multiprocessing的使用确实能显著提升爬虫速度,不过程序交由用户使用时,缺乏一个好的GUI窗口来显示爬虫进度.之前的文章中使用了Chrome浏览器来渲染js脚本生成了进度条.但是鉴于Chrome在运行时十分吃内存资源,用Chrome只是生成一个进度条难免有些“大材小用”,所以,小