11.python并发入门(part11 进程同步锁,以及进程池,以及callback的概念)

一、关于进程锁。

其实关于进程锁没啥好讲的了,作用跟线程的互斥锁(又叫全局锁也叫同步锁)作用几乎是一样的。

都是用来给公共资源上锁,进行数据保护的。

当一个进程想去操作一个公共资源,它就可以给公共资源进程“上锁”的操作,其他进程如果也想去访问或者操作这个公共资源,那么其他的进程只能阻塞,等待刚刚的进程把锁释放,下一个进程才可以对这个公共资源进行操作。

下面是个关于进程锁的使用示范:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import multiprocessing

def func1(lock_1,name):

lock_1.acquire()

print "hello %s" %(name)  #当执行到这句话的时候,就不再是并行了,和线程中锁的概念是一样的。

lock_1.release()

if __name__ == ‘__main__‘:

lock = multiprocessing.Lock()

for num in range(10):

multiprocessing.Process(target=func1,args=(lock,num)).start()

输出结果:

hello 0

hello 1

hello 2

hello 3

hello 4

hello 5

hello 6

hello 7

hello 8

hello 9

!!补充下,其实在进程中,也可以使用RLock递归锁,信号量,event等各种锁,用法基本和多线程是一样的。

二、关于进程池。

什么是进程池呢?

进程池就是程序中维护的一个进程序列,当需要使用进程的时候,就会从进程池中获取一个进程,当进程池中不再有进程的时候,程序就会阻塞,一直等到有新的进程加入到进程池为止。

那么进程池有什么好处?可以看看下面这个例子。

假如说有个函数,这个函数的内容需要被执行100次,才能完成我们想要完成的任务。

我们假定运行一次这个函数要10秒钟,如果使用单进程单线程的模式串行执行,执行这个函数100次要花费1000秒的时间,效率非常低!

那么多线程呢?我们可以同时开100个线程,每个线程去执行一次这个函数,也很快可以执行结束,但是前提是,这个函数所执行的操作必须属于I/O密集型,这样才可以发挥出多线程的高效率,如果这个函数所执行的操作是计算密集型的呢?(如果计算密集型的任务交给多线程去做,反而可能会降低效率,这是因为全局解释器锁的特性造成的)那我们就需要去考虑去开多进程了。

好,那接下来我们用多线程的方式去处理这个问题。

这个函数既然要执行100次,那么我干脆去开100个进程,让这100个函数并行去执行这个函数。

这样确实可以大大提高效率,但是,无论是开100个进程也好,或者说是开100个线程也好,这样做开销实在是太大了!!!同一个进程里面的多个线程还好,数据资源在多个进程之间都是共享的,线程和线程之间每次切换,值需要保存当前的执行状态就可以了,但是线程呢,线程和线程之间的数据都是独立的,多个进程要想实现数据共享,就要给每个进程复制一份数据资源,开100个线程,数据资源就要被复制100份!开销实在太大了!

其实上面的这些方法都不太好,但是可以折中考虑,创建一个线程池。

接下来我们来分析下进程池的好处。

假设说,我们创建了一个进程池,创建这个进程池的时候,指定这个进程池中最多可以容纳10个进程,也就意味着一次的最大并发是10个进程一起去执行这个函数。

接下来,这10个进程都把自己的事情做完了,当然,这10个进程不可能完全是一起执行完毕的(除非你的cpu有十核),肯定有先执行完任务进程。

那么换种说法吧,这10个并发运行的进程中,现在假如有一个进程已经运行完毕,那么这个进程池中还剩下9个进程,我的进程池中现在空出来了一个位置,空出来的这个位置,还可以继续开一个新的进程去工作(只要进程池中有空位,就可以开启新的进程去继续工作。)

所以说,通过以上的例子,可以明确一点,进程池的好处,就是可以维护一个进程工作的最大并发量,比如我们手动设置进程池中最大可以容纳10个进程,那么在程序运行时,运行的进程数量不会高于10个,也不会低于10个,可以一直维持这个最大并发量。

虽然没办法10秒执行100次这个函数,但是,使用了进程池,肯定要比串行执行要快的多!

所以说,进程池是一个折中的解决方案。

下面是进程池的使用示例:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import multiprocessing

import os

import time

def func1(value):

time.sleep(5)

print "---------->%s" %(value)

return value+100

def display_pro_info(value):

print os.getpid()

print os.getppid()

print "log:%s" %(value)

pro_pool = multiprocessing.Pool(10) #创建线程池对象,并指定并最大并发量,如果不指定,会按照当前计算机的cpu核数来自动调整.

display_pro_info(1)

print "--------------->"

for i in range(100): #创建100个线程

