python并发编程基础之守护进程、队列、锁

并发编程2

1.守护进程

什么是守护进程?

表示进程A守护进程B,当被守护进程B结束后,进程A也就结束。

from multiprocessing import Process
import time
?
def task():
    print(‘妃子的一生‘)
    time.sleep(15)
    print(‘妃子死了‘)
?
if __name__ == ‘__main__‘:
    fz = Process(target=task)
    fz.daemon = True #将子进程作为主进程的守护进程。必须在开启之前设置
    fz.start()
    print(‘皇帝登基了‘)
    time.sleep(1)
    print(‘做了10年皇帝‘)
    time.sleep(5)
    print(‘皇帝驾崩了!‘)

上面的代码说明了什么叫守护进程,需要注意的是,守护进程须在进程开启前设置,就是改变了,本质其实就是改变了Process类里面的daemon属性,默认是False

应用场景:之所以开启子进程,是为了帮助主进程完成某个任务,然而主进程认为自己的事情一旦完成,子进程就没必要存在,那就可以将子进程设置未主进程的守护进程。

例如:QQ下载文件,QQ退出,文件就终止下载。

2、互斥锁

1.什么是锁(Lock)?

锁是multiprocessing模块中的一个类(Lock)。

2.为什么要有锁?

串行效率低,但是数据不会出现问题。并发效率高,但是数据可能出错。

当多进程共享一个数据时,可能造成数据错乱。

1.使用join方法可以让数据不会错乱,但是会让进程变为串行。效率不高。

2.使用lock可以将数据加锁,其他程序使用时需等待当前程序使用完成。lock只是使部分代码串行,其他部分还是并发。

3.互斥锁的示例

Lock类的两个方法acquire(锁)和release(释放),acquire将状态变为True,  release是将状态变为False

锁的本质就是一个bool类型,在执行前会先判断这个值。

def acquire(self, blocking=True, timeout=-1):
    pass
?
def release(self):
    pass
#锁的使用
#注意:使用锁的必须保证锁是同一个
from multiprocessing import Process,Lock
import random
import time
?
?
def task1(lock):
    lock.acquire()
    print(‘1111‘)
    time.sleep(random.randint(1, 3))
    print(‘11111111‘)
    lock.release()
def task2(lock):
    lock.acquire()
    print(‘222222‘)
    time.sleep(random.randint(1,2))
    print(‘2222222222222‘)
    lock.release()
def task3(lock):
    lock.acquire()
    print(‘3333333333‘)
    time.sleep(random.randint(1, 3))
    print(‘33333333333333333333333‘)
    lock.release()
?
if __name__ == ‘__main__‘:
    lock = Lock()
    p1 = Process(target=task1, args=(lock,))
    p1.start()
    p2 = Process(target=task2, args=(lock,))
    p2.start()
    p3 = Process(target=task3, args=(lock,))
    p3.start()

 ###输出结果除了1的位置固定,后两个不固定,但是如果出现了2,则必须运行完2
1111
11111111
3333333333
33333333333333333333333
222222
2222222222222
?

4.锁的应用

应用场景:购票软件的购票过程。

逻辑分析:1.用户都可以查看余票

2.但是其中一个用户发起请求购票,其他用户不能发起请求购票(锁状态)

import json
from multiprocessing import Process,Lock
import random
import time
#查看余票功能
def check_ticket(name):
    time.sleep(random.randint(1, 3))
    with open(‘ticket.json‘, mode=‘rt‘, encoding=‘utf-8‘)as f:
        dic = json.load(f)
        print(‘%s正在查看余票,  余票:%s 张‘ % (name, dic[‘count‘]))
#购票功能
def buy_ticket(name):
    with open(‘ticket.json‘, mode=‘rt‘, encoding=‘utf-8‘)as f:
        dic = json.load(f)
        if dic[‘count‘] > 0:
            time.sleep(random.randint(1, 3))
            dic[‘count‘] -= 1
            print(‘%s购票成功! 余票:%s张‘ % (name, dic[‘count‘]))
            with open(‘ticket.json‘, mode=‘wt‘, encoding=‘utf-8‘)as f:
                json.dump(dic, f)
