joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道

一、生产者消费者

  主要是为解耦(借助队列来实现生产者消费者模型)

  import queue  # 不能进行多进程之间的数据传输

  (1)from multiprocessing import Queue    借助Queue解决生产者消费者模型,队列是安全的。

    q = Queue(num)

    num :为队列的最大长度

    q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待

    q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

    q.get_nowait() # 不阻塞,如果有数据直接获取,没有数据就报错

    q.put_nowait() # 不阻塞,如果能往队列中放数据直接放,不可以就报错

    

  (2)from multiprocessing import JoinableQueue  # 可连接的队列

    JoinableQueue 是继承Queue  ,所以可以使用Queue中的方法

    并且JoinableQueue 又多了两个方法

    q.join() # 用于生产者。等待q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据。

    q.task_done() # 用于消费者,是指每消费队列中的一个数据,就给join返回一个标识。

 1 from multiprocessing import Queue,Process,Pool,JoinableQueue
 2
 3
 4 def consumer(q,lis):
 5     while 1:
 6         for i in lis:
 7             print(i + ‘拿走了‘ + q.get())
 8             q.task_done() # get() 一次就会给生产者的join返回一次数据
 9
10
11 def producer(q,name1):
12     for i in range(1,9):
13         q.put(name1 + ‘第%s号剑‘% i)
14     q.join() # 记录了生产者往队列中添加了8个数据,此时会阻塞,等待消费返回8次数据,后生产者进程才会结束
15
16
17 if __name__ == ‘__main__‘:
18     q = JoinableQueue() # 实例化一个队列
19     p = Process(target=consumer,args=(q,[‘盖聂‘,‘卫庄‘,‘高渐离‘,‘胜七‘,‘掩日‘]))
20     p1 = Process(target=producer,args=(q,‘越王八剑‘))
21     p.daemon = True # 注意是把消费者设置为守护进程,会随着主进程的结束而结束。
22     p.start()
23     p1.start()
24     p1.join() # 主进程会等待生产者进程结束后才结束,而生产者进程又会等待消费者进程消费完以后才结束。

二、进程之间的共享内存

  from multiprocessing import Manager,Value

  m = Manager()

  num = m.dict({键 :值})

  num = m.list([1,2,3])

from multiprocessing import Process,Manager,Value

def func(num):
    for i in num:
        print(i - 1) # 结果为:0,1,2

if __name__ == ‘__main__‘:
    m = Manager() # 用来进程之间共享数据的
    num = m.list([1,2,3])
    p = Process(target=func,args=(num,))
    p.start()
    p.join() # 等待func子进程执行完毕后结束

#################Value################

from multiprocessing import Process,Manager,Value

def func1(num):
    print(num)
    num.value += 1 # 和Manager用法不一样
    print(num.value)

if __name__ == ‘__main__‘:
    num = Value(‘i‘,123) # Manager里面不需要传参数
    p = Process(target=func1,args=(num,))
    p.start()
    p.join()

三、进程池

  进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就去处理。

  进程池还会帮程序员去管理池中的进程。

  from multiprocessing import Pool

  p = Pool(os.cpu_count() + 1)

  进程池有三个方法:

    map(func,iterable)     有返回值

    iterable:可迭代对象,是把可迭代对象中的每个元素一次传给任务函数当参数 

from multiprocessing import Pool

def func(num):
    num += 1
    print(num)
    return num # 返回给map方法

if __name__ == ‘__main__‘:
    p = Pool()
    res = p.map(func,[i for i in range(10)]) # 参数为目标对象和可迭代对象
    p.close()
    p.join() # 等待子进程结束
    print(‘主进程‘,res) # res是一个列表

    apply(func,args=()) :apply的实现是进程之间是同步的,池中的进程一个一个的去执行。

    func :进程池中的进程执行的任务函数。

    args :可迭代对象型的参数,是传给任务函数的参数。

    同步处理任务时,不需要close和join

    同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其子进程执行结束)

from multiprocessing import Pool

