Python并发编程之进程通信


'''
进程间的通信
'''

"""
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,
"""
from multiprocessing import Queue
q = Queue(3)

#put, get, put_nowait, get_nowait, full, empty
q.put(1)
q.put(2)
q.put(3)
# q.put(1)#队列已经满了,再加程序就会一直停在这里,等待数据被别人取走,不取走就一直停在这
## q.get(1)#可以先取出一个,然后再加就可以了
## q.put(1)
# try:
#     q.put_nowait(1) #使用这个绑定方法,队列满了不会阻塞,但是会因为队列满了而报错。
# except:
#     print('队列已经满了') #加了try之后,不阻塞,但是消息会丢
print(q.full())#查看队列是否满了
print(q.get())
print(q.get())
print(q.get())

# q.get(1)#同put 方法一样,队列空了继续取就会出现阻塞。

#和上面方法类似
try:
    q.get_nowait()
except:
    print('队列已经空了')

print(q.empty())

# 子进程数据给父进程
import time
from multiprocessing import Process, Queue

def  f(q):
    q.put([time.asctime(), 'from Eva', 'hello'])

if __name__ == '__main__':
    q = Queue() #创建一个Queue对象
    p = Process(target=f, args=(q, )) #创建一个进程
    p.start()
    print(q.get())
    p.join()

#
# #批量生产数据放入队列再批量获取结果
import os
import time
import multiprocessing

#向queue中输入数据的函数
def inputQ(queue):
    info = str(os.getpid()) + '(put): ' + str(time.asctime())
    queue.put(info)

#向queue中输出数据的函数
def outputQ(queue):
    info = queue.get()
    print(f'{str(os.getpid())} (get): {info}')

#Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = [] #store input processes
    record2 = [] #store output processes
    queue = multiprocessing.Queue(3)

    #输入进程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue, ))
        process.start()
        record1.append(process)

    #输出进程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args = (queue, ))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

import os
import time
import multiprocessing

#向队列中输入数据
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

#向队列中输出数据
def outputQ(queue):
    info = queue.get()
    print(f'{str(os.getpid())} (get: ) {info}')

if __name__ == '__main__':
    multiprocessing.freeze_support()
    p1_list = []
    p2_list = []
    q = multiprocessing.Queue(3)

    for i in range(10):
        p = multiprocessing.Process(target=inputQ, args=(q, ))
        p.start()
        p1_list.append(p)
    for i in range(10):
        p = multiprocessing.Process(target=outputQ, args=(q, ))
        p.start()
        p2_list.append(p)

    for k in p1_list:
        k.join()

    for j in p2_list:
        j.join()

#基于队列实现生产者和消费者模型(生产者生产出来放到阻塞队列里,消费者直接从阻塞队列中取需要的东西)
from multiprocessing import Process, Queue #导入进程和队列模块
import time, random,os#导入时间、随机数和os模块
def consume(q):#定义消费者函数
    while True: #循环消费
        res = q.get()  #从队列中取东西
        time.sleep(random.randint(1, 3)) #随机睡几秒
        print(f'{str(os.getpid())} 吃 {res}') #打印出来
def producer(q): #定义生产者函数
    for i in range(10): #生产10个包子
        time.sleep(random.randint(1, 3)) #随机睡几秒
        res = f'包子{i}' #生产包子标记下来
        q.put(res) #把包子放到阻塞队列里面
        print(f'生产了{(os.getpid(), res)}') #打印东西
if __name__ == '__main__':
    q = Queue() #阻塞队列
    #生产者们:即厨师们
    p1 = Process(target=producer, args=(q, ))

    #消费者们:即吃货们
    c1 = Process(target=consume, args=(q, ))

    #开始
    p1.start()
    c1.start()
    print('主')

#生产者和消费者(改良版)
'''
上面的版本主进程永远不会结束,原因是:生产者p在生产完后就结束了,
但是消费者c在取空了q之后,一直处于死循环卡在q.get这一步

解决方法是生产者生产完成之后,网队列中组发一个结束信号,这样消费者在接收到结束
信号后就可以break出死循环

注意:结束信号None,不一定要由生产者发,主进程里面同样可以发,但主进程需要
等生产者结束后才应该发送该信号
'''
import os, random, time
from multiprocessing import Process, Queue

#定义消费者函数
def consume(queue):
    while True:
        res = queue.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1, 3))
        print(f'{str(os.getpid())} 吃了 {res}')

#定义生产者函数
def producer(queue):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = f'包子{i}'
        queue.put(res)
        print(f'{os.getpid()}生产了{res}')
    # queue.put(None) #生产者发送结束信号

if __name__ == '__main__':
    q = Queue(3)

    #生产者
    p = Process(target=producer, args=(q, ))

    #消费者
    q1 = Process(target=consume, args=(q, ))

    p.start()
    q1.start()

    p.join()
    q.put(None) #主进程里面发送结束信号
    q1.join() #可加可不加

    print('主线程')

