生产者消费者模型
生产者消费者模型 主要是为了解耦 可以借助队列来实现生产者消费者模型 栈 : 先进后出(First In Last Out 简称 FILO) 队列 : 先进先出(First In First Out 简称 FIFO) import queue #不能进行多进程之间的数据传输(1) from multiprocessing import Queue #借助Queue解决生产者消费者模型,队列是安全的q=Queue(num)num : 队列的最大长度q.get() #阻塞等待获取数据,如果有数据就直接获取,如果没有数据就阻塞等待q.put() #阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待q.get nowait() #不阻塞,如果有数据直接获取,没有数据就报错q.put nowait() #不阻塞,如果哦可以继续往队列中放数据,就直接放,不能放就报错 (2) from multiprocessing import JoinableQueue #可连接的队列JoinableQueue 是继承Queue,所以可以使用Queue中的方法 q.join() #用于生产者.等待 q.task done的返回结果,通过返回结果,生产者就能获得消费者消费了多少个数据 q.task done() #用于消费者,是指每消费队列中一个数据,就给join返回一个标识 回调函数的使用: 进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的操作 回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数
from multiprocessing import Queue,Process import time # 消费者 def con(q,name): while 1: info=q.get() #消费 if info: #如果有就打印,否则break print(‘%s拿走了%s‘%(name,info)) else: break # 生产者 def sh(q,product): for i in range(10): info=product+‘版娃娃%s号‘% str(i) print(info) q.put(info) #生产 q.put(None) if __name__==‘__main__‘: q=Queue(10) #队列长度10(可以自己设定) p=Process(target=sh,args=(q,‘大雄‘)) p_1=Process(target=con,args=(q,‘alex‘)) p.start() #执行子进程 p_1.start() #执行子进程
队列实现 生产者消费者模型模块:from multiprocessing import Queue,Process
from multiprocessing import Queue,Process def xiao(q,name,color): # 这里的 color 是颜色的传参 while 1: ret=q.get() # 消费 q.get() if ret: print(‘%s%s拿走了%s娃娃\033[0m‘% (color,name,ret)) # color接收的是颜色的传参而且是开头,所以color要放在前面 else: break #当消费者在数据队列中拿到None的时候,就是拿到了生产者不再生产数据的标识,此时消费者结束消费即可 def sheng(q,ban): for i in range(0,12): ret=ban+‘版娃娃第%s号‘% str(i) # print(ret) q.put(ret) # 生产 q.put(变量) if __name__==‘__main__‘: q=Queue(15) #队列长度 p2 = Process(target=sheng, args=(q, ‘小熊‘)) #开启生产者子进程 p1=Process(target=xiao,args=(q,‘ko‘,‘\033[31m‘)) p1_1=Process(target=xiao,args=(q,‘lp‘,‘\033[33m‘)) #这里的转换颜色虽然传的时候是在最后面,但是这是颜色的开头,在打印的时候需要放在前面 p_p=[p1_1,p1,p2] [i.start() for i in p_p] #让两个消费者轮流消费 p2.join() #主进程阻塞等待生产子进程执行完后(生产完)再继续向下执行 q.put(None) #几个消费者就要接收几个结束标识 q.put(None)
进程间共享内存 主进程的值与子进程的值是一样的用法: m=Manager() num = m.dict({键:值}) num = m.list([1,2,3])
from multiprocessing import Process,Manager,Value def func(num): num[0]-=1 print(‘子进程中的num值是‘,num) if __name__==‘__main__‘: m=Manager() num=m.list([1,2,3]) #共享内存,所以主进程的值与子进程的值是一样的 p=Process(target=func,args=(num,)) p.start() p.join() print(‘主进程中的num值是‘,num)
进程池进程池的三个方法(1) map(func,iterable) func : 进程池中的进程执行的任务函数 iterable : 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数(2) apply(func,args=()): 同步的效率,也就是说池中的进程一个一个的去执行任务 func : 进程池中的进程执行的任务函数 args : 可迭代对象型的参数,是传给任务函数的参数 同步处理任务时,不需要close和join 同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束) (3) apply_async(func,args=(),callback=None): 异步的效率,也就是说池中的进程一次性都去执行任务 func : 进程池中的进程执行的任务函数 args : 可迭代对象型的参数,是传给任务函数的参数 callback : 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步处理, 回调函数只有异步才有同步是没有的, 异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束) 异步处理任务时,必须要加上close和join map返回值:
from multiprocessing import Pool def func(num): num +=1 print(num) return num if __name__==‘__main__‘: p=Pool() res=p.map(func,[i for i in range(10)]) p.close() p.join() print(‘主进程中的map返回值‘,res)
进程池异步处理问题(异步:开启多个进程,并且同时处理多个任务)
from multiprocessing import Pool import time def func(num): num += 1 return num if __name__ == ‘__main__‘: p = Pool(5) #设置进程数(最好是比自己电脑核数多一个) start = time.time() l = [] for i in range(10000): res = p.apply_async(func,args=(i,))# 异步处理这100个任务,异步是指,进程中有5个进程,一下就处理5个任务,接下来哪个进程处理完任务了,就马上去接收下一个任务 l.append(res) p.close() p.join() print(time.time() - start)
进程池同步处理任务(同步:虽然有多个进程,但是还是一个进程一个进程的去处理)
from multiprocessing import Pool import time def func(num): num += 1 return num if __name__==‘__main__‘: p = Pool(5) #开5个进程 start = time.time() #开启进程前记下时间 l=[] for i in range(10000): res = p.apply(func,args=(i,)) #同步处理任务,虽然有五个进程,但是依然一个进程一个进程的去处理任务 l.append(res) #把10000个数放进列表 print(l) print(time.time()-start) #进程结束的时间减去开启进程前的时间
同步和异步的效率的对比
from multiprocessing import Pool import requests import time def func(url): res = requests.get(url) print(res.text) if res.status_code == 200: return ‘ok‘ if __name__ == ‘__main__‘: p = Pool(5) l = [‘https://www.baidu.com‘, ‘http://www.jd.com‘, ‘http://www.taobao.com‘, ‘http://www.mi.com‘, ‘http://www.cnblogs.com‘, ‘https://www.bilibili.com‘, ] start = time.time() for i in l: p.apply(func,args=(i,)) apply_= time.time() - start start = time.time() for i in l: p.apply_async(func, args=(i,)) p.close() p.join() print(‘同步的时间是%s,异步的时间是%s‘%(apply_, time.time() - start))
原文地址:https://www.cnblogs.com/hdy19951010/p/9526364.html
时间: 2024-11-04 21:30:12