python多进程总结

阅读目录

  • 1. Process
  • 2. Lock
  • 3. Semaphore
  • 4. Event
  • 5. Queue
  • 6. Pipe
  • 7. Pool

序. multiprocessing
python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

1. Process

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),

  target表示调用对象,

  args表示调用对象的位置参数元组。

  kwargs表示调用对象的字典。

  name为别名。

  group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。

  其中,Process以start()启动某个进程。

属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。

  其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

例1.1:创建函数并将其作为单个进程

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

结果


1

2

3

4

5

6

7

8

p.pid: 8736

p.name: Process-1

p.is_alive: True

The time is Tue Apr 21 20:55:12 2015

The time is Tue Apr 21 20:55:15 2015

The time is Tue Apr 21 20:55:18 2015

The time is Tue Apr 21 20:55:21 2015

The time is Tue Apr 21 20:55:24 2015

例1.2:创建函数并将其作为多个进程

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

结果


1

2

3

4

5

6

7

8

9

10

11

The number of CPU is:4

child   p.name:Process-3    p.id7992

child   p.name:Process-2    p.id4204

child   p.name:Process-1    p.id6380

END!!!!!!!!!!!!!!!!!

worker_1

worker_3

worker_2

end worker_1

end worker_2

end worker_3

例1.3:将进程定义为类

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == ‘__main__‘:
    p = ClockProcess(3)
    p.start()      

:进程p调用start()时,自动调用run()

结果


1

2

3

4

5

the time is Tue Apr 21 20:31:30 2015

the time is Tue Apr 21 20:31:33 2015

the time is Tue Apr 21 20:31:36 2015

the time is Tue Apr 21 20:31:39 2015

the time is Tue Apr 21 20:31:42 2015

例1.4:daemon程序对比结果

#1.4-1 不加daemon属性

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("end!")

结果


1

2

3

end!

work start:Tue Apr 21 21:29:10 2015

work end:Tue Apr 21 21:29:13 2015

#1.4-2 加上daemon属性

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print("end!")

结果


1

end!

:因子进程设置了daemon属性,主进程结束,它们就随着结束了。

#1.4-3 设置daemon执行完结束的方法

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

结果


1

2

3

work start:Tue Apr 21 22:16:32 2015

work end:Tue Apr 21 22:16:35 2015

end!

2. Lock

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, ‘a+‘)
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, ‘a+‘)
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

结果(输出文件)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

3. Semaphore

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

结果


1

2

3

4

5

6

7

8

9

10

11

12

13

14

Process-1acquire

Process-1release

Process-2acquire

Process-3acquire

Process-2release

Process-5acquire

Process-3release

Process-4acquire

Process-5release

Process-4release

4. Event

Event用来实现进程间同步通信。

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

结果


1

2

3

4

5

wait_for_event: starting

wait_for_event_timeout:starting

wait_for_event_timeout:e.is_set->False

main: event is set

wairt_for_event: e.is_set()->True

5. Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

import multiprocessing

def writer_proc(q):
    try:
        q.put(1, block = False)
    except:
        pass   

def reader_proc(q):
    try:
        print(q.get(block = False) )
    except:
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()  

    reader.join()
    writer.join()

结果


1

1

6. Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print("send: %s" %(i))
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

结果

7. Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