# 多个消费者就需要发送多个None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('生产了 %s %s' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    # 必须保证生产者全部生产完毕,才应该发送结束信号
    p1.join()
    p2.join()
    p3.join()

    # 有几个消费者就应该发送几次结束信号None
    q.put(None)
    q.put(None) #发送结束信号
    print('主')

#JoinableQueue队列实现消费者与生产者模型
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))
        #向q.join()发送一次信号,证明一个数据已经被取走了
        q.task_done()

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('生产了 %s %s' %(os.getpid(),res))
    q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理

if __name__ == '__main__':
    q=JoinableQueue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True #设置成守护进程
    c2.daemon=True #设置成守护进程

    #开始
    p_1 = [p1, p2, p3, c1, c2]
    for p in p_1:
        p.start()
    # 必须保证生产者全部生产完毕,才应该发送结束信号
    p1.join()
    p2.join()
    p3.join()
    print('主')
'''
主进程等--->p1, p2, p3等--->c1, c2
p1, p2, p3结束了,证明c1, c2肯定全部收完了p1, p2, p3发到队列的数据
因而c1, c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了,
应该随着主进程的结束而结束,所以设置成守护进程就可以了
'''

原文地址:https://www.cnblogs.com/michealjy/p/11523507.html

时间: 2024-08-28 04:41:57

Python并发编程之进程通信的相关文章

Python并发编程之进程2

引言 本篇介绍Python并发编程下的进程,先介绍进程的相关知识,然后对python中multiprocessing模块进行介绍(Process.Pipe.Queue以及 Lock). 进程(process) 在面向线程设计的系统(如当代多数操作系统.Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器. 进程拥有自己独立的内存空间,所属线程可以访问进程的空间. 程序本身只是指令.数据及其组织形式的描述,进程才是程序的真正运行实例. 例如,我们在PyCharm开发环境中写

Python并发编程之进程

一.理论概念 1.定义 进程(Process 也可以称为重量级进程)是程序的一次执行.在每个进程中都有自己的地址空间.内存.数据栈以及记录运行的辅助数据,它是系统进行资源分配和调度的一个独立单位. 2.并行和并发 并行:并行是指多个任务同一时间执行: 并发:是指在资源有限的情况下,两个任务相互交替着使用资源: 3.同步和异常 同步是指多个任务在执行时有一个先后的顺序,必须是一个任务执行完成另外一个任务才能执行: 异步是指多个任务在执行时没有先后顺序,多个任务可以同时执行: 4.同步/异步/阻塞/

python并发编程之进程池,线程池concurrent.futures

进程池与线程池 在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多, 这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途, 例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制 Python--concurrent.fu

python并发编程之进程池,线程池

要注意一下不能无限的开进程,不能无限的开线程最常用的就是开进程池,开线程池.其中回调函数非常重要回调函数其实可以作为一种编程思想,谁好了谁就去掉只要你用并发,就会有锁的问题,但是你不能一直去自己加锁吧那么我们就用QUEUE,这样还解决了自动加锁的问题由Queue延伸出的一个点也非常重要的概念.以后写程序也会用到这个思想.就是生产者与消费者问题 一.Python标准模块--concurrent.futures(并发未来) concurent.future模块需要了解的1.concurent.fut

python并发编程(进程操作)

一. multiprocess模块 仔细说来,multiprocess不是一个模块而是python中一个操作.管理进程的包. 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块.由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享. process模块介绍 process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建. 二. 使用process模块创建进程

Python网络编程(进程通信、信号、线程锁、多线程)

什么是进程通讯的信号? 用过Windows的我们都知道,当我们无法正常结束一个程序时, 可以用任务管理器强制结束这个进程,但这其实是怎么实现的呢? 同样的功能在Linux上是通过生成信号和捕获信号来实现的, 运行中的进程捕获到这个信号然后作出一定的操作并最终被终止. 信号是UNIX和Linux系统响应某些条件而产生的一个事件, 接收到该信号的进程会相应地采取一些行动.通常信号是由一个错误产生的. 但它们还可以作为进程间通信或修改行为的一种方式, 明确地由一个进程发送给另一个进程.一个信号的产生叫

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

python并发编程(守护进程,进程锁,进程队列)

进程的其他方法 P = Process(target=f,) P.Pid 查看进程号  查看进程的名字p.name P.is_alive()  返回一个true或者False P.terminate()  给操作系统发送一个结束进程的信号 验证进程之间是空间隔离的 from multiprocessing import Process num = 100 def f1(): global num num = 3 print(num) # 结果 3 if __name__ == '__main__

python 并发编程 查看进程的pid与ppid

查看进程id pid 不需要传参数 from multiprocessing import Process import time import os def task(): print("%s is running" % os.getpid()) time.sleep(3) print("%s is done" % os.getpid()) if __name__ == "__main__": t = Process(target=task,