python 的threading模块

其中Thread类

是你主要的线程类,可以创建进程实例。该类提供的函数包括:

getName(self) 返回线程的名字

isAlive(self) 布尔标志,表示这个线程是否还在运行中

isDaemon(self) 返回线程的daemon标志,将线程放入后台执行

join(self, timeout=None) 程序挂起,直到线程结束,如果给出timeout,则最多阻塞timeout秒,主线程需等待子线程完成

run(self) 定义线程的功能函数

setDaemon(self, daemonic) 把线程的daemon标志设为daemonic

setName(self, name) 设置线程的名字

start(self) 开始线程执行

其中Queue提供的类

Queue队列

LifoQueue后入先出(LIFO)队列

PriorityQueue 优先队列

接下来,我们将会用一个一个示例来展示threading的各个功能,包括但不限于:两种方式起线程、threading.Thread类的重要函数、使用Lock互斥及RLock实现重入锁、使用Condition实现生产者和消费者模型、使用Event和Semaphore多线程通信

两种方式起线程

在Python中我们主要是通过thread和threading这两个模块来实现的,其中Python的threading模块是对thread做了一些包装的,可以更加方便的被使用,所以我们使用threading模块实现多线程编程。一般来说,使用线程有两种模式,一种是创建线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;另一种是直接从Thread继承,创建一个新的class,把线程执行的代码放到这个新的 class里。

方法一:

例一:

#!/usr/bin/python

import threading

import time

def worker():

print "worker"

time.sleep(1)

return

for i in xrange(5):

t = threading.Thread(target=worker)

#threading.setDaemon()的使用。设置后台进程。

t.setDaemon(True)

t.start()

#threading.activeCount()的使用,此方法返回当前进程中线程的个数

print "current has %d threads" % (threading.activeCount() - 1)

#threading.enumerate()的使用。此方法返回当前运行中的Thread对象列表。

for item in threading.enumerate():

print item

例二:

import threading def thread_fun(num):    for n in range(0, int(num)):        print " I come from %s, num: %s" %( threading.currentThread().getName(), n) def main(thread_num):    thread_list = list();    # 先创建线程对象    for i in range(0, thread_num):        thread_name = "thread_%s" %i        thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (20,)))     # 启动所有线程    for thread in thread_list:        thread.start()     # 主线程中等待所有子线程退出    for thread in thread_list:        thread.join() if __name__ == "__main__":    main(3)

程序启动了3个线程,并且打印了每一个线程的线程名字,这个比较简单吧,处理重复任务就派出用场了,下面介绍使用继承threading的方式;

方法二:

继承自threading.Thread类:

import threading class MyThread(threading.Thread):    def __init__(self):

#MyThread的初始化和threading.Thread一样        threading.Thread.__init__(self);     def run(self):#重载run方法        print "I am %s" %self.name if __name__ == "__main__":    for thread in range(0, 5):        t = MyThread()#自动调用类的run函数        t.start()

threading.Thread类的重要函数

介绍threading模块中的主类Thread的一些主要方法,实例代码如下:

import threading class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        print "I am %s" % (self.name) if __name__ == "__main__":    for i in range(0, 5):        my_thread = MyThread()        my_thread.start()

1、name相关
你可以为每一个thread指定name,默认的是Thread-No形式的,如上述实例代码打印出的一样:

I am Thread-1    I am Thread-2    I am Thread-3    I am Thread-4    I am Thread-5

当然你可以指定每一个thread的name,这个通过setName方法,代码:


def __init__(self):     threading.Thread.__init__(self)     self.setName("new" + self.name)

2、join方法
join方法原型如下,这个方法是用来阻塞当前上下文,直至该线程运行结束:


def join(self, timeout=None):

timeout可以设置超时时间

3、setDaemon方法
当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法,并设置其参数为True。

使用Lock互斥锁

现在我们考虑这样一个问题:假设各个线程需要访问同一公共资源,我们的代码该怎么写?

import threadingimport time counter = 0 class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global counter        time.sleep(1);        counter += 1        print "I am %s, set counter:%s" % (self.name, counter) if __name__ == "__main__":    for i in range(0, 200):        my_thread = MyThread()        my_thread.start()

解决上面的问题,我们兴许会写出这样的代码,我们假设跑200个线程,但是这200个线程都会去访问counter这个公共资源,并对该资源进行处理(counter += 1),代码看起来就是这个样了,但是我们看下运行结果:

