第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法

内容大纲:    进程之间的通讯        进程队列        管道

进程之间的数据共享    进程池        使用进程池 开启进程        提交任务        获得返回值        回调函数1.进程队列    先进先出
from multiprocessing import Queue
    import queue
    q = Queue()
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())

  



    1
    2
    3

  



from multiprocessing import Queue
import queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
print(q.get())#q已经被取空 没法取值 程序会被夯住

  



from multiprocessing import Queue
import queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
print(q.get_nowait())#报错queue.Empty

  



1
2
3

  



from multiprocessing import Queue
import queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        break

  



1
2
3

  



from multiprocessing import Queue
import queue
q = Queue(3)#设置队列的最大容量
q.put(1)
q.put(2)
q.put(3)
q.put(4)#队列已经放满了 程序被夯住

  



from multiprocessing import Queue
import queue
q = Queue(3)#设置队列的最大容量
while True:
    try:
        q.put_nowait(1)#队列放满了报出异常
    except queue.Full:
        break

while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        break

  



1
1
1

  


#q.empty()q.full()这两个方法不是很可靠,因为别的进程会随时往队列里面添加或者取走元素
from multiprocessing import Queue
import queue
q = Queue(3)#设置队列的最大容量
while True:
    try:
        q.put_nowait(1)#队列放满了报出异常
    except queue.Full:
        break
print(q.empty())#判断队列是否为空
print(q.full())#判断队列是否已满
while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        break
print(q.empty())
print(q.full())

  



False
True
1
1
1
True
False

  



from multiprocessing import Process,Queue
def consume(q):
    print(q.get())

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target= consume,args=(q,))
    p.start()
    q.put({‘123‘:456})

  

{‘123‘: 456}
from multiprocessing import Process,Queue
def consume(q):
    print(‘son --->‘,q.get())
    q.put(‘abc‘)

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target= consume,args=(q,))
    p.start()
    q.put({‘123‘:456})
    p.join()
    print(‘Foo --->‘, q.get())

  



 son - --> {‘123‘: 456}
    Foo - --> abc

  

2.什么是生产者消费者模型
import time
import random
from multiprocessing import Process,Queue

def consumer(q,name):
    while True:
        food = q.get()#循环不停的从队列里面取走元素
        if food is None:break#取到None,退出循环
        time.sleep(random.uniform(0.5,1))
        print(‘%s吃了:%s‘%(name,food))

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.uniform(0.3,0.8))
        print(‘%s 生产了:%s%s‘%(name,food,i))
        q.put(food+str(i))

if __name__ == ‘__main__‘:
    q = Queue()

    c1 = Process(target=consumer,args=(q,‘alex‘))
    c1.start()

    p1 = Process(target=producer,args=(q,‘沙县小吃‘,‘鸡腿‘))
    p1.start()

    p1.join()#队列里面放元素 设置成一个同步事件.
    q.put(None)#生产(队列里面添加元素)结束之后 最后放一个None

  



沙县小吃 生产了:鸡腿0
沙县小吃 生产了:鸡腿1
alex吃了:鸡腿0
沙县小吃 生产了:鸡腿2
alex吃了:鸡腿1
沙县小吃 生产了:鸡腿3
沙县小吃 生产了:鸡腿4
alex吃了:鸡腿2
沙县小吃 生产了:鸡腿5
沙县小吃 生产了:鸡腿6
alex吃了:鸡腿3
沙县小吃 生产了:鸡腿7
alex吃了:鸡腿4
沙县小吃 生产了:鸡腿8
alex吃了:鸡腿5
沙县小吃 生产了:鸡腿9
alex吃了:鸡腿6
alex吃了:鸡腿7
alex吃了:鸡腿8
alex吃了:鸡腿9

  


3.可阻塞的队列 JoinableQueue  多了两个方法 q.task_done() q.join()
import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    while True:
        food = q.get()#循环不行的从队列里面取走元素
        # if food is None:break 这句代码在JoinableQueue中就不需要了
        time.sleep(random.uniform(0.5,1))
        print(‘%s吃了:%s‘%(name,food))
        q.task_done()#完成了任务向队列汇报 #只有消费者里面才需要汇报
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.3,0.8))
        print(‘%s 生产了:%s%s‘%(name,food,i))
        q.put(food+str(i))