def func(num):
    num += 1
    return num

if __name__ == ‘__main__‘:
    p = Pool(5) # 实例化5个进程
    for i in range(100):
        res = p.apply(func,args=(i,)) # 这里传的参数是元祖,这里是同步执行
        print(res)

    apply_async(func,args=(),callback=None) :进城之间是异步的,

    func :进程池中的进程执行的任务函数。

    args :可迭代对象型的参数,是传给任务函数的参数

from multiprocessing import Pool

def func(num):
    num += 1
    return num

if __name__ == ‘__main__‘:
    p = Pool(5) # 实例化5个进程
    lis = []
    for i in range(100):
        res = p.apply_async(func,args=(i,)) # 异步执行,5个进程同时去调用func
        lis.append(res)
        print(res) # 打印结果为 <multiprocessing.pool.ApplyResult object at 0x0347F3D0>
    p.close() # Pool中用apply_async异步执行时必须关闭进程
    p.join() # 因为是异步执行所以需要等待子进程结束
    print(lis) # 100个<multiprocessing.pool.ApplyResult object at 0x0347F3D0> 这种存放在列表中
    [print(i.get()) for i in lis] # 输出100个数字[1......100]

    callback :回调函数,就是说每当进程池中有进程处理完任务,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的

    异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)

    异步处理任务时,必须要加上close和join

    回调函数的使用:

      进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作

      回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。

from multiprocessing import Pool
import requests
import os

def func(ulr):
    res = requests.get(ulr)
    print(‘func进程的pid:%s‘ % os.getpid(),‘父进程的pid:%s‘ % os.getppid())
    if res.status_code == 200:
        return ulr,res.text

def cal_back(sta): # func中返回的值被自动调用,并当成形参传进来
    ulr,text = sta
    print(‘callback回调函数的pid:%s‘% os.getpid(),‘父进程的pid:%s‘ % os.getppid())
    # 回调函数的pid和父进程的pid一样

if __name__ == ‘__main__‘:
    p = Pool(5)
    lis = [‘https://www.baidu.com‘,
         ‘http://www.jd.com‘,
         ‘http://www.taobao.com‘,
         ‘http://www.mi.com‘,
         ‘http://www.cnblogs.com‘,
         ‘https://www.bilibili.com‘,
         ]
    print(‘父进程的pid:%s‘ % os.getpid())
    for i in lis:
        p.apply_async(func,(i,),callback=cal_back)
    # 异步的执行每一个进程,这里的传参和Process不同,这里必须这样写callback=cal_back
    # 异步执行程序func,在每个任务结束后,在func中return回一个结果,这个结果会自动的被callback函数调用,并当成形参来接收。
    p.close() # 进程间异步必须加上close()
    p.join()  # 等待子进程的结束

四、管道机制

  from multiprocessing import Pipe

  con1,con2 = Pipe()

  管道是不安全的

  管道是用于多进程之间通信的一种方式。

  如果在单进程中使用管道,con1发数据,那么就用con2来收数据

              con2发数据,那么就用con1来收数据

  如果在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发

                    父进程使用con1发,子进程就必须使用con2收

                    父进程使用con2收,子进程就必须使用con1发

                    父进程使用con2发,子进程就必须使用con1收

  在管道中有一个著名的错误叫做EOFError。是指,父进程如果关闭了发送端,子进程还继续收数据,那么就会引发EOFError。 

# 单进程中管道的应用
from multiprocessing import Pipe

con1,con2 = Pipe() # 管道机制

con1.send(‘123‘) # con1发送,需要con2来接收   是固定
print(con2.recv())
con2.send(‘456‘) # con2发送,需要con1来接收   是固定
print(con1.recv())
# 多进程中管道的应用
from multiprocessing import Process,Pipe

