35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

守护进程

进程:一个正在运行的程序。

主进程创建守护进程:

1.守护进程会在主进程代码执行结束后就终止,

2.守护进程内无法再开启子进程,否则抛出异常。

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。

例子:from multiprocessing import Processimport time

def task():    print(‘老了。。。。‘)    time.sleep(2)    print(‘睡了一会。。‘)

if __name__ == ‘__main__‘:    print(‘长大了。。。。‘)    p=Process(target=task)    p.daemon=True    #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行    p.start()    time.sleep(2)    print(‘关闭了。。。。‘)

    p.terminate()

使用场景:父进程交给子进程一个任务,任务还没有完成父进程就结束了,此时子进程就没意义了。互斥锁互斥锁:   互相排斥的锁(如果这个资源已经被锁了,其他进程就无法使用了)需要强调的是: 锁 并不是真的把资源锁起来了,只是在代码层面限制你的代码不能执行。使用场景:    并发将带来资源的竞争问题,当多个进程同时要操作同一个资源时,将会导致数据错乱的问题。

解决方案1:

? 加join,

例子:from multiprocessing import Processimport time

def task1():    print(‘你好,阿森。。。‘)    time.sleep(3)    print(‘吃饭‘)    time.sleep(3)    print(‘下雨‘)

def task2():    print(‘你好,阿三。。。‘)    time.sleep(3)    print(‘吃面‘)    time.sleep(3)    print(‘下雪‘)

def task3():    print(‘你好,阿四。。。‘)    time.sleep(3)    print(‘吃米‘)    time.sleep(3)    print(‘下冰雹‘)

if __name__ == ‘__main__‘:    p1 = Process(target=task1)    p2 = Process(target=task2)    p3 = Process(target=task3)

    p1.start()    p1.join()    p2.start()    p2.join()    p3.start()    p3.join()

小知识点:

andom.randint(a,b)用于生成一个指定范围内的整数。其中参数a是下限,参数b是上限,生成的随机数n: a <= n <= b。

? 弊端: 1.把原本并发的任务变成了穿行,避免了数据错乱问题,但是效率降低了,这样就没必要开子进程了。

?        2.原本多个进程之间是公平竞争,join执行的顺序就定死了,这是不合理的。

解决方案2:

? 就是给公共资源加 锁----互斥锁。

例子:

from multiprocessing import Process,Lockimport time,random
def task1(lock):    # 上锁    lock.acquire()      #就等同于一个if判断    print(‘你好,阿森。。。‘)    time.sleep(random.randint(0,3))    print(‘吃饭‘)    time.sleep(random.randint(0, 3))    print(‘下雨‘)    #解锁    lock.release()

def task2(lock):    lock.acquire()    print(‘你好,阿三。。。‘)    time.sleep(random.randint(0, 3))    print(‘吃面‘)    time.sleep(random.randint(0, 3))    print(‘下雪‘)    lock.release()

def task3(lock):    lock.acquire()    print(‘你好,阿四。。。‘)    time.sleep(random.randint(0, 3))    print(‘吃米‘)    time.sleep(random.randint(0, 3))    print(‘下冰雹‘)    lock.release()

if __name__ == ‘__main__‘:    lock=Lock()    p1=Process(target=task1,args=(lock,))    p2=Process(target=task2,args=(lock,))    p3=Process(target=task3,args=(lock,))

    p1.start()    p2.start()    p3.start()
# 注意1:  不要对同一把执行多出acquire 会锁死导致程序无法执行  一次acquire必须对应一次release

from multiprocessing import Lockl=Lock()l.acquire()print(‘抢到了‘)l.release()

l.acquire()print(‘接着抢‘)

# 注意2:想要保住数据安全,必须保住所有进程使用同一把锁

锁和join的区别:

1.  join是固定了执行顺序,会造成父进程等待子进程完成后完成

?    锁是公平竞争谁先抢到谁先执行,父进程可以做其他事情、

2.  join是把进程的任务全部串行

?   锁可以锁任意代码 ,一行也可以 可以自己调整粒度

粒度:

粒度越大意味着锁住的代码越多 效率越低
粒度越小意味着锁住的代码越少 效率越高

MYSQL 中不同隔离级别 其实就是不同的粒度

小练习----------抢票
"""过程:1.查看余票,2.有就买,无失败 os.getpid()获取当前进程id
#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
"""from multiprocessing import Process, Lockimport json, random, time, os