I am Thread-69, set counter:64I am Thread-73, set counter:66I am Thread-74, set counter:67I am Thread-75, set counter:68I am Thread-76, set counter:69I am Thread-78, set counter:70I am Thread-77, set counter:71I am Thread-58, set counter:72I am Thread-60, set counter:73I am Thread-62, set counter:74I am Thread-66, set counter:75I am Thread-70, set counter:76I am Thread-72, set counter:77I am Thread-79, set counter:78I am Thread-71, set counter:78

打印结果我只贴了一部分,从中我们已经看出了这个全局资源(counter)被抢占的情况,问题产生的原因就是没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。在开发过程中我们必须要避免这种情况,那怎么避免?这就用到了我们在综述中提到的互斥锁了。

互斥锁概念

Python编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。在Python中我们使用threading模块提供的Lock类。
我们对上面的程序进行整改,为此我们需要添加一个互斥锁变量mutex = threading.Lock(),然后在争夺资源的时候之前我们会先抢占这把锁mutex.acquire(),对资源使用完成之后我们在释放这把锁mutex.release()。代码如下:


import threadingimport time counter = 0mutex = threading.Lock() class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global counter, mutex        time.sleep(1);        if mutex.acquire():            counter += 1            print "I am %s, set counter:%s" % (self.name, counter)            mutex.release() if __name__ == "__main__":    for i in range(0, 100):        my_thread = MyThread()        my_thread.start()

同步阻塞
当一个线程调用Lock对象的acquire()方法获得锁时,这把锁就进入“locked”状态。因为每次只有一个线程1可以获得锁,所以如果此时另一个线程2试图获得这个锁,该线程2就会变为“block“同步阻塞状态。直到拥有锁的线程1调用锁的release()方法释放锁之后,该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

进一步考虑

通过对公共资源使用互斥锁,这样就简单的到达了我们的目的,但是如果我们又遇到下面的情况:

1、遇到锁嵌套的情况该怎么办,这个嵌套是指当我一个线程在获取临界资源时,又需要再次获取;

2、如果有多个公共资源,在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源;

上述这两种情况会直接造成程序挂起,即死锁,下面我们会谈死锁及可重入锁RLock。

死锁的形成

前一篇文章Python:使用threading模块实现多线程编程四[使用Lock互斥锁]我们已经开始涉及到如何使用互斥锁来保护我们的公共资源了,现在考虑下面的情况–
如果有多个公共资源,在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,这会引起什么问题?

死锁概念
所谓死锁: 是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 由于资源占用是互斥的,当某个进程提出申请资源后,使得有关进程在无外力协助下,永远分配不到必需的资源而无法继续运行,这就产生了一种特殊现象死锁。


import threading counterA = 0counterB = 0 mutexA = threading.Lock()mutexB = threading.Lock() class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        self.fun1()        self.fun2()     def fun1(self):        global mutexA, mutexB        if mutexA.acquire():            print "I am %s , get res: %s" %(self.name, "ResA")             if mutexB.acquire():                print "I am %s , get res: %s" %(self.name, "ResB")                mutexB.release()         mutexA.release()     def fun2(self):        global mutexA, mutexB        if mutexB.acquire():            print "I am %s , get res: %s" %(self.name, "ResB")             if mutexA.acquire():                print "I am %s , get res: %s" %(self.name, "ResA")                mutexA.release()         mutexB.release() if __name__ == "__main__":    for i in range(0, 100):        my_thread = MyThread()        my_thread.start()

代码中展示了一个线程的两个功能函数分别在获取了一个竞争资源之后再次获取另外的竞争资源,我们看运行结果:

I am Thread-1 , get res: ResAI am Thread-1 , get res: ResBI am Thread-2 , get res: ResAI am Thread-1 , get res: ResB

可以看到,程序已经挂起在那儿了,这种现象我们就称之为”死锁“。

避免死锁
避免死锁主要方法就是:正确有序的分配资源,避免死锁算法中最有代表性的算法是Dijkstra E.W 于1968年提出的银行家算法。

可重入锁RLock

考虑这种情况:如果一个线程遇到锁嵌套的情况该怎么办,这个嵌套是指当我一个线程在获取临界资源时,又需要再次获取。
根据这种情况,代码如下:


import threadingimport time counter = 0mutex = threading.Lock() class MyThread(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global counter, mutex        time.sleep(1);        if mutex.acquire():            counter += 1            print "I am %s, set counter:%s" % (self.name, counter)            if mutex.acquire():                counter += 1                print "I am %s, set counter:%s" % (self.name, counter)                mutex.release()            mutex.release() if __name__ == "__main__":    for i in range(0, 200):        my_thread = MyThread()        my_thread.start()

这种情况的代码运行情况如下:

I am Thread-1, set counter:1

之后就直接挂起了,这种情况形成了最简单的死锁。
那有没有一种情况可以在某一个线程使用互斥锁访问某一个竞争资源时,可以再次获取呢?在Python中为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

代码只需将上述的:

mutex = threading.Lock()

替换成:

mutex = threading.RLock()

使用Condition实现复杂同步

目前我们已经会使用Lock去对公共资源进行互斥访问了,也探讨了同一线程可以使用RLock去重入锁,但是尽管如此我们只不过才处理了一些程序中简单的同步现象,我们甚至还不能很合理的去解决使用Lock锁带来的死锁问题。所以我们得学会使用更深层的解决同步问题。

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。

使用Condition的主要方式为:线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

下面我们通过很著名的“生产者-消费者”模型来来演示下,在Python中使用Condition实现复杂同步。


import threadingimport time condition = threading.Condition()products = 0 class Producer(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global condition, products        while True:            if condition.acquire():                if products < 10:                    products += 1;                    print "Producer(%s):deliver one, now products:%s" %(self.name, products)                    condition.notify()                else:                    print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)                    condition.wait();                condition.release()                time.sleep(2) class Consumer(threading.Thread):    def __init__(self):        threading.Thread.__init__(self)     def run(self):        global condition, products        while True:            if condition.acquire():                if products > 1:                    products -= 1                    print "Consumer(%s):consume one, now products:%s" %(self.name, products)                    condition.notify()                else:                    print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)                    condition.wait();                condition.release()                time.sleep(2) if __name__ == "__main__":    for p in range(0, 2):        p = Producer()        p.start()     for c in range(0, 10):        c = Consumer()        c.start()

代码中主要实现了生产者和消费者线程,双方将会围绕products来产生同步问题,首先是2个生成者生产products ,而接下来的10个消费者将会消耗products,代码运行如下:

Producer(Thread-1):deliver one, now products:1Producer(Thread-2):deliver one, now products:2Consumer(Thread-3):consume one, now products:1Consumer(Thread-4):only 1, stop consume, products:1Consumer(Thread-5):only 1, stop consume, products:1Consumer(Thread-6):only 1, stop consume, products:1Consumer(Thread-7):only 1, stop consume, products:1Consumer(Thread-8):only 1, stop consume, products:1Consumer(Thread-10):only 1, stop consume, products:1Consumer(Thread-9):only 1, stop consume, products:1Consumer(Thread-12):only 1, stop consume, products:1Consumer(Thread-11):only 1, stop consume, products:1

另外:Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock;除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

使用Event实现线程间通信

使用threading.Event可以实现线程间相互通信,之前的Python:使用threading模块实现多线程编程七[使用Condition实现复杂同步]我们已经初步实现了线程间通信的基本功能,但是更为通用的一种做法是使用threading.Event对象。
使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。


import threadingimport time class MyThread(threading.Thread):    def __init__(self, signal):        threading.Thread.__init__(self)        self.singal = signal     def run(self):        print "I am %s,I will sleep ..."%self.name        self.singal.wait()        print "I am %s, I awake..." %self.name if __name__ == "__main__":    singal = threading.Event()    for t in range(0, 3):        thread = MyThread(singal)        thread.start()     print "main thread sleep 3 seconds... "    time.sleep(3)     singal.set()

运行效果如下:

I am Thread-1,I will sleep ...I am Thread-2,I will sleep ...I am Thread-3,I will sleep ...main thread sleep 3 seconds...I am Thread-1, I awake...I am Thread-2, I awake... I am Thread-3, I awake...

时间: 2024-08-03 20:36:36

python 的threading模块的相关文章

Python 多线程threading模块

首先,我们在了解多线程时需要理解的就是什么是多线程,按照官方的解释就是:多线程(英语:multithreading),是指从软件或者硬件上实现多个线程并发执行的技术. 在我自学到这里的时候,通过会在想进程和线程到底是有什么区别,我的理解就是: 进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,一个进程可以包含多个线程. 下面就以简单的例子来加强我们对python 线程的理解. 默认情况下,我们在没有启动线程的时候,可以看一下程序总的运行时间,应该是每个函数

