生产者消费者模型及队列,进程池

生产者消费者模型

生产者消费者模型    主要是为了解耦    可以借助队列来实现生产者消费者模型

栈 : 先进后出(First In Last Out     简称  FILO)    队列 : 先进先出(First In First Out     简称  FIFO)

import queue    #不能进行多进程之间的数据传输(1) from multiprocessing import Queue     #借助Queue解决生产者消费者模型,队列是安全的q=Queue(num)num : 队列的最大长度q.get()  #阻塞等待获取数据,如果有数据就直接获取,如果没有数据就阻塞等待q.put()  #阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待q.get nowait()  #不阻塞,如果有数据直接获取,没有数据就报错q.put nowait()  #不阻塞,如果哦可以继续往队列中放数据,就直接放,不能放就报错

(2)  from multiprocessing import JoinableQueue  #可连接的队列JoinableQueue 是继承Queue,所以可以使用Queue中的方法    q.join() #用于生产者.等待 q.task done的返回结果,通过返回结果,生产者就能获得消费者消费了多少个数据    q.task done() #用于消费者,是指每消费队列中一个数据,就给join返回一个标识

回调函数的使用:    进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的操作    回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数
from multiprocessing import Queue,Process
import time
# 消费者
def con(q,name):
    while 1:
        info=q.get()    #消费
        if info:    #如果有就打印,否则break
            print(‘%s拿走了%s‘%(name,info))
        else:
            break
# 生产者
def sh(q,product):
    for i in range(10):
        info=product+‘版娃娃%s号‘% str(i)
        print(info)
        q.put(info)     #生产
    q.put(None)

if __name__==‘__main__‘:
    q=Queue(10)     #队列长度10(可以自己设定)
    p=Process(target=sh,args=(q,‘大雄‘))
    p_1=Process(target=con,args=(q,‘alex‘))
    p.start()       #执行子进程
    p_1.start()     #执行子进程