# pro_pool.apply_async(func=func1,args=(i,))  #这一步可以暂时理解为往进程池里添加进程,本文后面会对该步骤做详细描述。

pro_pool.apply_async(func=func1,args=(i,),callback=display_pro_info) #这里使用了callback回调函数,什么是回调函数,本文后面会介绍。

pro_pool.close() #在这需要注意一点,就是使用进程池,一定要先close再join,否则就会报错。先close后join这个顺序是固定的!!

pro_pool.join()  #没有join的话,进程池里的进程是不会运行的。

print "the end!"

下面输出一下这个程序的运行结果:

44720

40833

log:0

--------------->

---------->0

---------->1

---------->3

---------->2

---------->5

---------->9

---------->4

---------->7

---------->6

---------->8

44720

40833

log:100

44720

40833

log:101

44720

40833

log:103

44720

40833

log:102

44720

40833

log:109

44720

40833

log:104

44720

40833

log:107

44720

40833

log:106

44720

40833

log:105

44720

40833

log:108

......

the end!

下面开始对代码进行分析,并且对进程池中一些常用的方法进行讲解:

  1. 首先来说说,在进程池中添加进程的两种模式。

分别是apply(同步执行模式)和apply_async(异步执行模式)。

1.1 apply(同步执行模式)

这种模式一般情况下没人会去用,如果进程池使用了这种模式,当进程池的第一个进程执行完毕后,才会执行第二个进程,第二个进程执行完毕后,在执行第三个进程....(也就是说这种模式会阻塞主进程!),无法实现并行效果。(不止如此,这种模式还不支持callback回调函数。)

1.2 apply_async (异步执行模式)

异步的执行模式,才是可以实现并行效果的模式,支持callback回调函数,当一个进程没有执行完毕,没有返回结果,异步执行的模式并不会对主进程进行阻塞!

补充一点!虽然 apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,如使用result.get()会阻塞主进程!

1.3 pro_pool.apply_async(func=func1,args=(i,),callback=display_pro_info)

语法和创建多线程多进程没有什么差别,func用于指定一个进程要运行的函数,args用来给函数传参数,callback用来指定回调函数。

2.关于join,close,terminate。

2.1 join 主进程阻塞等待子进程运行结束后退出, join方法要在close或terminate之后使用。(换种说法就是,不加join方法,进程池里的进程根本不会执行~)

2.2 close 关闭进程池,不再接受新的任务

2.3 terminate 直接结束进程,不再处理没有处理的任务

#其实join和close比较常用。

3.关于callback函数的一个简单说明。

在了解回调函数之前,需要注意!!callback函数是由主进程去调用的,并不是子进程!!从刚刚的示例中就可以看出结果!

某一个函数执行完毕后,某个函数或者动作执行成功后,再去执行的一个函数,并且之前执行成功的那个函数的返回值,会作为参数传给callback函数。

让回调函数在主进程下调用有什么好处?

比如现在需要开10个线程去对数据库中的数据进行操作,现在有个需求,就是对数据库内部进行的操作都需要记录一个日志,我们可以把写日志这个函数做为一个回调(callback)函数。

在不使用callback函数之前,如果想去开10个进程去操作数据库,这10个进程是并发执行的,每个进程都去操作数据库后,同时去写一个日志文件,如果不加锁的话很容易造成日志文件的损坏,所以我们可以把写日志的函数去交给主进程去执行,当进程池中的10个进程运行结束后,主线程直接执行一个写日志的操作,这就是我理解的callback函数的用处。

拿进程池中的callback来举例吧,pro_pool.apply_async(func=func1,args=(i,),callback=display_pro_info)当func1函数内部执行成功后(只要返回值不为假),display_pro_info这个函数就会被作为回调函数去执行(当然执行这个函数的是主进程!),func1的返回值会作为参数传给display_pro_info这个函数。

最后补充一下,使用callback回调函数需要注意的几点。

  1. 在执行回调函数之前的那个函数,必须有一个返回值!这个返回值有两个用途,第一个用途是判断这个函数是否执行成功,执行成功才可以执行callback函数,另一个用途就是这个返回值会作为参数传给回调函数。
  2. callback回调函数本身必须接收一个参数,这个参数用来接收上一个函数执行完毕后的返回值。
时间: 2024-12-09 22:01:36

11.python并发入门(part11 进程同步锁,以及进程池,以及callback的概念)的相关文章

11.python并发入门(part2 threading模块的基本使用)