if __name__ == ‘__main__‘:

    jq = JoinableQueue()#可阻塞的队列

    c1 = Process(target=consumer,args=(jq,‘alex‘))
    c2 = Process(target=consumer,args=(jq,‘taibai‘))
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    p1 = Process(target=producer,args=(jq,‘沙县小吃‘,‘鸡腿‘))
    p2 = Process(target=producer,args=(jq,‘黄焖鸡‘,‘炒米粉‘))
    p1.start()
    p2.start()

    p1.join()#生产者把所有的元素都放到队列里面才停止
    p2.join()

    jq.join()#可阻塞的队列 设置成阻塞的

  


程序执行完成后结束:
沙县小吃 生产了:鸡腿0
     黄焖鸡 生产了:炒米粉0
     黄焖鸡 生产了:炒米粉1
alex吃了:鸡腿0
     沙县小吃 生产了:鸡腿1
     黄焖鸡 生产了:炒米粉2
taibai吃了:炒米粉0
     沙县小吃 生产了:鸡腿2
alex吃了:炒米粉1
     黄焖鸡 生产了:炒米粉3
alex吃了:炒米粉2
taibai吃了:鸡腿1
     沙县小吃 生产了:鸡腿3
     黄焖鸡 生产了:炒米粉4
     沙县小吃 生产了:鸡腿4
alex吃了:鸡腿2
taibai吃了:炒米粉3
alex吃了:鸡腿3
taibai吃了:炒米粉4
alex吃了:鸡腿4

  


4,什么是管道?管道有左右两端,左边发送右边接收,或者右边发送,左边接收
from multiprocessing import Pipe
left,right = Pipe()
left.send(‘hello‘)
print(right.recv())

  



hello

  



from multiprocessing import  Process,Pipe
def consumer(pipe):
    print(pipe[1].recv())#pipe[1].recv()管道的右边接收
if __name__ == ‘__main__‘:
    pipe = Pipe()
    Process(target=consumer,args=(pipe,)).start()
    pipe[0].send(‘hello‘)

  

hello

  



from multiprocessing import  Process,Pipe
def consumer(left,right):
    print(right.recv())#pipe[1].r
    ecv()管道的右边接收
if __name__ == ‘__main__‘:
    left,right = Pipe()#生成的是一个元组(左端,右端)
    Process(target=consumer,args=(left,right)).start()
    left.send(‘hello‘)

  

hello

  


管道端口的关闭
from multiprocessing import  Pipe,Process
def consumer(left,right):
    left.close()
    while True:
        try:
            print(right.recv())
        except EOFError:
            break
if __name__ == ‘__main__‘:
    left,right = Pipe()
    Process(target=consumer,args=(left,right)).start()
    right.close()
    for i in range(5):
        left.send(‘鸡腿%s‘%i)
    left.close()

  

    鸡腿0
    鸡腿1
    鸡腿2
    鸡腿3
    鸡腿4

  


总结一下:    队列是基于管道实现的    管道是基于socket实现的    队列+锁 是一种简便的IPC机制,是的进程之间的数据变得安全,    什么是IPC inter-process-commucate进程之间的通讯    socket+pickle实现进程之间的通讯,同一台计算机通过文件的收发实现进程之间的通讯

5,什么是进程池?为什么要有进程池?开启过多的进程并不能够提高效率,反而会降低效率

进程的分类:    计算密集型:        重分占用CPU,多进程可以充利用CPU的多核        适合开启多个进程    IO密集型:        大部分的时间都在阻塞队列,而不是在运行状态,        根本不适合开启多个进程

信号量,多进程,进程池的概念区别:    现在需要生产500件衣服,应该买几台机器?雇佣几名工人???

信号量 模式:        500件衣服要生产      500个任务        雇佣500个人         开启了500个进程        购买4台机器          4核CPU

多进程模式:        500件衣服要生产      500个任务        雇佣500个人         开启了500个进程        购买4台机器          4核CPU

进程池模式:        500件衣服要生产      500个任务        雇佣4个人         4个人一人一台机器,不停地生产        购买4台机器          4核CPU
import  time
from  multiprocessing import Pool,Process
def func(num):
    print(‘生产了第%s件衣服‘%num)

if __name__ == ‘__main__‘:
    start = time.time()
    p = Pool(4)#创建进程池 池子的最大进程是4个进程

    for i in range(100):
        p.apply_async(func,args=(i,))#异步提交 func到子进程中执行
    p.close()#关闭池,用户不能再向池中提交任务
    p.join()#阻塞,直到进程池中所有的进程都执行完毕,主进程才能结束
    print(time.time()-start)

  