python中threading模块详解(一)

python中threading模块详解(一) 来源 http://blog.chinaunix.net/uid-27571599-id-3484048.html threading提供了一个比thread模块更高层的API来提供线程的并发性.这些线程并发运行并共享内存. 下面来看threading模块的具体用法: 一.Thread的使用 目标函数可以实例化一个Thread对象,每个Thread对象代表着一个线程,可以通过start()方法,开始运行. 这里对使用多线程并发,和不适用多线程并发做

再看python多线程------threading模块

现在把关于多线程的能想到的需要注意的点记录一下: 关于threading模块: 1.关于 传参问题 如果调用的子线程函数需要传参,要在参数后面加一个","否则会抛参数异常的错误. 如下: 1 for i in xrange(5): 2 threads.append(threading.Thread(target=worker,args=(i,))) 2.关于join()阻塞 join()方法一旦被调用,这个线程就会被阻塞住,等其他线程执行完才执行自身.当我们在主线程A中,创建了n个子线

python多线程-threading模块

threading 是我们常用的用于 python 多线程的模块,其功能更加丰富.下面我们就来开始学习这个模块. 同样的,我这里声明一样我使用的版本是 python2.7,不同版本直接可能存在差异. 老规矩,使用 help() 函数获取帮助文档,看看里面有什么内容. threading 模块中提供了一个 thread 的类,注意不要和 thread 模块搞混了,两者差别还是很大的.thread 这个类可以实例化一个对象,每个对象代表一个线程,可以调用其中的 run() 方法来开启一个线程的运行.

Python之threading模块简单使用

下面的代码来自<Python核心编程>(第二版)一书. threading模块对象 说明 Thread 表示一个线程的执行的对象 Lock 锁原语对象(跟thread模块里的锁原语对象相同) RLock 可重入锁对象.使单线程可以再次获得已经获得了的锁(递归锁定) Condition 条件变量对象能让一个线程停下来,等待其他线程满足了某个"条件".如,状态的改变或值得改变 Event 通用的条件变量,多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活 Sem

python之threading模块简单讲解和实例演示

import threading 在处理一些程序时为了节约时间,可以使用多线程的方式, 让其并发去执行,从而节约时间, (注意python其实是伪多线程,其实是以我们感觉不到的速度每行代码都按照相同时间执行) Lock 当多线程同时对一个变量进行修改操作的时候,那么可能会出现混乱, 因为其是并发(算是吧)而行, (所以当两个赋值时间相对接近时那么其容易出现混乱) 所以我们得创建锁, 让其上一个线程赋值结束, 下一个赋值再继续 threading.Lock() 创建锁 是可用的最低级的同步指令.

python 线程 threading模块

# 进程 : 数据隔离,资源分配的最小单位,可以利用多核,操作系统调度,数据不安全,开启关闭切换时间开销大 # multiprocessing 如何开启进程 start join # 进程有数据不安全的问题 Lock (抢票的例子) # 进程之间可以通信ipc: # 队列(安全) 管道(不安全) # 生产者消费者模型 # 第三方工具 # 进程之间可以通过Manager类实现数据共享(不需要会写代码) # 一般情况下我们开启的进程数不会超过cpu个数的两倍# 线程(80%) # 什么是线程 :能被

Python中threading模块的join函数

Join的作用是众所周知的,阻塞进程直到线程执行完毕.通用的做法是我们启动一批线程,最后join这些线程结束,例如: 1 for i in range(10): 2 3 t = ThreadTest(i) 4 5 thread_arr.append(t) 6 7 for i in range(10): 8 9 thread_arr[i].start() 10 11 for i in range(10): 12 13 thread_arr[i].join() 此处join的原理就是依次检验线程池中

python threading模块中对于信号的抓取

最近的物联网智能网关(树莓派)项目中遇到这样一个问题:要从多个底层串口读取发来的数据,并且做出相应的处理,对于每个串口的数据的读取我能想到的可以采用两种方式: 一种是采用轮询串口的方式,例如每3s向每个串口的buffer区去取一次数据,但是这样可能会有缓冲区溢出的可能,同时,数据的同步也可能会出现一定的问题,因为数据的上传周期是可以用户自定义的,一旦用户定义的上传周期过短或过长,都可能造成读取的数据出问题. 另一种方式,就是采用多线程方式,把每个串口读取数据放在单独的子线程中,每个子线程阻塞于串