多进程multiprocess模块
multiprocessing
is a package that supports spawning processes using an API similar to the threading
module. The multiprocessing
package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
这个模块的语法结构与threading模块基本相似。
查找到一个多进程的博客连接:https://www.cnblogs.com/Mr-Murray/p/9026955.html
from multiprocessing import Process import time import os def info(): print("\033[35;1mThe time is %s\033[0m" % time.ctime()) print("The parent is %s; The Child process is %s" % (os.getppid(), os.getpid())) if __name__ == "__main__": for i in range(10): p = Process(target=info) p.start() p.join()
这段代码和之前用threading模块创建的多线程并没有任何的区别,但是如果在windows系统上执行时,必须加上if __name__ == "__main__":语句,原因如下:
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。 以实例化的方式产生进程:
import os, time from multiprocessing import Process class Myprocess(Process): def __init__(self): super(Myprocess, self).__init__() def run(self): print("\033[35;1mThe time is %s\033[0m" % time.ctime()) print("The parent is %s; The Child process is %s" % (os.getppid(), os.getpid())) if __name__ == "__main__": p = Myprocess() p.start()#在调用p.start的时候,会自动执行类中的run方法,run方法是必不可少的。和threading类中的Thread中的run方法一样。
进程中的守护进程和threading模块中的守护线程是同一个意思,主进程结束的时候,子进程也会随之结束。
import os, time from multiprocessing import Process def fun1(): print("starting %s".center(50,"-") % time.ctime()) time.sleep(3) print("Stopping %s".center(50,"-") % time.ctime()) if __name__ == "__main__": p = Process(target=fun1) p.daemon = True p.start() time.sleep(1) -------------------------------------------------------------------------- 子进程中是要执行3s的,但主进程中只执行了1s,设置了守护进程之后,主进程结束,不管子进程的状态,子进程都要退出
利用多进程实现简易的socket并发连接。
server端代码: import socket, time from multiprocessing import Process def handle(conn,addr): print("The connection is from %s at %s port" % addr) conn.send(b"Hello world") data = conn.recv(1024) conn.send(data) if __name__ == "__main__": HOST = "localhost" PORT = 51423 ADDR = (HOST, PORT) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(ADDR) s.listen(5) while True: conn, addr = s.accept() p = Process(target=handle, args=(conn,addr)) # 进程间的数据是不共享的,因此需要把coon作为参数传递 p.start() s.close() client端代码: import socket HOST = "localhost" PORT = 51423 ADDR = (HOST, PORT) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(ADDR) data = s.recv(1024) print(data.decode()) while True: info = input(">>>: ") s.send(info.encode()) data = s.recv(1024) print(data.decode())
注意:
进程之间的数据是彼此之间不能互相访问的,因此conn必须作为参数传递。
进程之间的通信该怎么做?
进程中的队列和管道:
from multiprocessing import Process, Queue from time import sleep import random # 利用进程写一个生成消费者模型, # 生成者向队列插入一个随机数,消费者取出队列中的一个数值 def productor(q): while True: sleep(0.3) print("store an num") q.put(random.randint(1,100)) def consumer(q): while True: sleep(0.2) print("Getting an num") q.get() if __name__ == "__main__": q = Queue() proc1 = Process(target=productor, args=(q,)) proc2 = Process(target=consumer, args=(q,)) proc1.start() proc2.start()
这只是一个简易模型,来说明队列在进程之间的通信。
这里的队列用法和线程中队列用法基本相同,只是一个用于进程通信,一个用于线程通信。
管道
管道的简单实用:
from multiprocessing import Process, Pipe def f(conn): conn.send("Hello world") conn.close() if __name__ == ‘__main__‘: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()
manager
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
.
一个简单实用的实例:
from multiprocessing import Process, Manager import random def f(list): list.append(random.randint(0,100)) print(list) if __name__ == "__main__": p_list = [] with Manager() as manager: l = manager.list() for i in range(10): p = Process(target=f,args=(l,)) p.start() p_list.append(p) for res in p_list: res.join()
每一个进程给列表中添加一个数据,执行结果如下:
[95]
[95, 25]
[95, 25, 31]
[95, 25, 31, 70]
[95, 25, 31, 70, 9]
[95, 25, 31, 70, 9, 17]
[95, 25, 31, 70, 9, 17, 96]
[95, 25, 31, 70, 9, 17, 96, 71]
[95, 25, 31, 70, 9, 17, 96, 71, 96]
[95, 25, 31, 70, 9, 17, 96, 71, 96, 3]
进程池:
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
from multiprocessing import Process, Pool import time def Foo(i): time.sleep(2) return i + 100 def Bar(arg): print(‘-->exec done:‘, arg) if __name__ == "__main__": pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) # pool.apply(func=Foo, args=(i,)) print(‘end‘) pool.close() pool.join()
注意:其中在调用apply_async时候使用了callback回调函数
进程池详解,看到的一片博文:https://www.cnblogs.com/qiangyuge/p/7455814.html
原文地址:https://www.cnblogs.com/wxzhe/p/9101890.html