队列实现 生产者消费者模型模块:from multiprocessing import Queue,Process
from multiprocessing import Queue,Process
def xiao(q,name,color):     # 这里的 color 是颜色的传参
    while 1:
        ret=q.get()     # 消费 q.get()
        if ret:
            print(‘%s%s拿走了%s娃娃\033[0m‘% (color,name,ret))   # color接收的是颜色的传参而且是开头,所以color要放在前面
        else:
            break       #当消费者在数据队列中拿到None的时候,就是拿到了生产者不再生产数据的标识,此时消费者结束消费即可

def sheng(q,ban):
    for i in range(0,12):
        ret=ban+‘版娃娃第%s号‘% str(i)
        # print(ret)
        q.put(ret)      # 生产  q.put(变量)

if __name__==‘__main__‘:
    q=Queue(15)         #队列长度
    p2 = Process(target=sheng, args=(q, ‘小熊‘))      #开启生产者子进程
    p1=Process(target=xiao,args=(q,‘ko‘,‘\033[31m‘))
    p1_1=Process(target=xiao,args=(q,‘lp‘,‘\033[33m‘))      #这里的转换颜色虽然传的时候是在最后面,但是这是颜色的开头,在打印的时候需要放在前面
    p_p=[p1_1,p1,p2]
    [i.start() for i in p_p]        #让两个消费者轮流消费
    p2.join()   #主进程阻塞等待生产子进程执行完后(生产完)再继续向下执行
    q.put(None)     #几个消费者就要接收几个结束标识
    q.put(None)
进程间共享内存    主进程的值与子进程的值是一样的用法:  m=Manager()       num = m.dict({键:值})       num = m.list([1,2,3])
from multiprocessing import Process,Manager,Value
def func(num):
    num[0]-=1
    print(‘子进程中的num值是‘,num)

if __name__==‘__main__‘:
    m=Manager()
    num=m.list([1,2,3])     #共享内存,所以主进程的值与子进程的值是一样的
    p=Process(target=func,args=(num,))
    p.start()
    p.join()
    print(‘主进程中的num值是‘,num)
进程池进程池的三个方法(1) map(func,iterable)    func : 进程池中的进程执行的任务函数    iterable : 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数(2) apply(func,args=()): 同步的效率,也就是说池中的进程一个一个的去执行任务    func : 进程池中的进程执行的任务函数    args : 可迭代对象型的参数,是传给任务函数的参数    同步处理任务时,不需要close和join    同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

(3) apply_async(func,args=(),callback=None):   异步的效率,也就是说池中的进程一次性都去执行任务    func : 进程池中的进程执行的任务函数    args : 可迭代对象型的参数,是传给任务函数的参数    callback : 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步处理,               回调函数只有异步才有同步是没有的,  异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)               异步处理任务时,必须要加上close和join

map返回值:
from multiprocessing import Pool
def func(num):
    num +=1
    print(num)
    return num

if __name__==‘__main__‘:
    p=Pool()
    res=p.map(func,[i for i in range(10)])
    p.close()
    p.join()
    print(‘主进程中的map返回值‘,res)
进程池异步处理问题(异步:开启多个进程,并且同时处理多个任务)
from multiprocessing import Pool
import time

def func(num):
    num += 1
    return num

if __name__ == ‘__main__‘:
    p = Pool(5)         #设置进程数(最好是比自己电脑核数多一个)
    start = time.time()
    l = []
    for i in range(10000):
        res = p.apply_async(func,args=(i,))# 异步处理这100个任务,异步是指,进程中有5个进程,一下就处理5个任务,接下来哪个进程处理完任务了,就马上去接收下一个任务
        l.append(res)
    p.close()
    p.join()
    print(time.time() - start)
进程池同步处理任务(同步:虽然有多个进程,但是还是一个进程一个进程的去处理)
from multiprocessing import Pool
import time

def func(num):
    num += 1
    return num
if __name__==‘__main__‘:
    p = Pool(5)     #开5个进程
    start = time.time()   #开启进程前记下时间
    l=[]
    for i in range(10000):
        res = p.apply(func,args=(i,))   #同步处理任务,虽然有五个进程,但是依然一个进程一个进程的去处理任务
        l.append(res)     #把10000个数放进列表
    print(l)
    print(time.time()-start)      #进程结束的时间减去开启进程前的时间
同步和异步的效率的对比
from multiprocessing import Pool
import requests
import time

def func(url):
    res = requests.get(url)
    print(res.text)
    if res.status_code == 200:
        return ‘ok‘

if __name__ == ‘__main__‘:
    p = Pool(5)
    l = [‘https://www.baidu.com‘,
         ‘http://www.jd.com‘,
         ‘http://www.taobao.com‘,
         ‘http://www.mi.com‘,
         ‘http://www.cnblogs.com‘,
         ‘https://www.bilibili.com‘,
         ]
    start = time.time()
    for i in l:
        p.apply(func,args=(i,))

    apply_= time.time() - start

    start = time.time()
    for i in l:
        p.apply_async(func, args=(i,))
    p.close()
    p.join()
    print(‘同步的时间是%s,异步的时间是%s‘%(apply_, time.time() - start))

原文地址:https://www.cnblogs.com/hdy19951010/p/9526364.html

时间: 2024-11-04 21:30:12

生产者消费者模型及队列,进程池的相关文章

joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道

一.生产者消费者 主要是为解耦(借助队列来实现生产者消费者模型) import queue  # 不能进行多进程之间的数据传输 (1)from multiprocessing import Queue    借助Queue解决生产者消费者模型,队列是安全的. q = Queue(num) num :为队列的最大长度 q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待 q.get_now

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

Python学习笔记——进阶篇【第九周】———线程、进程、协程篇(队列Queue和生产者消费者模型)

Python之路,进程.线程.协程篇 本节内容 进程.与线程区别 cpu运行原理 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 参考链接http://www.cnblogs.com/alex3714/articles/5230609.html

使用阻塞队列实现生产者-消费者模型

生产者-消费者模问题 /** * 使用阻塞队列实现生产者-消费者模型 * 阻塞队列只允许元素以FIFO的方式来访问 * @author Bingyue * */ public class ProducerCustomerPattern { public static void main(String[] args) { //生产者和消费者共享的存储区域 BlockingQueue<Integer> blockQueue=new LinkedBlockingQueue(); /** * 此处外部

生产者消费者模型,管道,进程之间共享内存,进程池

课程回顾: 并行:在同一时间点上多个任务同时执行 并发:在同一时间段上多个任务同时执行 进程的三大基本状态: 就绪状态:所有进程需要的资源都获取到了,除了CPU 执行状态:获取到了所有资源包括CPU,进程处于运行状态 阻塞状态:程序停滞不在运行,放弃CPU,进程此时处于内存里 什么叫进程? 正在运行的程序 有代码段,数据段,PCB(进程控制块) 进程是资源分配的基本单位. 进程之间能不能直接通信? 正常情况下,多进程之间是无法进行通信的.因为每个进程都有自己独立的空间 锁: 为了多进程通信时,保

进击的Python【第九章】:paramiko模块、线程与进程、各种线程锁、queue队列、生产者消费者模型

一.paramiko模块 他是什么东西? paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接. 先来个实例: 1 import paramiko 2 # 创建SSH对象 3 ssh = paramiko.SSHClient() 4 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ss

进程 &gt;&gt; 互斥锁、队列与管道、生产者消费者模型

目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重新实例化一把锁,我们的目的是想让所有的子进程使用同一把锁,所以需要把锁传递给子进程在使用 锁名.acquire():开锁->所有子进程开始抢位置 锁名.release():关锁->位置排好了,开始执锁起来执行. join与互斥锁的区别:join是把所有的子进程代码变为串行的,而互斥锁则可以规定那几

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

生产者消费者模型 线程池

1.生产者消费者模型 主要是为解耦 借助队列来实现生产者消费这模型 栈:先进后出(First In Last Out 简称:FILO) 队列:先进先出(FIFO) import queue from multiprocessing import Queue 借助Queue解决生产者消费这模型队列是安全的 q=Queue(m) q = Queue(num) num : 队列的最大长度 q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put()# 阻塞,如果可以继