def check_ticket():    with open(‘ticket.json‘, ‘rt‘, encoding=‘utf-8‘)as a:        data = json.load(a)        time.sleep(random.randint(0, 3))        print(‘%s正在查票,票还有%s‘ % (os.getpid(), data["count"]))

def buy_ticket():    with open(‘ticket.json‘, ‘rt‘, encoding=‘utf-8‘)as a:        data = json.load(a)        if data["count"] > 0:            data["count"] -= 1            # 模拟延迟            time.sleep(random.randint(0, 3))            with open(‘ticket.json‘, ‘wt‘, encoding=‘utf-8‘)as a:                json.dump(data, a)                print(‘%s恭喜你抢票成功‘ % os.getpid())

        else:            print(‘%s抢票失败,开个加速包试一试‘ % os.getpid())

def task(lock):    check_ticket()    lock.acquire()    buy_ticket()    lock.release()

if __name__ == ‘__main__‘:

    lock = Lock()    # 三个人抢票    for i in range(10):        p = Process(target=task, args=(lock,))        p.start()

IPC进程间通讯

通讯指的就是交换数据

? 进程之间内存是相互隔离的,当一个进程想要把数据给另外一个进程,就需要考虑IPC

方式:

? 管道: 只能单向通讯,数据都是二进制

? 文件: 在硬盘上创建共享文件

? 缺点:速度慢

? 优点:数据量几乎没有限制

? socket:

? 编程复杂度较高

? 共享内存:必须由操作系统来分配 要掌握的方式*****

? 优点: 速度快

? 缺点: 数据量不能太大

共享内存的方式

? Manager提供很多数据结构 list dict等等

? Manager所创建出来的数据结构,具备进程间共享的特点

from multiprocessing import Process, Manager, Lock

import time

# 传参给父进程def task(data, l):    # 上锁    l.acquire()    num = data[‘num‘]    time.sleep(0.1)    # 时间过长,子进程就会死掉    data[‘num‘] = num - 1    # 解锁    l.release()

if __name__ == ‘__main__‘:    # 让Manager开启一个共享的字典    m = Manager()    data = m.dict({‘num‘: 10})

    l = Lock()    for i in range(10):        p = Process(target=task, args=(data, l))        p.start()  # 创建子进程

    time.sleep(2)    print(data)
Queue队列   帮我们处理了锁的问题 

队列是一种特殊的数据结构,先存储的先取出 就像排队 先进先出


? 相反的是堆栈,先存储的后取出, 就像衣柜 桶装薯片 先进后出


? 扩展:


? 函数嵌套调用时 执行顺序是先进后出 也称之为函数栈


? 调用 函数时 函数入栈 函数结束就出栈

from multiprocessing import Queue

# 创建队列创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递# maxsize是队列中允许最大项数,省略则无大小限制。# Queue({maxsize}):q=Queue(3)#  存储元素q.put(‘123‘)q.put(‘qwe‘)q.put(‘利物浦‘)

print(q.get())q.put(‘句柄hi‘)# 如果容量已经满了,在调用put时将进入阻塞状态 直到有人从队列中拿走数据有空位置 才会继续执行

#取出元素print(q.get())print(q.get())print(q.get())

q.get(block=True,timeout=2)
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
看一下:3 q.get_nowait():同q.get(False)
4 q.put_nowait():同q.put(False)
5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
8 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
9 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
10. q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
生产者消费者模型

? 模型 就是解决某个问题套路

? 产生数据的一方称之为生产者

? 处理数据的一方称之为消费者

  什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

解决的方案:

? 将双方分开来.一专门负责生成,一方专门负责处理

? 这样一来数据就不能直接交互了 双方需要一个共同的容器

? 生产者完成后放入容器,消费者从容器中取出数据

? 这样就解决了双发能力不平衡的问题,做的快的一方可以继续做,不需要等待另一方

基于队列实现生产者消费者模型

from multiprocessing import Process, Queueimport time, random

def make_rose(q):    for i in range(6):        print(‘%s个青椒炒鸡蛋制作完成‘ % i)        rose = ‘%s个青椒炒鸡蛋‘ % i        # 将生成完成的数据放入队列中        q.put(rose)

def eat(q):    for i in range(6):        rose = q.get()        time.sleep(random.randint(0, 3))        print(rose, ‘吃完了‘)