生产了第0件衣服
...
生产了第99件衣服
2.899761915206909#时间消耗

  


#多进程的方式
import  time
from  multiprocessing import Pool,Process
def func(num):
    print(‘生产了第%s件衣服‘%num)

if __name__ == ‘__main__‘:
    start = time.time()
    p_list = []
    for i in range(10):
        p = Process(target=func,args=(i,))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print(time.time()-start)

  



生产了第1件衣服
生产了第9件衣服
生产了第2件衣服
生产了第3件衣服
生产了第4件衣服
生产了第5件衣服
生产了第0件衣服
生产了第8件衣服
生产了第6件衣服
生产了第7件衣服
5.608381509780884

  


同步提交与异步提交的区别
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))

if __name__ == ‘__main__‘:
    p = Pool(4)
    for i in range(20):
        p.apply(task,args=(i,))#提交任务的方式是同步提交

  



0 : 4424
1 : 2748
2 : 9464
3 : 4176 #后面的pid是不断的重复上面4个pid(进程编号)
4 : 4424
5 : 2748
6 : 9464
7 : 4176
8 : 4424
9 : 2748
10 : 9464
11 : 4176
12 : 4424
13 : 2748
14 : 9464
15 : 4176
16 : 4424
17 : 2748
18 : 9464
19 : 4176

  


#同提交的方法可以得到返回值
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool(4)
    for i in range(20):
        res = p.apply(task,args=(i,))#提交任务的方式是同步提交
        print(‘-->‘,res)

  



0 : 9544
--> 0
1 : 4760
--> 1
2 : 9028
--> 4
3 : 9572
--> 9
4 : 9544
--> 16
5 : 4760
--> 25
6 : 9028
--> 36
7 : 9572
--> 49
8 : 9544
--> 64
9 : 4760
--> 81
10 : 9028
--> 100
11 : 9572
--> 121
12 : 9544
--> 144
13 : 4760
--> 169
14 : 9028
--> 196
15 : 9572
--> 225
16 : 9544
--> 256
17 : 4760
--> 289
18 : 9028
--> 324
19 : 9572
--> 361

  


#异步提交 不能拿到任务的结果 但是可以拿到 任务提交的情况
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(5):
        res = p.apply_async(task,args=(i,))#提交任务的方式是异步提交
        print(‘-->‘,res)

  



--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C438>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C518>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C5C0>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C6A0>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C780>

  



import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(5):
        p.apply_async(task,args=(i,))#提交任务的方式是同步提交
    p.close()#关闭池子 不能再往里面添加任务
    p.join()#进程池设置成阻塞 任务完成了主进程成才能关闭

  



0 : 10100
1 : 624
2 : 8288
3 : 9532
4 : 10100

  


进程池总结:    p = Pool()实例化的时候进程的个数  默认值是cpu的个数u,或者设置成cpu+1    提交任务:        同步提交:apply(函数名,args =())            #有返回值,返回值是子函数逇返回值            #一个任务接着一个任务按顺序同步执行,没有任何并发的结果        异步提交:apply_async            #返回值是任务提交的结果            #p.close()            #p.join()            #必须先close()再join(),p设置成阻塞,直到p中所有的进程都执行完毕,才结束主进程

#这种法法取值,与同步提交没有区别
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(5):
        res = p.apply_async(task,args=(i,))#提交任务的方式是同步提交
        print(res.get())

  



 0: 8836
    0
    1: 9248
    1
    2: 9780
    4
    3: 2588
    9
    4: 8836
    16

  


#将计算的结果放进列表
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    res_lsit = []
    for i in range(5):
        res = p.apply_async(task,args=(i,))#提交任务的方式是同步提交
        res_lsit.append(res)#将计算的结果放进列表
    for res in res_lsit:
        print(res.get())

  



0 : 9424
0
1 : 912
1
2 : 8572
4
3 : 7436
9
4 : 9424
16

  


#p.map(函数名,参数)
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    p.map(task,range(5))

  



0 : 2880
1 : 8388
2 : 7888
3 : 9540
4 : 2880

  



原文地址:https://www.cnblogs.com/cavalier-chen/p/9683892.html

时间: 2024-08-04 15:16:17

第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法的相关文章

112 python程序中的进程操作-进程之间进行通信(mulitiProcessing Queue队列)

