什么是进程process:
运行中的程序就称为进程,我们写好的程序,如果不运行的话就是一堆普通的字符,没有任何意义。
并行与并发:
并发:伪并行,看起来是同时运行,实际是cpu+多道技术实现,cpu在多个任务间快速切换,给我们造成的感觉就好像是在同时运行。
并行:只有具备多个cpu才能实现真正意义上的并行。多个任务同时进行,但是最大运行数为cpu个数。
multiprocessing 模块:
python中的多线程无法利用cpu的多核优势(GIL锁约束一个进程内同一时间内,只能运行一个线程)。如果想要充分利用cpu的多核资源,就要使用多进程。python中的multiprocessing模块为我们提供了开启子进程的方法,并在子进程中执行我们定制的任务。
multiprocessing 模块提供Process,Queue ,Pipe,Lock等组件。
与线程不同,进程的初始化完全copy其父进程内存地址,进程间数据不共享,进程数据的修改仅限于该进程内部。
Process类
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): ‘‘‘参数介绍:group(这个应该是不用的)target:要调用的对象,即要执行的任务(函数)name:子进程的名字 args:表示要要调用对象的位置参数,kwargs:表示要调用对象的字典。‘‘‘ self.name = ‘‘#进程名 self.daemon = False#默认为False设置为True 表示为守护进程,在start()之前设置,随着主进程结束而结束。 self.authkey = None#进程的身份验证键,默认为os.urandom()随机生成的32为字符串 self.exitcode = None#进程运行时为None,如果为-N,表示信号N结束 self.ident = 0 self.pid = 0#进程的pid self.sentinel = None def run(self):#进程启动,自定义进程时必须实现 pass def start(self):#启动进程 pass def terminate(self):#强制终止进程,不推荐使用,容易造成僵尸进程和死锁 pass def join(self, timeout=None):#设置超时时间 pass def is_alive(self):#判断进程是否正在运行,正在运行返回True return False
Process类介绍
创建线程的两种方式:
import time import random from multiprocessing import Process def piao(name): print(‘%s piaoing‘ %name) time.sleep(random.randrange(1,5)) print(‘%s piao end‘ %name) p1=Process(target=piao,args=(‘egon‘,)) #必须加,号 p2=Process(target=piao,args=(‘alex‘,)) p3=Process(target=piao,args=(‘wupeqi‘,)) p4=Process(target=piao,args=(‘yuanhao‘,)) p1.start() p2.start() p3.start() p4.start() print(‘主线程‘)
方式一
import time import random from multiprocessing import Process class MyProcess(Process): def __init__(self,name,money): super().__init__() self.name = name self.money = money def run(self): print("%s is piaoing"%self.name) time.sleep(random.randint(1,3)) print("%s is piao end"%self.name) print("%s pay %s yuan for piao"%(self.name,self.money)) if __name__ == ‘__main__‘: p1= MyProcess("egon",200) p2= MyProcess("alex",None) p3= MyProcess("yuanhao",500) p1.start()#start 会自动调用run方法 p2.start() p3.start() print("主进程".center(30,"*"))
方式二
进程实现socket并发
###服务端 from socket import * from multiprocessing import Process s = socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(("192.168.20.45",8800)) s.listen(5) def talk(conn,addr): while True: try: msg = conn.recv(1024) if not msg:continue conn.send(msg.upper()) except Exception: break if __name__ == ‘__main__‘: while True: conn,addr = s.accept() p = Process(target = talk,args = (conn,addr)) p.start() ####客户端 from socket import * c = socket(AF_INET,SOCK_STREAM) c.connect(("192.168.20.45",8800)) while True: msg = input(">>>>:").strip() if not msg:continue c.send(msg.encode("utf-8")) data = c.recv(1024) print(data.decode("utf-8"))
socket 并发
Process 对象的join方法:
from multiprocessing import Process import time,random def piaochang(name,money): print("%s is piaoing"%name) time.sleep(random.randrange(5)) print("%s is piao end"%name) print("%s pay %s yuan for piaochang"%(name,money)) if __name__ == ‘__main__‘: start = time.time() p1 = Process(target=piaochang,args = (‘egon‘,200)) p2 = Process(target=piaochang,args = (‘alex‘,None)) p3 = Process(target=piaochang,args = (‘wupeiqi‘,500)) p_l = [p1,p2,p3] for p in p_l: p.start() # p.join()#如果将join放在这里,就是串行的结果 主线程 9.459348678588867 for p in p_l: p.join()#join的意思是等待所有上面的程序执行完后再往下执行代码 主线程 4.645843029022217 #join是让主进程等待子进程运行结束,卡住的是主进程 print("主线程",time.time()-start)
Process join 方法
Process 其它属性或方法:
from multiprocessing import Process,current_process import time,random class Piao(Process): def __init__(self,name,money): super().__init__() self.name = name self.money = money def run(self): print("%s is piaoing"%self.name) time.sleep(random.randrange(5)) print("%s is piao end,pay %s yuan"%(self.name,self.money)) if __name__ == ‘__main__‘: p = Piao("egon",200) p.start() print(p.is_alive())#查看进程p是否存活 print(p.pid)#子进程ip print(p.name)#查看进程名,这里为"egon",默认为Process1 p.terminate()#强制关闭子进程 time.sleep(0.1)#这里需要注意,关闭进程不会立马关闭,所以is_alive 有可能还存活,这里让它停小会 print(p.is_alive())#存活状态为False
守护进程:
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
from multiprocessing import Process import time,random def piao(name): print("%s is piaoing"%name) time.sleep(random.randint(1,3)) print("%s is piao done"%name) if __name__ == ‘__main__‘: p = Process(target=piao,args = ("egon",)) p.daemon = True p.start() print("主")#执行完这里意味着p这个守护进程已经结束,它后面的代码也将不会运行
守护进程
同步锁:
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,,竞争带来的结果就是错乱,如何控制,就是加锁处理
# #不加锁,并发,效率高,数据错乱 from multiprocessing import Process,Lock import time,os def work(): print("%s is running"%os.getpid()) time.sleep(1) print("%s is done"%os.getpid()) if __name__ == ‘__main__‘: for i in range(5): p = Process(target=work) p.start() # 加锁,变并发伪串行,效率低,数据安全 def work(mutex): mutex.acquire() print("%s is running "%os.getpid()) time.sleep(1) print(‘%s is done‘%os.getpid()) mutex.release() if __name__ == ‘__main__‘: mutex = Lock() for i in range(5): p = Process(target=work,args = (mutex,)) p.start()
同步锁
模拟抢票对同一文件操作:
#文件db的内容为:{"count":1} #注意一定要用双引号,不然json无法识别 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open(‘db.txt‘)) print(‘\033[43m剩余票数%s\033[0m‘ %dic[‘count‘]) def get(): dic=json.load(open(‘db.txt‘)) time.sleep(0.1) #模拟读数据的网络延迟 if dic[‘count‘] >0: dic[‘count‘]-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open(‘db.txt‘,‘w‘)) print(‘\033[43m购票成功\033[0m‘) def task(lock): search() get() if __name__ == ‘__main__‘: lock=Lock() for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task,args=(lock,)) p.start()
并发效率高,数据错乱
from multiprocessing import Process,Lock import time,json,os def search(): dic = json.load(open(‘db.txt‘)) print("\033[40m 剩余票数%s\033[0m"%dic["count"]) def get_ticket(): dic = json.load(open(‘db.txt‘)) time.sleep(0.5)#模拟网络延迟 if dic["count"]>0: dic["count"]-=1 time.sleep(0.2) json.dump(dic,open("db.txt","w")) print("\033[43m%s购票成功\033[0m"%os.getpid()) def task(mutex): search() mutex.acquire() get_ticket() mutex.release() if __name__ == ‘__main__‘: mutex = Lock() for i in range(100): p = Process(target=task,args = (mutex,)) p.start()
加锁,变并发为串行,效率低,数据安全
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低
2.需要自己加锁处理
为此mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
对列:
进程相互隔离,multiprocessing 模块为我们提供了两种进程间的通信方式:对列和管道
class Queue(object): def __init__(self, maxsize=-1): self._maxsize = maxsize #对列中允许的最大项数,省略则无限制 def qsize(self):#没什么卵用,返回队列中目前项目的正确数量 return 0 def empty(self):#判断对列是否为空 return False def full(self):#判断对列是否满了 return False def put(self, obj, block=True, timeout=None):#插入数据到对列 timeout设置超时时间 pass def put_nowait(self, obj):#同put pass def get(self, block=True, timeout=None):#从对列中获取数据 pass def get_nowait(self):#同get pass def close(self):#关闭对列,防止对列中加入更多的数据 pass def join_thread(self):#连接后台线程 pass def cancel_join_thread(self):#不会在进程退出时自动连接后台线程 pass
对列属性和方法
应用:
from multiprocessing import Process,Queue q = Queue(3) for i in range(3): q.put(i) print(q.full())#True for i in range(3): q.get(i) print(q.empty())#True
生产者消费者模型:
什么是生产者消费者模型:
生产者消费者模型是通过一个容器来解决生产者和消费者强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞对列来进行通信,所以生产者生产完数据后,直接扔给阻塞对列,消费者不直接找生产者要数据,而是直接从阻塞对列里取,阻塞对列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
为什么要使用生产者消费者模型:
在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理的速度很快,而消费者的处理速度很慢,那么生产者就得等待消费者处理处理完,才能继续生产数据。同样,如果消费者处理速度大于生产者,那么消费者就必须等待消费者。为了解决生产者和消费者这种强耦合的问题,就引入了这个模型。
from multiprocessing import Process,Queue import time,os,random #消费者 def consumer(q): while True: res = q.get() time.sleep(random.randint(1,2)) print("\033[45m %s 吃 %s\033[0m"%(os.getpid(),res)) #生产者 def producer(q): for i in range(10): time.sleep(random.randint(1,2)) res = "包子%s"%i q.put(res) print("\033[44m %s 生产了 %s\033[0m"%(os.getpid(),res)) if __name__ == ‘__main__‘: q = Queue() #生产者 p = Process(target=producer,args = (q,)) #消费者 c = Process(target= consumer,args=(q,)) #开始进程 p.start() c.start() print("主") ##此时的进程会陷入死循环中,因为生产者将数据生产完后,消费者 #在取到空之后,一直会循环q.get()这一步。 #要解决这个问题,就得生产者在生产完成后往对列中加一个结束的信号,这样消费者在接收到结束信号 #后就可以break出死循环。 from multiprocessing import Process,Queue import time,os,random #消费者 def consumer(q): while True: res = q.get() if res is None:break####收到结束后,结束循环 time.sleep(random.randint(1,2)) print("\033[45m %s 吃 %s\033[0m"%(os.getpid(),res)) #生产者 def producer(q): for i in range(10): time.sleep(random.randint(1,2)) res = "包子%s"%i q.put(res) print("\033[44m %s 生产了 %s\033[0m"%(os.getpid(),res)) q.put(None)####发送结束信号 if __name__ == ‘__main__‘: q = Queue() #生产者 p = Process(target=producer,args = (q,)) #消费者 c = Process(target= consumer,args=(q,)) #开始进程 p.start() c.start() # p.join()####结束信号不一定由生产者发送,主进程也可以发送 # p.put(None)####但是主进程需要确认生产者结束后才发送信号,这里用join方法确认 print("主") ####但是如果有多个生产者和消费者我们就得发送很多次的None,这样也太low了 ###don‘t worry multiprocessing 模块为我们提供了JoinableQueue方法,允许项目的使用者 ###通知生成者项目已经被成功处理。通知进程是使用共享信号和条件变量来实现的。 ###JoinableQueue除了和Queue相同的方法外还新增了q.task.done()使用者使用此方法发出信号。表示q.get() ###的返回项目已经被处理。 ###q.join()生产者调用此方法进行阻塞,知道对列中所有的项目被处理。阻塞持续到对列中的每个项目均调用q.task_done()方法为止 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res = q.get() time.sleep(random.randint(1,3)) print("\033[40m %s eat %s\033[0m"%(os.getpid(),res)) q.task_done() def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res = "%s%s"%(name,i) q.put(res) print("\033[44m %s 生产了 %s\033[0m"%(os.getpid(),res)) q.join() if __name__ == ‘__main__‘: q = JoinableQueue() p1 = Process(target=producer,args=("包子",q)) p2 = Process(target=producer,args=("骨头",q)) p3 = Process(target=producer,args=("泔水",q)) c1 = Process(target=consumer,args=(q,)) c2 = Process(target=consumer,args=(q,)) c3 = Process(target=consumer,args=(q,)) c1.daemon = True c2.daemon = True c3.daemon = True p_l = [p1,p2,p3,c1,c2,c3] for p in p_l: p.start() p1.join() p2.join() p3.join() print("主") #p1,p2,p3结束了,证明c1,c2,c3肯定全都收完了p1,p2,p3发到队列的数据 #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
生产者消费者模型
管道:
from multiprocessing import Process,Pipe def consumer(p,name): left,right = p left.close() while True: try: baozi = right.recv() print("%s 收到包子:%s"%(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right = p right.close() for i in seq: left.send(i) else: left.close() if __name__ == ‘__main__‘: left,right = Pipe() c1 = Process(target=consumer,args=((left,right),"c1") ) c1.start() seq = (i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print("主")
管道,了解即可
共享数据:
进程间的数据是独立的,但是可以借助对列或管道实现通信,二者都是基于消息传递的。
虽然进程间数据独立,但是可以通过Manager实现数据共享。
from multiprocessing import Manager,Process,Lock def work(d,mutex): with mutex:#加锁处理 d["count"]-= 1 if __name__ == ‘__main__‘: mutex = Lock() m = Manager() share_dic = m.dict({"count":100}) p_l = [] for i in range(100): p = Process(target = work,args = (share_dic,mutex)) p_l.append(p) p.start() for p in p_l: p.join() print(share_dic)#不加锁{"count":90}发生数据错乱
共享数据 Manager
信号量:
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁 信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念 from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() print("%s 上厕所"%user) time.sleep(random.randint(1,3)) print("%s 上完厕所"%user) sem.release() if __name__ == ‘__main__‘: sem = Semaphore(5) p_l = [] for i in range(10): p = Process(target =go_wc,args = (sem,"user%s"%i)) p.start() p_l.append(p) for i in p_l: i.join() print("===========>")
信号量
Event 事件:
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。 from multiprocessing import Process ,Event import time,random def car(e,n): while True: if not e.is_set(): print("\033[31m 红灯亮\033[0m,car%s等着"%n) e.wait() print("\033[32m车%s 看见绿灯亮了\033[0m"%n) time.sleep(random.randint(3,6)) if not e.is_set(): continue print("走你,car",n) break def police_car(e,n): while True: if not e.is_set(): print("\033[31m红灯亮\033[0m,car%s等着"%n) e.wait(1) print("灯是%s,警车走了,car %s"%(e.is_set(),n)) break def traffic_lights(e,inverval): while True: time.sleep(inverval) if e.is_set(): e.clear() else: e.set() if __name__ == ‘__main__‘: e = Event() for i in range(5): p = Process(target=police_car,args = (e,i)) p.start() t = Process(target=traffic_lights,args = (e,10)) t.start() print("==========>")
Event
进程池:
进程不能无限开启,开启进程数目越多,效率反而会下降,需要一个机制来约束,这就是进程池。
主要方法:
p.apply() 同步提交任务
p.apply_async() 异步提交任务
p.close()关闭进程池
p.join()等待所有工作进程退出,此方法只能在close()或teminate()后调用
实例:
from multiprocessing import Pool import os,time def work(n): print("%s run"%os.getpid()) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p = Pool(3)#可以指定参数,也可以默认,默认为cpu的个数,进程池从无到有创建三个进程,以后一直都是这三个进程 res_l = [] for i in range(10): res = p.apply(work,args =(i,))#同步运行,阻塞,直到本次任务执行完毕拿到res res_l.append(res) print(res_l)
进程池,apply同步执行
from multiprocessing import Pool import os,time def work(n): print("%s run"%os.getpid()) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p = Pool(3) res_l = [] for i in range(10): res = p.apply_async(work,args = (i,)) res_l.append(res) # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion, # 等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() for res in res_l: print(res.get())#apply么有get方法,因为apply是同步执行,立刻可以获得结果,不需要get方法
进程池,apply_async()异步提交
使用进程池实现socket并发:
####服务端 from socket import * from multiprocessing import Pool import os s = socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(("192.168.20.45",8888)) s.listen(5) def talk(conn,addr): print("进程pid:%s"%os.getpid()) while True: try: msg = conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break conn.close() if __name__ == ‘__main__‘: p = Pool() while True: conn,addr = s.accept() p.apply_async(talk,args = (conn,addr)) ###客户端 from socket import * c = socket(AF_INET,SOCK_STREAM) c.connect(("192.168.20.45",8888)) while True: msg = input(">>>:") if not msg:continue c.send(msg.encode("utf-8")) msg = c.recv(1024) print(msg.decode("utf-8"))
并发多个客户端,服务端同一时间只有4个不同的pid,关闭一个经常后,后面进程才会进来。
回调函数:
需要回调函数的场景:
进程池中的任何一个任务一旦处理完成,就立即通知主进程,主进程调用一个函数去调用处理该结果,该函数 就是回调函数
我们可以把耗时的任务放到进程池中,然后指定回调函数负责处理,这样 进程就 省掉了I/O的过程,直接拿到任务结果。
def get_page(url): print(‘<%s> is getting [%s]‘ %(os.getpid(),url)) response=requests.get(url) time.sleep(2) print(‘<%s> is done [%s]‘ % (os.getpid(), url)) return {‘url‘:url,‘text‘:response.text} def parse_page(res): print(‘<%s> parse [%s]‘ %(os.getpid(),res[‘url‘])) with open(‘db.txt‘,‘a‘) as f: parse_res=‘url:%s size:%s\n‘ %(res[‘url‘],len(res[‘text‘])) f.write(parse_res) if __name__ == ‘__main__‘: p=Pool(4) urls = [ ‘https://www.baidu.com‘, ‘http://www.openstack.org‘, ‘https://www.python.org‘, ‘https://help.github.com/‘, ‘http://www.sina.com.cn/‘ ] for url in urls: p.apply_async(get_page,args=(url,),callback=parse_page) p.close() p.join() print(‘主‘)
回调函数
from multiprocessing import Pool import time import requests import re def get_page(url,pattern): response = requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): print(info) page_content,pattern = info res = re.findall(pattern,page_content) for item in res: dic = { ‘index‘: item[0], ‘title‘: item[1], ‘actor‘: item[2].strip()[3:], ‘time‘: item[3][5:], ‘score‘: item[4] + item[5] } print(dic) if __name__ == ‘__main__‘: pattern1 = re.compile(r‘<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<‘,re.S) url_dic = {"http://maoyan.com/board/7":pattern1,} p =Pool() res_l = [] for url,pattern in url_dic.items(): res = p.apply_async(get_page,args = (url,pattern),callback = parse_page) res_l.append(res) for i in res_l: i.get()
回调函数 应用爬虫
如果进程中等待进程池中所有任务都执行完毕后,再同一处理则无需回调函数
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == ‘__main__‘: p = Pool() res_l = [] for i in range(10): res = p.apply_async(work,args = (i,)) res_l.append(res) p.close() p.join() nums = [] for res in res_l: nums.append(res.get()) print(nums)