上一篇博文介绍了Python中线程、进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块。首先来看看threading模块有哪些方法和类吧。
主要有:
Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能。
Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类。
Lock :原锁,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用。
RLock :可重入锁,使单线程可以再次获得已经获得的锁,即可以被相同的线程获得多次。
Condition :条件变量,能让一个线程停下来,等待其他线程满足某个“条件”。
Event:事件对象,是线程间最简单的通信机制之一:线程可以激活在一个事件对象上等待的其他线程。
Semaphore:信号量对象,是个变量,管理一个内置的计数器,指定可同时访问资源或者进入临界区的线程数。
BoundedSemaphore :有界信号量对象,与semaphore类似,但不允许超过初始值;
ThreadError:线程错误信息类。
active_count()和activeCount():返回当前活着的Thread对象个数。
current_thread()和currentThread():返回当前的Thread对象,对应于调用者控制的线程。如果调用者控制的线程不是通过threading模块创建的,则返回一个只有有限功能的虚假线程对象。
enumerate():返回当前活着的Thread对象的列表。该列表包括守护线程、由current_thread()创建的虚假线程对象和主线程。它不包括终止的线程和还没有开始的线程。
settrace(func):为所有从threading模块启动的线程设置一个跟踪函数。在每个线程的run()方法调用之前,func将传递给sys.settrace()(该函数是设置系统的跟踪函数)。
setprofile(func):为所有从threading模块启动的线程设置一个profile函数。在每个线程的run()调用之前,func将传递给sys.setprofile()(这个函数用于设置系统的探查函数)。
stack_size([size]):返回创建新的线程时该线程使用的栈的大小,
可选的size参数指定后来创建的线程使用栈的大小,它必须是0(使用平台的或者配置的默认值)或不少于32,768(32kB)的正整数。
其它一些以"_"开头的,有些是引入其它模块的函数,然后起了个别名,比如_format_exc,它的定义如下:
from traceback import format_exc as _format_exc
有些其实是类,比如RLock的定义如下:
def RLock(*args, **kwargs): """Factory function that returns a new reentrant lock. A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it. """ return _RLock(*args, **kwargs)
_RLock其实是个类,内部调用的类,RLock其实是个函数,返回一个类,感兴趣的可以去看看threading.py相关代码。
由于篇幅比较长,只介绍了Thread,Lock,RLock,Condition,不正之处欢迎批评指正,当然更希望大家有耐心、仔细看完,同时能实践一下示例并仔细体会。
(一)Thread对象
这个类表示在单独的一个控制线程中运行的一个活动。有两种创建方法:创建线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;而是从Thread类中继承,然后在子类中覆盖run()方法,在子类中不应该覆盖其它方法(__init__()除外),也就是只覆盖该类的__init__()和run()方法。
Thread类主要方法:
start():开始线程的活动。每个线程对象必须只能调用它一次。
run():表示线程活动的方法,可以在子类中覆盖这个方法。
join([timeout]):是用来阻塞当前上下文,直至该线程运行结束,一个线程可以被join()多次,timeout单位是秒。
name:一个字符串,只用于标识的目的。它没有语义。多个线程可以被赋予相同的名字。初始的名字通过构造函数设置。
getName()/setName():作用于name的两个函数,从字面就知道是干嘛的,一个是获取线程名,一个是设置线程名
ident:线程的ID,如果线程还未启动则为None,它是一个非零的整数当一个线程退出另外一个线程创建时,线程的ID可以重用,即使在线程退出后,其ID仍然可以访问。
is_alive()/isAlive():判断线程是否还活着。
daemon:一个布尔值,指示线程是(True)否(False)是一个守护线程。它必须在调用start()之前设置,否则会引发RuntimeError。它的初始值继承自创建它的线程;主线程不是一个守护线程,所以在主线程中创建的所有线程默认daemon = False。
何为守护线程?举个例子,在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线程是否完成。 对于普通线程,如果子线程的任务没有结束,主线程不会退出,整个程序也不会退出;对于守护线程,即使子线程任务还没有结束,如果主线程退出该线程也会退出。
isDaemon()/setDaemon():作用与daemon的函数,一个是判断是不是守护线程,一个是设置守护线程。
下面通过简单的例子来展示Thread的使用方法,希望对你理解能有帮助,当然更希望你能动手操作下,了解上面方法的用处。
将函数传递到Thread对象
#coding=utf-8 import threading import datetime import time def thread_fun(num): time.sleep(num) now = datetime.datetime.now() print "线程名:%s ,now is %s" %( threading.currentThread().getName(), now) def main(thread_num): thread_list = list() # 先创建线程对象 for i in range(0, thread_num): thread_name = "thread_%s" %i thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (2,))) # 启动所有线程 for thread in thread_list: thread.setName("good")#修改线程名 print thread.is_alive()#判断线程是否是活的 print thread.ident thread.start() print thread.ident print thread.is_alive() thread.join() print thread.is_alive() if __name__ == "__main__": main(3)
运行结果如下:
上面的例子只是展示了几个简单的Thread类的方法,其它方法大家可以自己动手试试,体会下。这里再讲下threading.Thread()
它的原型是:
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={})
group应该为None;被保留用于未来实现了ThreadGroup类时的扩展。
target是将被run()方法调用的可调用对象。默认为None,表示不调用任何东西。
name是线程的名字。默认情况下,以“Thread-N”的形式构造一个唯一的名字,N是一个小的十进制整数。
args是给调用目标的参数元组。默认为()。
kwargs是给调用目标的关键字参数的一个字典。默认为{}。
继承自threading.Thread类:
继承的话,主要是重定义__init__()和run()。
#coding=utf-8 import threading class MyThread(threading.Thread): def __init__(self,group=None,target=None,name=None,args=(),kwargs=None): threading.Thread.__init__(self,group,target,name,args,kwargs) self.name = "Thread_%s"%threading.active_count() def run(self): print "I am %s" %self.name if __name__ == "__main__": for thread in range(0, 5): t = MyThread() t.start()
运行结果就不贴出来了。
(二)Lock互斥锁
如果多个线程访问同时同一个资源,那么就有可能对这个资源的安全性造成破坏,还好Python的threading模块中引入了互斥锁对象,可以保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象这时候可以使用threading模块提供的Lock类。它主要有两个方法:acquire()和 release()
acquire(blocking=True, timeout=-1)
获取锁,设置为locked状态,blocking参数表示是否阻塞当前线程等待,timeout表示阻塞时的等待时间 。如果成功地获得lock,则acquire()函数返回True,否则返回False,timeout超时时如果还没有获得lock仍然返回False。
release()
释放锁,设置为unlocked状态,当在未锁定的锁上调用时,会引发ThreadError。没有返回值。
下面的代码是没有设锁时,100个线程访问一个资源,从运行结果来看对资源的完整性造成了一定的影响。
#coding=utf-8 import threading import time num = 0 class MyThread(threading.Thread): def __init__(self,target=None,name=None,args=(),kwargs=None): threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs) def run(self): global num time.sleep(1)#这句不能少,要不然看不到对公共资源完整性的破坏,当然也可以将该句放在 #num += 1的后面,有兴趣可以试试 num += 1 self.setName("Thread-%s" % num) print("I am %s, set counter:%s" % (self.name, num)) if __name__ == "__main__": for i in range(0, 100): my_thread = MyThread() my_thread.start()
运行结果如下:
看看部分数据的完整性遭到了“破坏”
再看看使用互斥锁的例子
#coding=utf-8 import threading import time num = 0 lock = threading.Lock() class MyThread(threading.Thread): def __init__(self,target=None,name=None,args=(),kwargs=None): threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs) def run(self): global num time.sleep(1) if lock.acquire(): num += 1 self.setName("Thread-%s" % num) print("I am %s, set counter:%s" % (self.name, num)) lock.release() if __name__ == "__main__": for i in range(0, 100): my_thread = MyThread() my_thread.start()
运行结果大家可以动手试试。
同步阻塞
当一个线程调用Lock对象的acquire()方法获得锁时,这把锁就进入“locked”状态。因为每次只有一个线程可以获得锁,所以如果此时另一个线程试图获得这个锁,该线程就会变为“block“同步阻塞状态。直到拥有锁的线程调用锁的release()方法释放锁之后,该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行状态。
死锁
是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
(三)可重入锁RLock
用法和Lock用法一样,只RLock支持嵌套,而Lock不支持嵌套;RLock允许在同一线程中被多次acquire(如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐),而Lock却不允许这种情况,否则会出现死锁。看下面例子:
#coding=utf-8 import threading import time num = 0 lock = threading.Lock() class MyThread(threading.Thread): def __init__(self,target=None,name=None,args=(),kwargs=None): threading.Thread.__init__(self,target=target,name=name,args=args,kwargs=kwargs) def run(self): global num time.sleep(1) if lock.acquire(): num += 1 self.setName("Thread-%s" % num) print("I am %s, set counter:%s" % (self.name, num)) if lock.acquire(): num += 1 self.setName("Thread-%s" % num) print("I am %s, set counter:%s" % (self.name, num)) lock.release() lock.release() if __name__ == "__main__": for i in range(0, 100): my_thread = MyThread() my_thread.start()
上面的例子使用Lock锁嵌套使用,运行后就出现了死锁,运行不下去,“卡住了”。
这个时候就出现了RLock,它支持在同一线程中可以多次请求同一资源。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。所以将上面的代码中
lock = threading.Lock()
改为
lock = threading.RLock()
就不会出现死锁情况。
(四)条件变量Condition
除了互斥锁外,对复杂线程同步问题,Python提供了Condition对象来支持解决。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
除了acquire和release方法外还有如下方法:
wait([timeout]):
w释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。
notify():
唤醒一个挂起的线程(如果存在挂起的线程),该方法不会释放所占用的琐。
notify_all()/notifyAll()
唤醒所有挂起的线程(如果存在挂起的线程),这些方法不会释放所占用的琐。
同时要主要这四个方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。
Condition其实是一把高级的锁,它内部维护着一个锁对象(默认是RLock),当然你也可以通过它的构造函数传递一个Lock/RLock对象给它。我们可以看看它的__init__()函数的定义:
def __init__(self, lock=None, verbose=None): _Verbose.__init__(self, verbose) if lock is None: lock = RLock() self.__lock = lock # Export the lock‘s acquire() and release() methods self.acquire = lock.acquire self.release = lock.release # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). try: self._release_save = lock._release_save except AttributeError: pass try: self._acquire_restore = lock._acquire_restore except AttributeError: pass try: self._is_owned = lock._is_owned except AttributeError: pass self.__waiters = []
是不是一目了然。条件变量的经典问题就是生产者与消费者的例子了。假设某代工厂的两个车间,一个车间生产某种配件,另一个车间需要这些配件进行组装产品,那么前者就是生产者(Producer),后者就是消费者(Consumer),当配件数低于1000就需要生产,多余5000个可以暂停生产,生产者一秒中可生产200个,消费者的策略是剩余数量少于800就不组装了,一秒钟可消费100个。代码如下
#coding=utf-8 import threading import time left_num = 500#假设初始剩余数为500 con = threading.Condition() class Producer(threading.Thread): """生产者""" def __init__(self, target=None, name=None, args=(), kwargs=None): threading.Thread.__init__(self, target=target, name=name, args=args, kwargs=kwargs) def run(self): global left_num is_product = True while True: if con.acquire(): if left_num > 5000:#大于5000可以暂停生产 is_product = False#不用继续生产 con.wait()#该“生产线”挂起 elif left_num < 1000: is_product = True#继续生产 left_num += 200 msg = self.name+‘ produce 200, left_num=%s‘%left_num print msg con.notify() else: #对于剩余数量处在中间的,就要分情况讨论了,一旦开启“生产”就要等 #到生产到指定数目才能停止,仔细想想很好理解的 if is_product: left_num += 200 msg = self.name + ‘ produce 200, left_num=%s‘ % left_num print msg con.notify() else: con.wait() con.release() time.sleep(1) class Consumer(threading.Thread): """消费者""" def __init__(self, target=None, name=None, args=(), kwargs=None): threading.Thread.__init__(self, target=target, name=name, args=args, kwargs=kwargs) def run(self): global left_num while True: if con.acquire(): if left_num < 800:#少于800不组装 con.wait() else: left_num -= 100 msg = self.name+‘ consume 100, left_num=%s‘%left_num print msg con.notify() con.release() time.sleep(1) def test(): for i in range(1): p = Producer() p.start() for i in range(1): c = Consumer() c.start() if __name__ == ‘__main__‘: test()
大家可以把代码复制下来运行看看,仔细体会下,当累计剩余数大于5000后,就停止了“生产",之后只有消费者在运作,当剩余数低于1000,生产者”生成线"重启生产,直到累计剩余数大于5000,后又停止生产,以此类推。示例中展示的都是生产者与消费者都只有一条”流水线”,实际上,生产者有多条”流水线",消费者也有多条"流水线“,其实都一样,将上面代码的for循环的范围改下就可以了。还有一点,Condition可以接受一个Lock/RLock对象,大家可以试试创建一个Lock/RLock传递给Condition,如下:
lock = threading.Lock() con = threading.Condition(lock)
效果是一样的,大家可以动手试试。