例7.1:使用进程池(非阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行结果


1

2

3

4

5

6

7

8

9

10

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0

msg: hello 1

msg: hello 2

end

msg: hello 3

end

end

end

Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

例7.2:使用进程池(阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print("Sub-process(es) done.")

一次执行的结果


1

2

3

4

5

6

7

8

9

10

msg: hello 0

end

msg: hello 1

end

msg: hello 2

end

msg: hello 3

end

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~

Sub-process(es) done.

  

例7.3:使用进程池,并关注结果

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print(Sub-process(es) done.")

一次执行结果


1

2

3

4

5

6

7

8

9

10

msg: hello 0

msg: hello 1

msg: hello 2

end

end

end

::: donehello 0

::: donehello 1

::: donehello 2

Sub-process(es) done.

例7.4:使用多个进程池

#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
    end = time.time()
    print(‘Task Lee, runs %0.2f seconds.‘ %(end - start))

def Marlon():
    print("\nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print(‘Task Marlon runs %0.2f seconds.‘ %(end - start))

def Allen():
    print("\nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print(‘Task Allen runs %0.2f seconds.‘ %(end - start))

def Frank():
    print("\nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print(‘Task Frank runs %0.2f seconds.‘ %(end - start))

if __name__==‘__main__‘:
    function_list=  [Lee, Marlon, Allen, Frank]
    print("parent process %s" %(os.getpid()))

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中

    print(‘Waiting for all subprocesses done...‘)
    pool.close()
    pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
    print(‘All subprocesses done.‘)

一次执行结果


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

parent process 7704

Waiting for all subprocesses done...

Run task Lee-6948

Run task Marlon-2896

Run task Allen-7304

Run task Frank-3052

Task Lee, runs 1.59 seconds.

Task Marlon runs 8.48 seconds.

Task Frank runs 15.68 seconds.

Task Allen runs 18.08 seconds.

All subprocesses done.

时间: 2024-10-25 22:21:58

python多进程总结的相关文章

Python 多进程多线编程模板

一.Python 多进程多线程原理介绍 1. Python 全局解释器锁GIL a) Python的全局解释器锁GIL是互斥锁,能够防止本机多个线程一次执行Python字节码:由于CPython的内存管理在线程级别是不安全的(内存泄露),所以这个全局解释器锁是必须的.每个Python进程只能申请使用一个GIL锁,因此Python的多线程虽然是并发的但不能并行处理.Python的解释器每次只能执行一个线程,待GIL锁释放后再执行下一个线程,这样线程轮流被执行. b) Python2.x里,GIL的

Python 多进程实战 & 回调函数理解与实战

这篇博文主要讲下笔者在工作中Python多进程的实战运用和回调函数的理解和运用. 多进程实战 实战一.批量文件下载 从一个文件中按行读取 url ,根据 url 下载文件到指定位置,用多进程实现. #!/usr/local/python27/bin/python2.7 from multiprocessing import Process,Pool import os,time,random,sys import urllib # 文件下载函数 def filedown(url,file):  

Python多进程使用

[Python之旅]第六篇(六):Python多进程使用 香飘叶子 2016-05-10 10:57:50 浏览190 评论0 python 多进程 多进程通信 摘要:   关于进程与线程的对比,下面的解释非常好的说明了这两者的区别:     这里主要说明关于Python多进程的下面几点: 1 2 3 4 5 6 7 1.多进程的使用方法 2.进程间的通信之multiprocessing.Manager()使用 3.Python进程池 ... 关于进程与线程的对比,下面的解释非常好的说明了这两者

Python多进程并发(multiprocessing)用法实例详解

http://www.jb51.net/article/67116.htm 本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心.Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,

Python多进程(1)——subprocess与Popen()

Python多进程方面涉及的模块主要包括: subprocess:可以在当前程序中执行其他程序或命令: mmap:提供一种基于内存的进程间通信机制: multiprocessing:提供支持多处理器技术的多进程编程接口,并且接口的设计最大程度地保持了和threading模块的一致,便于理解和使用. 本文主要介绍 subprocess 模块及其提供的 Popen 类,以及如何使用该构造器在一个进程中创建新的子进程.此外,还会简要介绍 subprocess 模块提供的其他方法与属性,这些功能上虽然没

Python多进程相关的坑

Python的multiprocessing模块实现了多进程功能,但官方文档上只有一些比较简单的用法,主要是使用函数作为process的target,而如何在class中使用多进程并没有多讲解.google出两篇比较详细的文章,建议从它们入门: https://pymotw.com/2/multiprocessing/basics.html https://pymotw.com/2/multiprocessing/communication.html 下面记录一下自己这周在python多进程上碰

【Python之旅】第六篇(六):Python多进程使用

关于进程与线程的对比,下面的解释非常好的说明了这两者的区别: 这里主要说明关于Python多进程的下面几点: 1.多进程的使用方法 2.进程间的通信 3.Python进程池 (1)比较简单的例子 (2)多个进程多次并发的情况 (3)验证apply.async方法是非阻塞的 (4)验证apply.async中的get()方法是阻塞的 1.多进程的使用方法 直接给出下面程序代码及注释: from multiprocessing import Process    #从多进程模块中导入Process

最简单方法远程调试Python多进程子程序

Python 2.6新增的multiprocessing,即多进程,给子进程代码调试有点困难,比如python自带的pdb如果直接在子进程代码里面启动会抛出一堆异常,原因是子进程的stdin/out/err等文件都已关闭,pdb无法调用.据闻winpdb.Wing IDE的调试器能够支持这样的远程调试,但似乎过于重量级(好吧前者比后者要轻多了,但一样要wxPython的环境,再说pdb的灵活可靠它们难以比拟). 其实只需稍作改动即可用pdb继续调试子进程的代码,思路来自这个博客:子进程的stdi

python多进程的理解 multiprocessing Process join run

最近看了下多进程. 一种接近底层的实现方法是使用 os.fork()方法,fork出子进程.但是这样做事有局限性的.比如windows的os模块里面没有 fork() 方法. windows:.linux: 另外还有一个模块:subprocess.这个没整过,但从vamei的博客里看到说也同样有局限性. 所以直接说主角吧 --- multiprocessing模块. multiprocessing模块会在windows上时模拟出fork的效果,可以实现跨平台,所以大多数都使用multiproce

python多进程中使用pool

Python 多进程中使用pool,pool中指定每次运行几个进程,当其中一个进程结束完毕后,会加入新的进程 #!/usr/bin/env python #coding: utf-8 import multiprocessing import os,time,random def Lee(): print "Run task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID start=time.time() time.sleep(random