#用户购票流程
def task(name,lock):
    check_ticket(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()

if __name__ == ‘__main__‘:
    lock = Lock()
    for i in range(1, 11):
        p = Process(target=task, args=(‘用户%s‘ % i, lock))
        p.start()

5.重入锁(RLock)

RLock类,可重入的锁。Lock类只能有一个acquire方法,RLock可以有多个,如果在多进程中使用Rlock,并且一个进程A执行了多次acquire,其他进程B想要获取锁,则需将所有的acquire释放可以获取。

from multiprocessing import RLock
lock = RLock()
lock.acquire()
lock.acquire()
lock.acquire()
print(‘haha‘)
lock.release()
lock.release()
lock.release()
#注意重入锁,锁几次就需要开几次

6.死锁

1.什么是死锁?

死锁是指锁无法打开,导致程序卡死。必须是多把锁才能形成死锁。正常开发时,要避免这种情况。通常一把锁可以解决大多数问题。

##以一个吃饭需要盘子l1 和筷子 l2 二者缺一不可来说明死锁
from multiprocessing import Process,Lock
import time
def task1(l1, l2, i):
    l1.acquire()
    print(‘盘子被%s抢走了‘% i)
    time.sleep(2)
?
    l2.acquire()
    print(‘筷子被%s抢走了‘% i)
    print(‘吃饭!‘)
    l1.release()
    l2.release()
?
def task2(l1, l2, i):
    l2.acquire()      ###task2先抢了筷子,导致task1无法完成后续动作
    print(‘筷子被%s抢走了‘ % i)
?
?
    l1.acquire()
    print(‘盘子被%s抢走了‘% i)
    print(‘吃饭!‘)
    l1.release()
    l2.release()
?
?
if __name__ == ‘__main__‘:
    l1 = Lock()
    l2 = Lock()
    Process(target=task1, args=(l1, l2, 1)).start()   #开启两个进程
    Process(target=task2, args=(l1, l2, 2)).start()

?

#在开发中要避免这种问题。如果有使用则应该注意将task2调整锁的位置。
def task2(l1, l2, i):
    l1.acquire()
    print(‘盘子被%s抢走了‘% i)
    l2.acquire()
    print(‘筷子被%s抢走了‘ % i)
    print(‘吃饭!‘)
    l1.release()
    l2.release()

7.ICP进程间通讯

由于进程间内存是相互独立的,所以需要对应积极的方案能够使得进程之间可以相互传递数据。

解决方案:

1.使用共享文件。多个进程同时读写同一个文件。但是IO速度慢,传输的数据大小不受限制。

2.管道。基于内存,速度块,但是是单向的,用起来麻烦。

3.队列。申请共享内存空间,多个进程可以共享这个内存区域。

#创建共享文件
?
from multiprocessing import Manager,Process,Lock
def work(d):
    # with lock:
        d[‘count‘]-=1
?
if __name__ == ‘__main__‘:
?
    with Manager() as m:
        dic=m.dict({‘count‘:100}) #创建一个共享的字典
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,))
            p_l.append(p)
            p.start()
?
        for p in p_l:
            p.join()
        print(dic)

8.队列

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。

队列也是一种数据容器,其特点:先进先出。

与堆栈相反:先进后出。  (函数的调用就是一种类似堆栈方式)

创建队列的类(底层就是以管道和锁定的方式实现):

1.Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递

2.maxsize是队列中允许最大项数,省略则无大小限制。

主要方法

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
? q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
? q.get_nowait():同q.get(False)
? q.put_nowait():同q.put(False)
? q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
? q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
? q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

队列的优点是:保证数据不会错乱,即使在多进程中。因为put和get都是阻塞

from multiprocessing import Queue
?
q = Queue(3)    #创建一个队列,队列最多存三个数据
q.put(‘ming‘)   #存数据
q.put(123)
print(q.get())  #put默认会阻塞,当队列装满时程序会卡住
print(q.get())
q.put(‘xian‘)   #
q.put(‘daidai‘,False)  #put第二个参数设置为False会强行将数据放入队列
q.put(‘j‘, True, timeout=3)    #timeout等待时间
?
?

9.生产者消费者模型

1.什么是生产者消费者模型?

生产者:生产数据的一方。

消费者:处理使用数据的一方。

例如:一个爬虫程序:1.爬去数据     2.解析数据

2.怎么实现?

1.将两个不同任务分配给不同的进程。

2.提供一个进程共享的数据容器

#模型示例:
from multiprocessing import Process, Queue
import time
import random
?
?
def get_data(q):
    for num in range(1, 6):
        print(‘正在爬取第%s个数据‘ % num)
        time.sleep(random.randint(1, 2))
        print(‘第%s个数据爬取完成。‘ % num)
        q.put(‘第%s个数据‘ % num)
?
?
def parse_data(q):
    for num in range(1, 6):
        data = q.get()
        print(‘正在解析%s‘ % data)
        time.sleep(random.randint(1, 2))
        print(‘%s解析完成。‘ % data)
if __name__ == ‘__main__‘:
    q = Queue(5)  # 创建一个可以存放5个数据的队列
    #生产者模型
    producer = Process(target=get_data, args=(q,))
    producer.start()
    #消费者模型
    consumer = Process(target=parse_data, args=(q,))
    consumer.start()
