互斥锁与进程间通信

一、互斥锁

进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理

注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。

1、上厕所

先举个通俗易懂的例子,家里的厕所,你要上厕所进去后会先锁门,厕所门就相当于一个互斥锁,当你在里面的时候别人过来上厕所就只能在门口等。

from multiprocessing import Process,Lock
import os
import time
def work(mutex):
    mutex.acquire()  #上锁
    print(‘task[%s] 上厕所‘ %os.getpid())
    time.sleep(3)
    print(‘task[%s] 上完厕所‘ %os.getpid())
    mutex.release()  #开锁

if __name__ == ‘__main__‘:
    mutex=Lock()  #实例化(互斥锁)
    p1=Process(target=work,args=(mutex,))
    p2=Process(target=work,args=(mutex,))
    p3=Process(target=work,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()

    print(‘start...‘)

2、模拟抢票

#文件db的内容为:{"count":1}  #票数可以自己定义
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search() :  #查看票数
    dic=json.load(open(‘db.txt‘,))
    print(‘剩余票数%s‘ %dic[‘count‘])
def get_ticket() :  #购票
    dic = json.load(open(‘db.txt‘,))
    if dic[‘count‘] > 0 :
        dic[‘count‘] -= 1
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(‘%s 购票成功‘ %os.getpid())
def task(mutex) :  #购票流程
    search()
    time.sleep(random.randint(1, 3)) #模拟购票一系列繁琐的过程所花费的时间
    mutex.acquire()
    get_ticket()
    mutex.release()
if __name__ == ‘__main__‘ :
    mutex = Lock()
    for i in range(50):
        p = Process(target=task,args=(mutex,))
        p.start()

二、Process对象其他属性使用案例补充

1、deamon守护进程

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

ps:

from multiprocessing import Process
import os
import time
def work():
    print(‘%s is working‘ %os.getpid())
    time.sleep(10)
    print(‘%s is ending‘ % os.getpid())
if __name__ == ‘__main__‘:
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()
    time.sleep(2)
    print(‘start。。。‘)

2、join等待子进程

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

ps:

from multiprocessing import Process
import os
import time
def work():
    print(‘%s is working‘ %os.getpid())
    time.sleep(3)
if __name__ == ‘__main__‘:
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p3.join()
    p1.join()
    p2.join()
    print(‘基于初始化的结果来继续运行‘)

3、terminate,is_alive,name,pid

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.name:进程的名称
p.pid:进程的pid

ps:

from multiprocessing import Process
import os
import time
def work():
    print(‘%s is working‘ %os.getpid())
    time.sleep(3)
if __name__ == ‘__main__‘:
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p1.terminate()  #不建议使用
    print(p1.is_alive())
    #虽然已经强制终止进程了但是操作系统终止进程也需要时间所以此时还是True
    print(p1.name)  #如果没有起名默认Process-1后面的数字按子进程顺序排
    print(p2.name)
    print(p1.pid)  # p1.pid == os.getpid()
    print(‘基于初始化的结果来继续运行‘)

三、进程间通信

我们学习了通过使用共享的文件的方式,实现进程直接的共享,即共享数据的方式,这种方式必须考虑周全同步、锁等问题。而且文件是操作系统提供的抽象,可以作为进程直接通信的介质,与mutiprocess模块无关

但其实mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。IPC机制中的队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

1、进程间通信(IPC)方式一:队列(推荐使用)

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

可以往队列里放任意类型的数据

队列:先进先出

1)导入

from multiprocessing import Queue

2)实例化

q=Queue(3)  #3是队列中规定允许的最大项数,省略即不限大小

3)主要方法

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

4)其他方法(了解)

q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

5)应用

from multiprocessing import Process,Queue
#1:可以往队列里放任意类型的数据 2 队列:先进先出
q=Queue(3)
q.put(‘first‘)
q.put(‘second‘)
q.put(‘third‘)
# q.put(‘fourht‘)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.empty()) #空了

# q=Queue(3)
# q.put(‘first‘,block=False)
# q.put(‘second‘,block=False)
# q.put(‘third‘,block=False)
# q.put(‘fourth‘,block=True,timeout=3)

# q.get(block=False)
# q.get(block=True,timeout=3)

# q.get_nowait() #q.get(block=False)

6)生产消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print(‘%s 吃了 %s‘ % (os.getpid(), res))
def producer(q):
    for i in range(5):
        time.sleep(2)
        res=‘包子%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.put(None)
if __name__ == ‘__main__‘:
    q=Queue()
    #生产者们:厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:吃货们
    p2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(‘主‘)

生产者消费者模型

生产者消费者模型

7)创建队列的另外一个类

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