一.进程间通信 IPC(Inter-Process Communication) IPC机制:实现进程之间通讯 管道:pipe 基于共享的内存空间 队列:pipe+锁的概念--->queue 二.队列(Queue) 2.1 概念-----multiProcess.Queue 创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. Queue([maxsize])创建共享的进程队列. 参数 :maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制

c# IPC实现本机进程之间的通信

IPC可以实现本地进程之间通信.这种用法不是太常见,常见的替代方案是使用wcf,remoting,web service,socket(tcp/pipe/...)等其他分布式部署方案来替代进程之间的通信.虽然不常见但也避免不了一些场景会使用该方案. 应用包含: 1)使用IPC技术实现多client与一个sever通信(不过是本机,感觉意义不大,但如果想实现本机上运行确实是一个不错的方案): 2)使用IPC技术实现订阅者和生产者分离时,一个server接收并消费消息,客户端是生产消息的. 1 1:

11.python并发入门(part10 多进程之间实现通信,以及进程之间的数据共享)

一.进程队列. 多个进程去操作一个队列中的数据,外观上看起来一个进程队列,只是一个队列而已,单实际上,你开了多少个进程,这些进程一旦去使用这个队列,那么这个队列就会被复制多少份. (队列=管道+锁) 这么做的主要原因就是,不同进程之间的数据是无法共享的. 下面是使用进程队列使多进程之间互相通信的示例: 下面这个例子,就是往进程队列里面put内容. #!/usr/local/bin/python2.7 # -*- coding:utf-8 -*- import multiprocessing de

python 进程之间互相通信-----&gt;队列(推荐使用)

1.进程之间相互通信有几种实现方式. multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的,推荐使用队列,因为管道也需要处理锁的问题. 2队列的主要方法 # 1.q.put方法用以插入数据到队列中,# put方法还有两个可选参数:blocked和timeout.如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间.# 如果超时,会抛出Queue.Full异常.如果blocked为Fals

进程之间的通信

1无名管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用.进程的亲缘关系通常是指父子进程关系. 2.高级管道(popen):将另一个程序当做一个新的进程在当前程序进程中启动,则它算是当前程序的子进程,这种方式我们成为高级管道方式.3 有名管道 (named pipe) : 有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信.4信号量( semophore ) : 信号量是一个计数器,可以用来控制多个进程对共享资源的访问.它常作为一种

网络编程-进程-4、队列完成进程之间的通信

前言:之前说过,多个进程之间是不能共享全局变量的,那么怎么解决这个问题呢?通过队列Queue去解决这个问题 1.看代码演示,解释看注解: #!/usr/bin/env python # coding=utf-8 # author:刘仲 # datetime:2018/7/25 16:31 # software: PyCharm import multiprocessing """定义一个全局变量num,线程函数test1修改全局变量num然后放进num1空列表,然后调用队列对

python全栈开发 * 进程之间的通信,进程之间数据共享 * 180726

进程之间的通信(IPC)队列和管道一.队列 基于管道实现 管道 + 锁 数据安全(一).队列 队列遵循先进先出原则(FIFO) 多用于维护秩序,买票,秒杀 队列的所有方法: put()(给队列里添加数据),put_nowait(), get()(从队列中获取数据),get_nowait(), 相同点:有值的时候取值 区别:get()没有值时会阻塞 get_nowait() 没有值时会报错 full()(返回布尔值),empty()(返回bool值), qsize()(队列大小) 示例: from

进程对象的其他方法、守护进程、使用多进程实现 socket tcp协议 server端的并发(抢票程序)、队列、进程之间的通信(IPC)

# 进程对象的其他方法 from multiprocessing import Process import time class MyProcess(Process): def __init__(self, a, b): # 为了给子进程传递参数 super().__init__() self.a = a self.b = b def run(self): print("子进程开始执行") time.sleep(2) print("子进程结束", self.a,

Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信

目录 Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信 1.昨日回顾 2.僵尸进程和孤儿进程 2.1僵尸进程 2.2孤儿进程 2.3僵尸进程如何解决? 3.互斥锁,锁 3.1互斥锁的应用 3.2Lock与join的区别 4.进程之间的通信 进程在内存级别是隔离的 4.1基于文件通信 (抢票系统) 4.2基于队列通信 Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信 1.昨日回顾 1.创建进程的两种方式: 函数, 类. 2.pid: os.getpid() os.get