4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQueue(maxsize=0) # last in fist out 后进先出 class queue.PriorityQueue(maxsize=0) # 存储数据时可以设置优先级的队列,如果你实例化一个优先级队列,在往队列中put时参数就需要一个元组了,如:put((优先级数,value)),其中优先及数越小,优先级越高. # 当然q.get()取数据时,取出来的也是元组格式.(优先级数,value) 它有几种方法: exception queue.Empty 正常情况下取一个queue,如果取完了(空了),就进入了一个阻塞状态(程序就开始等待).但是你也可以把它设置成非阻塞. 正常取的方法:queue.get() ,设置成非阻塞queue.get_nowaitt() ,如果设置了非阻塞.当队列取完了,程序就会跑出一个异常. exception queue.Full 正常情况下可以设置队列的大小,如果队列满了,也会进入一个阻塞状态.同样也可以设置非阻塞抛异常的方法. 正常存方法queue.put() , 设置成非阻塞queue.put_nowait()后如果queue满了,那么程序会抛异常. Queue.qsize() # 获取当前的queue里的值的长度. Queue.empty() # 判断queue是不是为空,如果为空则返回True Queue.full() # 判断queue是不是满了,如果满了则返回True Queue.put(item,block=True,timeout=None) # 默认是是block,这个timeout是做什么的.当block=True时,如果queue队列满了以后,程序会一直阻塞.那么有些情况下,我们想不能一直阻塞.timeout就其作用了,阻塞一段时间后,报一个"Full exception" 异常 Queue.put_nowait(item) # 直接不等了,如果队列满了则报Full exception 异常 Queue.get(block=True,timeout=None) # 默认block=True ,如果取不到数据,一直阻塞,timeout设置阻塞多久后抛 Empty exception 异常 Queue.get_nowait() Queue.task_done() # 像是一个信号,当消费者消费完了,通知生产者已经消费完了,你要生产了.task_done()方法用于消费者线程. 4.7 生产者消费者模型 这是一个新的概念,干嘛用的? 它是用于两个独立的系统之间,一个系统用于生产消息,一个用于消费消息.它的最重要的作用,使你的程序解偶,使你两个系统之间关联性没有那么大.两个系统完全的分开了,分开了之后,两个系统之间的通信就通过生产者消费者模型这个桥梁. 消费者,生产者这是以后做开发最常用设计模式之一. 既然说生产者\消费者模型是两个系统之间的桥梁,那么他两之间怎么去通信.怎么实现生产者和消费者? 拿厨师和食客做举例: 1.厨师作为生产者,生产包子 2.食客作为消费者,吃包子. 那么问题来了:他两在什么的情况下,效率可以达到最高. 第一种情况:消费者每次跑到生产者面前说我要吃包子,然后厨师做(花1分钟),然后把包子给消费者. 如果现在又来了一个消费者.他两同时跟厨师说要吃包子,厨师的处理过程就是,先给消费者1做包子,做好后给消费者1,然后在给消费者2. 这里我们可以知道消费者2,要等待厨师做2个包子的时间.消费者2就浪费时间了.那么我们想在厨师做包子的期间,消费者2能不能去做其他事情. 目前不行,因为消费者2走后,消费者3来了.要买包子,作为生产者程序的厨师是串行的,它只能等消费者2取走包子后继续做包子3.所以这种情况下,消费者2就只能在窗口等着,哪都不能去. 那么有么有解决办法? 第二种情况:作为一名聪明的厨师,肯定能想到,我找一个保鲜盒,我先做上3个放那.当消费者1来要包子,我去保鲜盒里拿1个给它,消费者2来要包子,同样也去保鲜盒拿包子.等空下来在去做包子(目的是要保持保鲜盒中有3个包子).但问题来了?当你空下来去做包子的时候,有人来买包子.作为一名合格的厨师,你不能轻易的就把做包子的事情中断,去给食客拿包子.因为那不符合一名高尚厨师的规范. 同样,这些在厨师做包子的时间段要买包子的食客,也还是要等上一段时间.虽然比之前的情况好一些,但消费者的体验也同样不是很好. 第三种情况:为了提高消费者的体验,厨师想到了,招一个漂亮的女服务员,让服务员保存3个包子.消费者不在和厨师直接通信,消费者直接去找服务员说我要吃包子. 服务员拿一个包子给消费者1后,她就告诉厨师,我这边少一个包子,你生产一个.厨师就只和服务员通话. 消费者也只和服务员通话. 服务员只做双方信息的传递. 第三种情况就算一个生产者\消费者模型 那么用代码如何实现这个服务员呢:这就可以使用queue队列了. 用代码实现:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 简单的生产者消费者模型代码 5 1对多 6 ‘‘‘ 7 8 import threading,queue 9 10 def consumer(n): 11 ‘‘‘ 12 简单的一个消费者 13 :param n: 14 :return: 15 ‘‘‘ 16 print("我是消费者 [%s]"%n) 17 print("consumer [%s] get task: %s"%(n,q.get())) 18 # q.task_done() # 通知生产者的标记 19 20 def producer(n): 21 for i in range(2): 22 print("厨师%s生产了编号为%s的包子!"%(n,i)) 23 q.put(i) 24 # q.join() # 消费者.task_done()方法告诉的标记不为空,就一直阻塞 25 print("两个包子都被消费者吃掉了") 26 27 q = queue.Queue(maxsize=3) 28 c1 = threading.Thread(target=consumer,args=[1,]) 29 c2 = threading.Thread(target=consumer,args=[2,]) 30 c3 = threading.Thread(target=consumer,args=[3,]) 31 32 p = threading.Thread(target=producer,args=[‘ted‘,]) 33 34 c1.start() 35 c2.start() 36 c3.start() 37 p.start() 38 代码实现结果: 39 我是消费者 [1] 40 我是消费者 [2] 41 我是消费者 [3] 42 厨师ted生产了编号为0的包子! 43 厨师ted生产了编号为1的包子! 44 两个包子都被消费者吃掉了 45 consumer [1] get task: 0 46 consumer [2] get task: 1
上面的代码,我们看到线程没有结束.原因是线程c3 种q.get()还在阻塞状态.因为厨师只做了2个包子,并且两个包子通过管道已经被消费者c1.c2吃掉了. 轮到消费者c3时,已经没包子了.所以c3 中,q.get()阻塞在那了. 实际中我们肯定不是这样的.作为生产者,希望服务员能告诉生产者什么时候该做包子, 两种方式:1.厨师不停的去问服务员,你那里包子还够不够.2.服务员在没有包子的时候告诉厨师,没存货了,你赶紧做. 方式1,用代码也能实现,就是贩判断队列是否为空if q.empty(),为空则去做.但这样会有问题,因为你的程序处于不断的循环状态.不可取 方式2,作为服务员角色的队列,主动告诉生产者程序,队列是不是空了,那这就要用到队列的q.task_done()方法.返回给队列的q.join()方法队列是否为空的状态. 这样在消费者程序中每次从队列get()后,执行一次q.task_done()方法.而生产者q.join()方法一直处于阻塞状态.只有当传过来的q.task_done() 中说明队列已经为空时,生产者程序才会中断阻塞状态,继续执行下面的代码. 如此以来代码应该优化成:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 简单的生产者消费者模型代码 5 1对多 6 ‘‘‘ 7 8 import threading,queue 9 10 def consumer(n): 11 ‘‘‘ 12 简单的一个消费者 13 :param n: 14 :return: 15 ‘‘‘ 16 print("我是消费者 [%s]"%n) 17 print("consumer [%s] get task: %s"%(n,q.get())) 18 q.task_done() # 通知生产者的标记 19 20 def producer(n): 21 count = 1 22 while True: 23 print("厨师%s生产了编号为%s的包子!"%(n,count)) 24 q.put(count) 25 count += 1 26 q.join() # 消费者.task_done()方法告诉的标记不为空,就一直阻塞 27 print("包子都被消费者吃掉了") 28 29 q = queue.Queue(maxsize=3) 30 c1 = threading.Thread(target=consumer,args=[1,]) 31 c2 = threading.Thread(target=consumer,args=[2,]) 32 c3 = threading.Thread(target=consumer,args=[3,]) 33 34 p = threading.Thread(target=producer,args=[‘ted‘,]) 35 36 c1.start() 37 c2.start() 38 c3.start() 39 p.start()
这里我们不在使用for循环,而是每次只生产1个包子.在实际的开发过程中也是这样,一个生产者一次只生产一个信息供消费者消费,而生产者消费者模型的本身就是为了代码根据实际情况随意的扩展消费者线程和生产者线程. 上面的代码实例是1对多的实例,上面的代码中生产者的代码只是打印,不消耗时间,所以感觉1对多的情况下消费者也不怎么排队,但是如果生产者代码要耗费0.5秒,每次生产包子都耗费0.5秒.那么执行程序这个感觉就是每次生产0.5秒,3个消费者就要排队了.所以我们要加一个厨师(生产者.) 代码如下:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 简单的生产者消费者模型代码 5 1对多 6 ‘‘‘ 7 8 import threading,queue 9 import time 10 11 def consumer(n): 12 ‘‘‘ 13 简单的一个消费者 14 :param n: 15 :return: 16 ‘‘‘ 17 print("我是消费者 [%s]"%n) 18 print("consumer [%s] get task: %s"%(n,q.get())) 19 q.task_done() # 通知生产者的标记 20 21 def producer(n): 22 count = 1 23 24 while True: 25 time.sleep(0.5) # 模拟生产耗费0.5秒 26 print("厨师%s生产了编号为%s的包子!"%(n,count)) 27 q.put(count) 28 count += 1 29 q.join() # 消费者.task_done()方法告诉的标记不为空,就一直阻塞 30 print("包子都被消费者吃掉了") 31 32 q = queue.Queue(maxsize=3) 33 c1 = threading.Thread(target=consumer,args=[1,]) 34 c2 = threading.Thread(target=consumer,args=[2,]) 35 c3 = threading.Thread(target=consumer,args=[3,]) 36 37 p = threading.Thread(target=producer,args=[‘ted‘,]) 38 p2 = threading.Thread(target=producer,args=[‘bob‘,]) 39 p3 = threading.Thread(target=producer,args=[‘lily‘,]) 40 c1.start() 41 c2.start() 42 c3.start() 43 p.start() 44 p2.start() 45 p3.start() 46 代码执行结果如下: 47 我是消费者 [1] 48 我是消费者 [2] 49 我是消费者 [3] 50 厨师ted生产了编号为1的包子! 51 厨师bob生产了编号为1的包子! 52 consumer [1] get task: 1 53 厨师lily生产了编号为1的包子! 54 consumer [2] get task: 1 55 consumer [3] get task: 1 56 厨师lily生产了编号为2的包子! 57 厨师ted生产了编号为2的包子! 58 厨师bob生产了编号为2的包子!
我们看到结果 生产者都生产了第二次,那么问题来了如果此时我们的生产者有100个呢,因为q.join()方法是同时获得状态的.而队列最大是3个,就一下子多出来97个在那里阻塞着. 而当队列里的三个包子消费掉了.那三个执行到q.join()代码的生产者线程又回在生产3个.这样就会导致生产者一直处于有97个处于put()阻塞状态.这和我们写代码的程序员的想法不一致,我们想的当然是当队列为空时,先把余下的97个put()减去3个,先前执行到q.join()方法的线程不动. 那么这样实现作为一个有强迫症的程序员的想法呢.答案是,根据生产者消费者模型可以随意扩展,消费者线程数和生产者线程数的特点.选择1对1的方式.既有多少个消费者,咱就有多少生产者.(其实就是没有好的办法,指定q.task_done指通知数量) 但是如果你非要在给3个消费者建立100个生产者,也是有办法的:(思路就是在生产之前对队列做判断,且每个生产者判断之前等待的时间不一致),但是效果不好,因为熟眠时间短,效果不好,睡眠时间长影响性能.所以不建议用. 代码如下:
1 #!/usr/bin/env python3.5 2 #__author__:"ted.zhou" 3 ‘‘‘ 4 简单的生产者消费者模型代码 5 1对多 6 ‘‘‘ 7 8 import threading,queue 9 import time 10 import random 11 def consumer(n): 12 ‘‘‘ 13 简单的一个消费者 14 :param n: 15 :return: 16 ‘‘‘ 17 print("我是消费者 [%s]"%n) 18 print("consumer [%s] get task: %s"%(n,q.get())) 19 q.task_done() # 通知生产者的标记 20 21 def producer(n): 22 count = 1 23 24 while True: 25 time.sleep(random.random()) 26 if q.qsize() < 2: 27 time.sleep(0.5) # 模拟生产耗费0.5秒 28 print("厨师%s生产了编号为%s的包子!"%(n,count)) 29 q.put(count) 30 count += 1 31 # q.join() # 消费者.task_done()方法告诉的标记不为空,就一直阻塞 32 # print("包子都被消费者吃掉了") 33 else: 34 q.join() 35 print("包子都被消费者吃掉了") 36 37 q = queue.Queue(maxsize=2) 38 c1 = threading.Thread(target=consumer,args=[1,]) 39 c2 = threading.Thread(target=consumer,args=[2,]) 40 c3 = threading.Thread(target=consumer,args=[3,]) 41 42 p = threading.Thread(target=producer,args=[‘ted‘,]) 43 p2 = threading.Thread(target=producer,args=[‘bob‘,]) 44 p3 = threading.Thread(target=producer,args=[‘lily‘,]) 45 c1.start() 46 c2.start() 47 c3.start() 48 p.start() 49 p2.start() 50 p3.start() 51 执行结果: 52 我是消费者 [1] 53 我是消费者 [2] 54 我是消费者 [3] 55 厨师bob生产了编号为1的包子! 56 consumer [1] get task: 1 57 包子都被消费者吃掉了 58 厨师lily生产了编号为1的包子! 59 consumer [2] get task: 1 60 包子都被消费者吃掉了 61 厨师ted生产了编号为1的包子! 62 consumer [3] get task: 1 63 包子都被消费者吃掉了 64 厨师lily生产了编号为2的包子! 65 厨师bob生产了编号为2的包子!
总之,你觉得生产者慢就给生产者加线程,如果觉得消费者慢,就给消费者加线程 总结: 1.生产者 和 消费者 1对1 情况最佳;2.你写的生产者生产信息?所花费的时间要比消费者消费信息所花费的时间短;(因为只有这样,消费者消费完还没到下一次消费,生产者就已经把queue队列又存满了) PS:上面的例子,我们是在一个程序里弄出来多个线程模拟的.算是之前说的两个独立的系统吗?当然不算,但是表现出来的结果是一样的. 生产环境中说的两个独立的系统,那可能都不是一个语言写的,比如生产者是python,而消费者是C语言写的.这种情况下,队列就不是用python的queue模块了,那就是真正的队列了:redis,zibbitMQ,zeroMQ等真正的MQ,后面会学习zibbitMQ
时间: 2024-10-12 03:25:29