###输出结果
正在爬取第1个数据
第1个数据爬取完成。
正在爬取第2个数据
正在解析第1个数据
第2个数据爬取完成。
正在爬取第3个数据
第1个数据解析完成。
正在解析第2个数据
第3个数据爬取完成。
正在爬取第4个数据
第2个数据解析完成。
正在解析第3个数据
第3个数据解析完成。
第4个数据爬取完成。
正在爬取第5个数据
正在解析第4个数据
第5个数据爬取完成。
第4个数据解析完成。
正在解析第5个数据
第5个数据解析完成。
###可以看出两个模型之间互不干扰,相互独立。

原文地址:https://www.cnblogs.com/5j421/p/10198001.html

时间: 2024-08-08 08:14:29

python并发编程基础之守护进程、队列、锁的相关文章

Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信

目录 Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信 1.昨日回顾 2.僵尸进程和孤儿进程 2.1僵尸进程 2.2孤儿进程 2.3僵尸进程如何解决? 3.互斥锁,锁 3.1互斥锁的应用 3.2Lock与join的区别 4.进程之间的通信 进程在内存级别是隔离的 4.1基于文件通信 (抢票系统) 4.2基于队列通信 Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信 1.昨日回顾 1.创建进程的两种方式: 函数, 类. 2.pid: os.getpid() os.get

python网络编程基础(线程与进程、并行与并发、同步与异步)

python网络编程基础(线程与进程.并行与并发.同步与异步) 目录 线程与进程 并行与并发 同步与异步 线程与进程 进程 前言 进程的出现是为了更好的利用CPU资源使到并发成为可能. 假设有两个任务A和B,当A遇到IO操作,CPU默默的等待任务A读取完操作再去执行任务B,这样无疑是对CPU资源的极大的浪费.聪明的老大们就在想若在任务A读取数据时,让任务B执行,当任务A读取完数据后,再切换到任务A执行.注意关键字切换,自然是切换,那么这就涉及到了状态的保存,状态的恢复,加上任务A与任务B所需要的

python并发编程之多进程(二):互斥锁(同步锁)&进程其他属性&进程间通信(queue)&生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

python并发编程基础

一.多任务编程 1. 意义: 充分利用计算机多核资源,提高程序的运行效率. 2. 实现方案 :多进程 , 多线程 3. 并行与并发 并发 : 同时处理多个任务,内核在任务间不断的切换达到好像多个任务被同时执行的效果,实际每个时刻只有一个任务占有内核. 并行 : 多个任务利用计算机多核资源在同时执行,此时多个任务间为并行关系. 二.进程(process) 进程理论基础 1. 定义 : 程序在计算机中的一次运行. 1> 程序是一个可执行的文件,是静态的占有磁盘. 2> 进程是一个动态的过程描述,占

Python并发编程之线程池/进程池--concurrent.futures模块

h2 { color: #fff; background-color: #f7af0d; padding: 3px; margin: 10px 0px } 一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间.但从Python3.2开始,标准库为我们提供了conc

Python并发编程基础 №⑧ 并发完结篇:IO模型

1.详细介绍 为了更好地了解IO模型,我们需要事先回顾下:同步.异步.阻塞.非阻塞 同步(synchronous):就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列.要么成功都成功,失败都失败,两个任务的状态可以保持一致. 异步(asynchronous):是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了.至于被依赖的任务最终是否真正完成,依赖它的任务无法确定

python并发编程(管道,事件,信号量,进程池)

管道 Conn1,conn2 = Pipe() Conn1.recv() Conn1.send() 数据接收一次就没有了 from multiprocessing import Process,Pipe def f1(conn): from_zhujincheng = conn.recv() print('子进程') print('来自主进程的消息:',from_zhujincheng) if __name__ == '__main__': conn1,conn2 = Pipe() #创建一个管

python 并发编程 多线程 死锁现象与递归锁

一 死锁现象 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁 from threading import Thread from threading import Lock import time # 实例化两把不同的锁 mutexA = Lock() mutexB = Lock() class MyThread(Threa

python中并发编程基础1

并发编程基础概念 1.进程. 什么是进程? 正在运行的程序就是进程.程序只是代码. 什么是多道? 多道技术: 1.空间上的复用(内存).将内存分为几个部分,每个部分放入一个程序,这样同一时间在内存中就有了多道程序. 2.时间上的复用(CPU的分配).只有一个CPU,如果程序在运行过程中遇到了I/O阻塞或者运行时间足够长.操作系统会按照算法将CPU分配给其他程序使用,依次类推.直到第一个程序被重新分配到CPU会继续运行. 多道技术中的问题解决: 空间复用:程序之间的内存必须分割.这种分割需要在硬件