一.在使用python多线程之前,你需要知道的. python的多线程中,实现并发是没有问题的,但是!!是无法实现真正的并行的. 这是因为python内部有个GIL锁(全局解释器锁),这个锁限制了在同一时刻,同一个进程中,只能有一个线程被运行!!! 二.threading模块的基本使用方法. 可以使用它来创建线程.有两种方式来创建线程. 1.通过继承Thread类,重写它的run方法. 2.创建一个threading.Thread对象,在它的初始化函数__init__中将可调用对象作为参数传入.

11.python并发入门(part4 死锁与递归锁)

一.关于死锁. 死锁,就是当多个进程或者线程在执行的过程中,因争夺共享资源而造成的一种互相等待的现象,一旦产生了死锁,不加人工处理,程序会一直等待下去,这也被称为死锁进程. 下面是一个产生"死锁"现象的例子: import threading import time lock_a = threading.Lock() lock_b = threading.Lock() class test_thread(threading.Thread): def __init__(self): su

11.python并发入门(part3 多线程与互斥锁)

一.锁的概念. 锁,通常被用来实现共享数据的访问,为每一个共享的数据,创建一个Lock对象(一把锁),当需要访问这个共享的资源时,可以调用acquire方法来获取一个锁的对象,当共享资源访问结束后,在调用release方法去解锁. 二.python中的互斥锁. 在介绍互斥锁之前,先来一起看一个例子.(每个线程对num实现一次-1的操作) import threading import  time num = 200  #每个线程都共享这个变量. tread_list = [] def count

11.python并发入门(part12 初识协程)

一.协程的简介. 协程,又被称为微线程,虽然是单进程,单线程,但是在某种情况下,在python中的协程执行效率会优于多线程. 这是因为协程之间的切换和线程的切换是完全不一样的!协程的切换是由程序自身控制的(程序的开发者使用yield去进行控制,协程和协程之间的切换是可控制的,想什么时候切换就什么时候切换). 当使用多线程时,开的线程越多,协程的优势就越明显. 协程的另一个优点,就是无需锁机制,因为协程只有一个进程,和线程,不存在多线程或者多进程之间访问公共资源的冲突,所以说,在协程中无需加锁,如

11.python并发入门(part7 线程队列)

一.为什么要用队列? 队列是一种数据结构,数据结构是一种存放数据的容器,和列表,元祖,字典一样,这些都属于数据结构. 队列可以做的事情,列表都可以做,但是为什么我们还要去使用队列呢? 这是因为在多线程的情况下,列表是一种不安全的数据结构. 为什么不安全?可以看下面这个例子: #开启两个线程,这两个线程并发从列表中移除一个元素. import threading import time l1 = [1,2,3,4,5] def pri(): while l1: a = l1[-1] print a

11.python并发入门(part10 多进程之间实现通信,以及进程之间的数据共享)

一.进程队列. 多个进程去操作一个队列中的数据,外观上看起来一个进程队列,只是一个队列而已,单实际上,你开了多少个进程,这些进程一旦去使用这个队列,那么这个队列就会被复制多少份. (队列=管道+锁) 这么做的主要原因就是,不同进程之间的数据是无法共享的. 下面是使用进程队列使多进程之间互相通信的示例: 下面这个例子,就是往进程队列里面put内容. #!/usr/local/bin/python2.7 # -*- coding:utf-8 -*- import multiprocessing de

11.python并发入门(part9 多线程模块multiprocessing基本用法)

一.回顾多继承的概念. 由于GIL(全局解释器锁)的存在,在python中无法实现真正的多线程(一个进程里的多个线程无法在cpu上并行执行),如果想充分的利用cpu的资源,在python中需要使用进程. 二.multiprocessing模块的简介. multiprocessing是python中用来管理多进程的包,与threading用法非常类似,它主要使用multiprocessing.Process对象来创建一个进程对象,该进程可以运行在python的函数中. 该Process(进程)对象

11.python并发入门(part6 Semaphore信号量)

一.什么是信号量. 信号量也是一种锁. 信号量的主要用途是用来控制线程的并发量的,BoundedSemaphore或Semaphore管理一个内置的计数器,每调用一次acquire()方法时,计数器-1,每调用一次release()方法时,内部计数器+1. 不过需要注意的是,Semaphore内部的计数器不能小于0!当它内部的计数器等于0的时候,这个线程会被锁定,进入阻塞状态,直到其他线程去调用release方法. BoundedSemaphore与Semaphore的唯一区别在于前者将在调用r

11.python并发入门(part1 初识进程与线程,并发,并行,同步,异步)

一.什么是进程? 在说什么是进程之前,需要先插入一个进程切换的概念! 进程,可以理解为一个正在运行的程序. 现在考虑一个场景,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源.你是不是已经想到在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让程序B暂停.这当然没问题,但这里有一个关键词:切换. 既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与