def func(con):
    con1,con2 = con
    con1.close() # 因为子进程只用con2与父进程通信,所以关闭了
    while 1:
        try:
            print(con2.recv()) # 接收父进程con1发来的数据
        except EOFError: # 如果父进程的con1发完数据,并关闭管道,子进程的con2继续接收数据,就会报错。
            con2.close() # 当接到报错,此时数据已经接收完毕,关闭con2管道。
            break # 退出循环

if __name__ == ‘__main__‘:
    con1,con2 = Pipe()
    p = Process(target=func,args=(con1,con2))
    p.start()
    con2.close() # 因为父进程是用con1来发数据的,con2提前关闭。
    for i in range(10): # 生产数据
        con1.send(‘郭%s‘ % i) # 给子进程的con2发送数据
    con1.close() # 生产完数据,关闭父进程的con1管道

原文地址:https://www.cnblogs.com/wjs521/p/9520988.html

时间: 2024-08-01 05:11:21

joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道的相关文章

生产者消费者模型及队列,进程池

生产者消费者模型 生产者消费者模型 主要是为了解耦 可以借助队列来实现生产者消费者模型 栈 : 先进后出(First In Last Out 简称 FILO) 队列 : 先进先出(First In First Out 简称 FIFO) import queue #不能进行多进程之间的数据传输(1) from multiprocessing import Queue #借助Queue解决生产者消费者模型,队列是安全的q=Queue(num)num : 队列的最大长度q.get() #阻塞等待获取数

队列、生产者消费者模型

目录 队列.生产者消费者模型.初识线程 一.用进程锁来优化抢票小程序 1.1 进程锁 1.2 优化抢票小程序 二.队列 2.1 队列的介绍 2.2 创建队列的类 2.3 使用队列的案例 三.生产者消费者模型 3.1 用队列Queue实现生产者消费者模型 3.2 用队列JoinableQueue实现生产者消费者模型 队列.生产者消费者模型.初识线程 一.用进程锁来优化抢票小程序 1.1 进程锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没有问题的.而共享带来

1-7 生产者消费者模型

一 生产者消费者模型介绍 为什么要使用生产者消费者模型 生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者和消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生

进击的Python【第九章】:paramiko模块、线程与进程、各种线程锁、queue队列、生产者消费者模型

一.paramiko模块 他是什么东西? paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接. 先来个实例: 1 import paramiko 2 # 创建SSH对象 3 ssh = paramiko.SSHClient() 4 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ss

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

进程 &gt;&gt; 互斥锁、队列与管道、生产者消费者模型

目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重新实例化一把锁,我们的目的是想让所有的子进程使用同一把锁,所以需要把锁传递给子进程在使用 锁名.acquire():开锁->所有子进程开始抢位置 锁名.release():关锁->位置排好了,开始执锁起来执行. join与互斥锁的区别:join是把所有的子进程代码变为串行的,而互斥锁则可以规定那几

生产者消费者模型,管道,进程之间共享内存,进程池

课程回顾: 并行:在同一时间点上多个任务同时执行 并发:在同一时间段上多个任务同时执行 进程的三大基本状态: 就绪状态:所有进程需要的资源都获取到了,除了CPU 执行状态:获取到了所有资源包括CPU,进程处于运行状态 阻塞状态:程序停滞不在运行,放弃CPU,进程此时处于内存里 什么叫进程? 正在运行的程序 有代码段,数据段,PCB(进程控制块) 进程是资源分配的基本单位. 进程之间能不能直接通信? 正常情况下,多进程之间是无法进行通信的.因为每个进程都有自己独立的空间 锁: 为了多进程通信时,保

python—day29 守护进程、互斥锁、模拟抢票、IPC通信机制、生产者消费者模型

1.守护进程: 什么是守护进程,假如你是皇帝,每日每夜守护你的就是太监,守护进程就相当于太监,当皇帝驾崩后太监也需要陪葬,所以守护进程当父进程销毁时就一起销毁: 1 from multiprocessing import Process 2 3 import time 4 5 def task(name): 6 7 time.sleep(0.5) 8 print('%s' %name) 9 10 11 if __name__ == '__main__': 12 p = Process(targe