threading模块
线程有两种调用方式
直接调用
import threading import time def sayhi(num): # 定义每个线程要执行的函数 print("running on number:%s" % num) time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例 t2 = threading.Thread(target=sayhi, args=(2,)) # 生成一个线程实例 t1.start() # 启动线程 t2.start() # 启动线程 print(t1.getName()) # 获取线程名 print(t2.getName())
继承式调用
import threading import time class MyThread(threading.Thread): def __init__(self, num): super(MyThread, self).__init__() self.num = num def run(self): # 定义每个线程要运行的函数 print("running on number:%s" % self.num) time.sleep(2) if __name__ == "__main__": t1 = MyThread("1") t2 = MyThread("2") t1.start() t2.start()
并发的多线程示例
先看一段代码,启动十个线程,每个线程都会sleep2秒,我想计算下执行时间,预计是2秒左右
import threading import time def run(n): print("task", n, threading.current_thread()) time.sleep(2) start_time = time.time() # 开始时间 for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() use_time = time.time() - start_time print("use time: ", use_time)
但是执行后发现,程序并没有等待我所以线程都执行完就打印了执行时间,这样不是我想要的结果,原因是主线程的执行操作,不会等待子线程执行完成,此处是不堵塞的,说到这里就引入了join,join是等待子线程执行
import threading import time def run(n): print("task", n, threading.current_thread()) time.sleep(2) start_time = time.time() t_list = [] for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() t_list.append(t) for r in t_list: r.join() use_time = time.time() - start_time print("use time: ", use_time)
这样,主线程就能等待所以线程都执行结束再打印执行时间了,结果没有问题,use time: 2.0050017833709717
守护线程
主线程结束,守护线程不管执行没执行完它要做的操作都会跟着结束
import threading import time def run(n): print("task", n, threading.current_thread()) time.sleep(2) start_time = time.time() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.setDaemon(True) # 设置为守护线程 t.start() use_time = time.time() - start_time print("use time: ", use_time)
通过setDaemon(True)设置此线程为守护线程,代码执行效果为主线程打印完使用时间程序就退出了,并没有sleep
线程锁(互斥锁)
一个进程可以启动多个线程,多个线程共享一块内存空间,也就意味着每个线程可以访问同一个数据,如果两个线程要同时修改同一份数据时,肯定会出问题
为了避免这样的问题,我们可以“加把锁”,也就是线程锁,同一时间只能让一个线程去修改这个数据
import threading def run(n): lock.acquire() # 加锁 global num num += 1 print(n) lock.release() # 释放锁 lock = threading.Lock() # 实例化一个互斥锁 num = 0 for i in range(1000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() print("num:", num)
递归锁
一个大锁中还要有子锁
import threading num1, num2 = 0, 0 def run1(): print("grab the first part data") lock.acquire() global num1 num1 += 1 lock.release() return num1 def run2(): print("grab the second part data") lock.acquire() global num2 num2 += 1 lock.release() return num2 def run3(): lock.acquire() res1 = run1() print("-----between run1 and run2-----") res2 = run2() lock.release() print("res1:%s,res2:%s" % (res1, res2)) lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: pass else: print("-----all threads done-----") print(num1, num2)
semaphore(信号量)
互斥锁同时允许一个线程更改数据,而信号量是同时允许一定数量的线程更改数据,比如厕所有3个坑,最多允许3个人上厕所,后面的人只能等有人出来才能进去上厕所
import threading import time def run(n): semaphore.acquire() time.sleep(2) print("run the thread: %s" % n) semaphore.release() if __name__ == "__main__": semaphore = threading.BoundedSemaphore(3) # 最多同时运行3个线程 for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1: pass else: print("-----all threads done-----")
event事件
通过事件可以实现两个或多个进程间的交互
import threading import time import random def light(): if not event.isSet(): event.set() # 设置标志,表示通行 count = 0 while True: if count < 10: print(‘\033[42;1m--green light on---\033[0m‘) elif count < 13: print(‘\033[43;1m--yellow light on---\033[0m‘) elif count < 20: if event.isSet(): event.clear() # 清除标志位,表示禁止通行 print(‘\033[41;1m--red light on---\033[0m‘) else: count = 0 event.set() time.sleep(1) count += 1 def car(n): while True: time.sleep(random.randrange(10)) if event.isSet(): print("car [%s] is running.." % n) else: print("car [%s] is waiting for the red light.." % n) if __name__ == "__main__": event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in range(3): Car = threading.Thread(target=car, args=(i,)) Car.start()
queue队列
程序解耦
加快执行速度
queue有几种模式
queue.Queue(maxsize=0) # 先入先出
queue.LifoQueue(maxsize=0) # 后入先出
queue.PriorityQueue(maxsize=0) # 存储数据时可设置优先级的队列
先入先出的例子,取到1,2,3
import queue q = queue.Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
后入先出的例子,取到3,2,1
import queue q = queue.LifoQueue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
优先级的例子
import queue q = queue.PriorityQueue() q.put((2, "jack")) q.put((3, "tom")) q.put((1, "jiachen")) print(q.get()) print(q.get()) print(q.get())
q.qsize()为队列中有几个数据
q.empty()判断队列中是否为空,空为True,非空为False
q.full()判断队列是否满了,满了为True,不满为False
q.put(item,block=True,timeout=None)将数据放入队列中
q.put_nowait(item)将数据放入队列中,如果队列满了,不堵塞直接报异常
q.get(item,block=True,timeout=None)取出队列中的数据,block为True如果没有数据可以取就堵塞,为False如果没有数据可以取就报异常,timeout为超时时间,超出时间报异常
q.get_nowait(item)取出队列中的数据,如果没有数据,不堵塞直接报异常
q.task_done()
q.join() 堵塞直到队列被消费完
生产者消费者模型
一个简单的例子
import queue import threading import time def producer(name): count = 1 while True: q.put(count) print("[%s]生产了骨头[%s]" % (name, count)) count += 1 time.sleep(0.5) def consumer(name): while True: q.get() print("[%s]吃了根骨头" % name) time.sleep(1) if __name__ == "__main__": q = queue.Queue(5) p = threading.Thread(target=producer, args=("xxx",)) c1 = threading.Thread(target=consumer,args=("tom",)) c2 = threading.Thread(target=consumer,args=("jack",)) p.start() c1.start() c2.start()