if __name__ == ‘__main__‘:    q = Queue()    make_p = Process(target=make_rose, args=(q,))    eat_p = Process(target=eat, args=(q,))

    make_p.start()    eat_p.start()
 #引入生产者消费者模型为了解决的问题是:
        平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

#如何实现:
        生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和

 


 
 


原文地址:https://www.cnblogs.com/komorebi/p/10969980.html

时间: 2024-12-12 16:42:49

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型的相关文章

进程 &gt;&gt; 互斥锁、队列与管道、生产者消费者模型

目录 1.互斥锁 2.队列与管道 3.生产者消费者模型(Queue) 4.生产者消费者模型(JoinableQueue) 1.互斥锁 首先导入Lock模块 实例化一把锁 但是每次生成子进程的时候都会重新实例化一把锁,我们的目的是想让所有的子进程使用同一把锁,所以需要把锁传递给子进程在使用 锁名.acquire():开锁->所有子进程开始抢位置 锁名.release():关锁->位置排好了,开始执锁起来执行. join与互斥锁的区别:join是把所有的子进程代码变为串行的,而互斥锁则可以规定那几

开启子进程的两种方式,孤儿进程与僵尸进程,守护进程,互斥锁,IPC机制,生产者与消费者模型

开启子进程的两种方式 # # # 方式一: # from multiprocessing import Process # import time # # def task(x): # print('%s is running' %x) # time.sleep(3) # print('%s is done' %x) # # if __name__ == '__main__': # # Process(target=task,kwargs={'x':'子进程'}) # p=Process(tar

守护进程,互斥锁,IPC,队列,生产者与消费者模型

小知识点:在子进程中不能使用input输入! 一.守护进程 守护进程表示一个进程b 守护另一个进程a 当被守护的进程结束后,那么守护进程b也跟着结束了 应用场景:之所以开子进程,是为了帮助主进程完成某个任务,然而,如果主进程认为自己的事情一旦做完了就没有必要使用子进程了,就可以将子进程设置为守护进程 例如:在运行qq的过程,开启一个进程,用于下载文件,然而文件还没有下载完毕,qq就退出了,下载任务也应该跟随qq的退出而结束. from multiprocessing import Process

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

操作系统发展史(主要的几个阶段) 初始系统 1946年第一台计算机诞生,采用手工操作的方式(用穿孔卡片操作) 同一个房间同一时刻只能运行一个程序,效率极低(操作一两个小时,CPU一两秒可能就运算完了) 联机批处理系统 脱机批处理系统 多道程序系统 1.空间上的复用 ? 多个程序公用一套计算机硬件 2.时间上的复用 ? 切换+保存状态 ? 保存状态:保存当前的运行状态,下次接着该状态继续执行 ? 切换的两种情况 ? (1) 当一个程序遇到 I/O 操作(不需要使用CPU),操作系统会剥夺该程序的C

线程同步与互斥(POSIX信号量——环形数组实现生产者消费者模型)

Semaphore(信号量) Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源,加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可用资源. 信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1.如果信号量描述的资源数目是1时,此时的信号量和互斥锁相同! POSIX semaphore库函数,这种信号量不仅可用于同一进程的线程间同步,

python—day29 守护进程、互斥锁、模拟抢票、IPC通信机制、生产者消费者模型

1.守护进程: 什么是守护进程,假如你是皇帝,每日每夜守护你的就是太监,守护进程就相当于太监,当皇帝驾崩后太监也需要陪葬,所以守护进程当父进程销毁时就一起销毁: 1 from multiprocessing import Process 2 3 import time 4 5 def task(name): 6 7 time.sleep(0.5) 8 print('%s' %name) 9 10 11 if __name__ == '__main__': 12 p = Process(targe

进程-IPC 共享内存和消息队列 (三)

详见:https://github.com/ZhangzheBJUT/linux/blob/master/IPC(%E4%B8%89).md 五 共享内存 5.1. 共享内存简介 共享内存指多个进程共享同一块物理内存,它只能用于同一台机器上的两个进程之间的通信.在进程的逻辑地址空间中有一段地址范围是用来进行内存映射使用的,该段逻辑地址空间可以映射到共享的物理内存地址上(进程空间介绍:http://blog.csdn.net/zhangzhebjut/article/details/3906025

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('