31、互斥锁与进程间通信

我们之前做了多进程并发,那么你们有没有发现问题。如果说多个进程共享同一个数据,比如抢火车票大家同时在客户端查看同时购买会出现什么问题呢?今天我们将讲述进程锁还有进程间通信,进程之间彼此隔离,他们需要一个第三方联系起来。

一、互斥锁

进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理

注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。

1、上厕所

先举个通俗易懂的例子,家里的厕所,你要上厕所进去后会先锁门,厕所门就相当于一个互斥锁,当你在里面的时候别人过来上厕所就只能在门口等。

from multiprocessing import Process,Lock
import os
import time
def work(mutex):
    mutex.acquire()  #上锁
    print(‘task[%s] 上厕所‘ %os.getpid())
    time.sleep(3)
    print(‘task[%s] 上完厕所‘ %os.getpid())
    mutex.release()  #开锁

if __name__ == ‘__main__‘:
    mutex=Lock()  #实例化(互斥锁)
    p1=Process(target=work,args=(mutex,))
    p2=Process(target=work,args=(mutex,))
    p3=Process(target=work,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()

    print(‘start...‘)

2、模拟抢票

#文件db的内容为:{"count":1}  #票数可以自己定义
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search() :  #查看票数
    dic=json.load(open(‘db.txt‘,))
    print(‘剩余票数%s‘ %dic[‘count‘])
def get_ticket() :  #购票
    dic = json.load(open(‘db.txt‘,))
    if dic[‘count‘] > 0 :
        dic[‘count‘] -= 1
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(‘%s 购票成功‘ %os.getpid())
def task(mutex) :  #购票流程
    search()
    time.sleep(random.randint(1, 3)) #模拟购票一系列繁琐的过程所花费的时间
    mutex.acquire()
    get_ticket()
    mutex.release()
if __name__ == ‘__main__‘ :
    mutex = Lock()
    for i in range(50):
        p = Process(target=task,args=(mutex,))
        p.start()


二、Process对象其他属性使用案例补充

1、deamon守护进程

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

ps:

from multiprocessing import Process
import os
import time
def work():
    print(‘%s is working‘ %os.getpid())
    time.sleep(10)
    print(‘%s is ending‘ % os.getpid())
if __name__ == ‘__main__‘:
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()
    time.sleep(2)
    print(‘start。。。‘)

2、join等待子进程

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

ps:

from multiprocessing import Process
import os
import time
def work():
    print(‘%s is working‘ %os.getpid())
    time.sleep(3)
if __name__ == ‘__main__‘:
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p3.join()
    p1.join()
    p2.join()
    print(‘基于初始化的结果来继续运行‘)

3、terminate,is_alive,name,pid

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.name:进程的名称
p.pid:进程的pid

ps:

from multiprocessing import Process
import os
import time
def work():
    print(‘%s is working‘ %os.getpid())
    time.sleep(3)
if __name__ == ‘__main__‘:
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p1.terminate()  #不建议使用
    print(p1.is_alive())
    #虽然已经强制终止进程了但是操作系统终止进程也需要时间所以此时还是True
    print(p1.name)  #如果没有起名默认Process-1后面的数字按子进程顺序排
    print(p2.name)
    print(p1.pid)  # p1.pid == os.getpid()
    print(‘基于初始化的结果来继续运行‘)


三、进程间通信

我们学习了通过使用共享的文件的方式,实现进程直接的共享,即共享数据的方式,这种方式必须考虑周全同步、锁等问题。而且文件是操作系统提供的抽象,可以作为进程直接通信的介质,与mutiprocess模块无关

但其实mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。IPC机制中的队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

1、进程间通信(IPC)方式一:队列(推荐使用)

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

可以往队列里放任意类型的数据

队列:先进先出

1)导入

from multiprocessing import Queue

2)实例化

q=Queue(3)  #3是队列中规定允许的最大项数,省略即不限大小

3)主要方法

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

4)其他方法(了解)

q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

5)应用

from multiprocessing import Process,Queue
#1:可以往队列里放任意类型的数据 2 队列:先进先出
q=Queue(3)
q.put(‘first‘)
q.put(‘second‘)
q.put(‘third‘)
# q.put(‘fourht‘)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.empty()) #空了

# q=Queue(3)
# q.put(‘first‘,block=False)
# q.put(‘second‘,block=False)
# q.put(‘third‘,block=False)
# q.put(‘fourth‘,block=True,timeout=3)

# q.get(block=False)
# q.get(block=True,timeout=3)

# q.get_nowait() #q.get(block=False)

6)生产消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。

基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print(‘%s 吃了 %s‘ % (os.getpid(), res))
def producer(q):
    for i in range(5):
        time.sleep(2)
        res=‘包子%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.put(None)
if __name__ == ‘__main__‘:
    q=Queue()
    #生产者们:厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:吃货们
    p2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(‘主‘)

生产者消费者模型

7)创建队列的另外一个类

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