maxsize是队列中允许最大项数,省略则无大小限制。

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(‘%s 吃了 %s‘ % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(5):
        time.sleep(2)
        res=‘包子%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()

if __name__ == ‘__main__‘:
    q=JoinableQueue()
    #生产者们:厨师们
    p1=Process(target=product_baozi,args=(q,))

    #消费者们:吃货们
    p4=Process(target=consumer,args=(q,))
    p4.daemon=True

    p1.start()
    p4.start()

    p1.join()
    print(‘主‘)
    #p2结束了

生产者消费者模型2

生产者消费者模型2

from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(‘%s 吃了 %s‘ % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(3):
        time.sleep(2)
        res=‘包子%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()

def product_gutou(q):
    for i in range(3):
        time.sleep(2)
        res=‘骨头%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()

def product_ganshui(q):
    for i in range(3):
        time.sleep(2)
        res=‘泔水%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()
if __name__ == ‘__main__‘:
    q=JoinableQueue()
    #生产者们:厨师们
    p1=Process(target=product_baozi,args=(q,))
    p2=Process(target=product_gutou,args=(q,))
    p3=Process(target=product_ganshui,args=(q,))

    #消费者们:吃货们
    p4=Process(target=consumer,args=(q,))
    p5=Process(target=consumer,args=(q,))
    p4.daemon=True
    p5.daemon=True
    #设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    p_l=[p1,p2,p3,p4,p5]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()

    print(‘主‘)

生产者消费者模型3

生产者消费者模型3

2、进程间通信(IPC)方式二:管道(不推荐使用,了解即可)

3、 进程间通信方式三:共享数据(不推荐使用,了解即可)

时间: 2024-10-11 07:23:32

互斥锁与进程间通信的相关文章

31、互斥锁与进程间通信

我们之前做了多进程并发,那么你们有没有发现问题.如果说多个进程共享同一个数据,比如抢火车票大家同时在客户端查看同时购买会出现什么问题呢?今天我们将讲述进程锁还有进程间通信,进程之间彼此隔离,他们需要一个第三方联系起来. 一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全. 1.上厕所 先举个通俗易懂

python并发编程之多进程(二):互斥锁(同步锁)&进程其他属性&进程间通信(queue)&生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

条件变量、信号量、互斥锁

转载 http://blog.csdn.net/yusiguyuan/article/details/14161225 线程间的同步技术,主要以互斥锁和条件变量为主,条件变量和互斥所的配合使用可以很好的处理对于条件等待的线程间的同步问题.举个例子:当有两个变量x,y需要在多线程间同步并且学要根据他们之间的大小比较来启动不同的线程执行顺序,这便用到了条件变量这一技术.看代码 1 #include <iostream> 2 #include <pthread.h> 3 using na

linux驱动开发(十一)linux内核信号量、互斥锁、自旋锁

参考: http://www.360doc.com/content/12/0723/00/9298584_225900606.shtml http://www.cnblogs.com/biyeymyhjob/archive/2012/07/21/2602015.html http://blog.chinaunix.net/uid-25100840-id-3147086.html http://blog.csdn.net/u012719256/article/details/52670098 --

互斥锁,IPC队列

进程同步(锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,part1:共享同一打印终端,发现会有多行内容打印到一行的现象(多个进程共享并抢占同一个打印终端,乱了) #多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误) from multiprocessing import Process import time class Logger(Process): def __init__(self):

Linux环境编程之同步(一):互斥锁

同步的内容在<UNP2>里把它看作了进程间通信,我觉得其实同步只是进程间通信的一种协作方式一种协作的手段,不能称之为进程间通信的一种形式,所以标题用了"同步",而没有用IPC进程间通信. 互斥锁是同步的基本组成部分,它们总是用来同步一个进程内的各个线程的.如果互斥锁或条件变量存放在多个进程间共享的某个内存区,那么Posix还允许它用于这些进程间的同步. 互斥锁用于保护临界区以保证任何时刻只有一个线程在执行其中的代码,或者任何时刻只有一个进程在执行其中的代码.保护临界区的代码

线程同步机制之互斥锁

进程间通讯介绍 1.几种进程间的通信方式 # 管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用.进程的亲缘关系通常是指父子进程关系. # 有名管道 (named pipe) : 有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信. # 信号量( semophore ) : 信号量是一个计数器,可以用来控制多个进程对共享资源的访问.它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源.因此,主要作为进程间以及同一进

守护进程,模拟抢票例子,互斥锁,信号量,队列总结

 守护进程 主进程创建守护进程 其一:守护进程会在主进程代码执行结束后就终止 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止 # 守护进程 from multiprocessing import Process import os,time,random def task(): print('%s

多进程(了解),守护进程,互斥锁,信号量,进程Queue与线程queue

一.守护进程 主进程创建守护进程,守护进程的主要的特征为:①守护进程会在主进程代码执行结束时立即终止:②守护进程内无法继续再开子进程,否则会抛出异常. 实例: from multiprocessing import Process from threading import Thread import time def foo(): # 守护进程 print(123) time.sleep(1) print("end123") def bar(): print(456) time.sl