maxsize是队列中允许最大项数,省略则无大小限制。

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(‘%s 吃了 %s‘ % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(5):
        time.sleep(2)
        res=‘包子%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()

if __name__ == ‘__main__‘:
    q=JoinableQueue()
    #生产者们:厨师们
    p1=Process(target=product_baozi,args=(q,))

    #消费者们:吃货们
    p4=Process(target=consumer,args=(q,))
    p4.daemon=True

    p1.start()
    p4.start()

    p1.join()
    print(‘主‘)
    #p2结束了

生产者消费者模型2

from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(‘%s 吃了 %s‘ % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(3):
        time.sleep(2)
        res=‘包子%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()

def product_gutou(q):
    for i in range(3):
        time.sleep(2)
        res=‘骨头%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()

def product_ganshui(q):
    for i in range(3):
        time.sleep(2)
        res=‘泔水%s‘ %i
        q.put(res)
        print(‘%s 制造了 %s‘ %(os.getpid(),res))
    q.join()
if __name__ == ‘__main__‘:
    q=JoinableQueue()
    #生产者们:厨师们
    p1=Process(target=product_baozi,args=(q,))
    p2=Process(target=product_gutou,args=(q,))
    p3=Process(target=product_ganshui,args=(q,))

    #消费者们:吃货们
    p4=Process(target=consumer,args=(q,))
    p5=Process(target=consumer,args=(q,))
    p4.daemon=True
    p5.daemon=True
    #设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    p_l=[p1,p2,p3,p4,p5]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()

    print(‘主‘)

生产者消费者模型3

2、进程间通信(IPC)方式二:管道(不推荐使用,了解即可)

3、 进程间通信方式三:共享数据(不推荐使用,了解即可)

(对这两种方式了解不深,有兴趣的可以自己搜索相关文章)

时间: 2024-12-28 11:51:04

31、互斥锁与进程间通信的相关文章

互斥锁与进程间通信

一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全. 1.上厕所 先举个通俗易懂的例子,家里的厕所,你要上厕所进去后会先锁门,厕所门就相当于一个互斥锁,当你在里面的时候别人过来上厕所就只能在门口等. from multiprocessing import Process,Lock import os

互斥锁不在一个线程内引发的问题

本实验创建了3个进程,为了更好的描述线程之间的并行执行, 让3个线程共用同一个执行函数.每个线程都有5次循环(可以看成5个小任务), 每次循环之间会随机等待1-10s的时间,意义在于模拟每个任务的到达时间是随机的,并没有任何特定的规律.使用互斥锁mutex完成互斥访问 1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 5 #define THREAD_NUMBER 3 //线程数 6

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

条件变量、信号量、互斥锁

转载 http://blog.csdn.net/yusiguyuan/article/details/14161225 线程间的同步技术,主要以互斥锁和条件变量为主,条件变量和互斥所的配合使用可以很好的处理对于条件等待的线程间的同步问题.举个例子:当有两个变量x,y需要在多线程间同步并且学要根据他们之间的大小比较来启动不同的线程执行顺序,这便用到了条件变量这一技术.看代码 1 #include <iostream> 2 #include <pthread.h> 3 using na

线程通信(一)&mdash;&mdash; 互斥锁

在使用线程时,经常要注意的就是访问临界资源加锁. 在编码过程由于粗心忘记加锁将带来不可预知的错误.这类错误单次运行或小并发时难以复现,当数据量变大,用户数增多时,轻则系统崩溃,大则引起数据错误.造成损失. 线程中互斥锁与进程的信号量类似,也可以看做是PV操作,用于保护临界资源,确保只有一个线程访问. 下面代码是不加锁错误代码,其中也涉及到之前提到的线程编程时需要注意的一些小细节. 1 #include <pthread.h> 2 #include <unistd.h> 3 #inc

线程的互斥锁和条件变量通信机制

1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <time.h> 4 #include <pthread.h> 5 6 #define BUFFER_SIZE 2 7 struct prodcons 8 { 9 int buffer[BUFFER_SIZE]; 10 pthread_mutex_t lock; 11 int readpos,writepos; 12 pthread_cond_t no

Linux线程同步---互斥锁

线程中互斥锁使用的步骤与信号量相似! 1.首先定义互斥锁变量,并初始化 pthread_mutex_t mutex_lock;pthread_mutex_init(&mutex_lock,NULL);2.在操作前对互斥量进行加锁操作 pthread_mutex_lock(&mutex_lock);3.操作完毕后进行解锁操作 pthread_mutex_unlock(&mutex_lock); 所有操作均在加锁和解锁操作之间进行,保证同时仅仅对有一个操作对关键变量或是区域进行操作.

linux驱动开发(十一)linux内核信号量、互斥锁、自旋锁

参考: http://www.360doc.com/content/12/0723/00/9298584_225900606.shtml http://www.cnblogs.com/biyeymyhjob/archive/2012/07/21/2602015.html http://blog.chinaunix.net/uid-25100840-id-3147086.html http://blog.csdn.net/u012719256/article/details/52670098 --

互斥锁,IPC队列

进程同步(锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,part1:共享同一打印终端,发现会有多行内容打印到一行的现象(多个进程共享并抢占同一个打印终端,乱了) #多进程共享一个打印终端(用python2测试看两个进程同时往一个终端打印,出现打印到一行的错误) from multiprocessing import Process import time class Logger(Process): def __init__(self):