python Event对象、队列和多进程基础

Event对象

用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了event对象

event对象默认为假(Flase),即遇到event对象在等待就阻塞线程的执行。

示例1:主线程和子线程间通信,代码模拟连接服务器

 1 import threading
 2 import time
 3 event=threading.Event()
 4
 5 def foo():
 6     print(‘wait server...‘)
 7     event.wait()    #括号里可以带数字执行,数字表示等待的秒数,不带数字表示一直阻塞状态
 8     print(‘connect to server‘)
 9
10 t=threading.Thread(target=foo,args=())  #子线程执行foo函数
11 t.start()
12 time.sleep(3)
13 print(‘start server successful‘)
14 time.sleep(3)
15 event.set()     #默认为False,set一次表示True,所以子线程里的foo函数解除阻塞状态继续执行

示例2:子线程与子线程间通信

 1 import threading
 2 import time
 3 event=threading.Event()
 4
 5 def foo():
 6     print(‘wait server...‘)
 7     event.wait()    #括号里可以带数字执行,数字表示等待的秒数,不带数字表示一直阻塞状态
 8     print(‘connect to server‘)
 9 def start():
10     time.sleep(3)
11     print(‘start server successful‘)
12     time.sleep(3)
13     event.set()     #默认为False,set一次表示True,所以子线程里的foo函数解除阻塞状态继续执行
14 t=threading.Thread(target=foo,args=())  #子线程执行foo函数
15 t.start()
16 t2=threading.Thread(target=start,args=())  #子线程执行start函数
17 t2.start()

示例3: 多线程阻塞

 1 import threading
 2 import time
 3
 4 event=threading.Event()
 5 def foo():
 6     while not event.is_set():   #返回event的状态值,同isSet
 7         print("wait server...")
 8         event.wait(2)   #等待2秒,如果状态为False,打印一次提示继续等待
 9     print("connect to server")
10
11 for i in range(5):  #5个子线程同时等待
12     t=threading.Thread(target=foo,args=())
13     t.start()
14
15 print("start server successful")
16 time.sleep(10)
17 event.set()   # 设置标志位为True,event.clear()是回复event的状态值为False

queue队列

队列是一只数据结构,数据存放方式类似于列表,但是取数据的方式不同于列表。

队列的数据有三种方式:

  1、先进先出(FIFO),即哪个数据先存入,取数据的时候先取哪个数据,同生活中的排队买东西

  2、先进后出(LIFO),同栈,即哪个数据最后存入的,取数据的时候先取,同生活中手枪的弹夹,子弹最后放入的先打出

  3、优先级队列,即存入数据时候加入一个优先级,取数据的时候优先级最高的取出

代码实现

先进先出:put存入和get取出

 1 import queue
 2 import threading
 3 import time
 4 q=queue.Queue(5) #加数字限制队列的长度,最多能够存入5个数据,有取出才能继续存入
 5 def put():
 6     for i in range(100):    #顺序存入数字0到99
 7         q.put(i)
 8         time.sleep(1)   #延迟存入数字,当队列中没有数据的时候,get函数取数据的时候会阻塞,直到有数据存入后才从阻塞状态释放取出新数据
 9 def get():
10     for i in range(100):    #从第一个数字0开始取,直到99
11         print(q.get())
12
13 t1=threading.Thread(target=put,args=())
14 t1.start()
15 t2=threading.Thread(target=get,args=())
16 t2.start()

先进先出:join阻塞和task_done信号

 1 import queue
 2 import threading
 3 import time
 4 q=queue.Queue(5) #加数字限制长度
 5 def put():
 6     for i in range(100):
 7         q.put(i)
 8     q.join()    #阻塞进程,直到所有任务完成,取多少次数据task_done多少次才行,否则最后的ok无法打印
 9     print(‘ok‘)
10
11 def get():
12     for i in range(100):
13         print(q.get())
14         q.task_done()   #必须每取走一个数据,发一个信号给join
15     # q.task_done()   #放在这没用,因为join实际上是一个计数器,put了多少个数据,
16                       #计数器就是多少,每task_done一次,计数器减1,直到为0才继续执行
17
18 t1=threading.Thread(target=put,args=())
19 t1.start()
20 t2=threading.Thread(target=get,args=())
21 t2.start()

先进后出:

 1 import queue
 2 import threading
 3 import time
 4
 5 q=queue.LifoQueue()
 6 def put():
 7     for i in range(100):
 8         q.put(i)
 9     q.join()
10     print(‘ok‘)
11
12 def get():
13     for i in range(100):
14         print(q.get())
15         q.task_done()
16
17 t1=threading.Thread(target=put,args=())
18 t1.start()
19 t2=threading.Thread(target=get,args=())
20 t2.start()

按优先级:不管是数字、字母、列表、元组等(字典、集合没测),使用优先级存数据取数据,队列中的数据必须是同一类型,都是按照实际数据的ascii码表的顺序进行优先级匹配,汉字是按照unicode表(亲测)

列表

 1 import queue
 2 q=queue.PriorityQueue()
 3 q.put([1,‘aaa‘])
 4 q.put([1,‘ace‘])
 5 q.put([4,333])
 6 q.put([3,‘afd‘])
 7 q.put([5,‘4asdg‘])
 8 #1是级别最高的,
 9 while not q.empty():#不为空时候执行
10     print(q.get())

元组

1 import queue
2 q=queue.PriorityQueue()
3 q.put((1,‘aaa‘))
4 q.put((1,‘ace‘))
5 q.put((4,333))
6 q.put((3,‘afd‘))
7 q.put((5,‘4asdg‘))
8 while not q.empty():#不为空时候执行
9     print(q.get())

汉字

1 import queue
2 q=queue.PriorityQueue()
3 q.put(‘我‘)
4 q.put(‘你‘)
5 q.put(‘他‘)
6 q.put(‘她‘)
7 q.put(‘ta‘)
8 while not q.empty():
9     print(q.get())

生产者与消费者模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

 1 import time,random
 2 import queue,threading
 3
 4 q = queue.Queue()
 5
 6 def Producer(name):
 7   count = 0
 8   while count <10:
 9     print("making........")
10     time.sleep(random.randrange(3))
11     q.put(count)
12     print(‘Producer %s has produced %s baozi..‘ %(name, count))
13     count +=1
14     #q.task_done()
15     #q.join()
16     print("ok......")
17 def Consumer(name):
18   count = 0
19   while count <10:
20     time.sleep(random.randrange(4))
21     if not q.empty():
22         data = q.get()
23         #q.task_done()
24         #q.join()
25         print(data)
26         print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ %(name, data))
27     else:
28         print("-----no baozi anymore----")
29     count +=1
30
31 p1 = threading.Thread(target=Producer, args=(‘A‘,))
32 c1 = threading.Thread(target=Consumer, args=(‘B‘,))
33 # c2 = threading.Thread(target=Consumer, args=(‘C‘,))
34 # c3 = threading.Thread(target=Consumer, args=(‘D‘,))
35 p1.start()
36 c1.start()
37 # c2.start()
38 # c3.start()

多进程基础

由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程

多进程优点:可以利用多核、实现并行运算

多进程缺点:切换开销太大、进程间通信困难

multiprocessing模块

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的环境。

计算密集型串行计算:计算结果大概25秒左右

 1 import time
 2
 3 def foo(n):    #计算0到1亿的和
 4     ret=0
 5     for i in range(n):
 6         ret+=i
 7     print(ret)
 8
 9 def bar(n):    #计算1到10万的乘积
10     ret=1
11     for i in range(1,n):
12         ret*=i
13     print(ret)
14 if __name__ == ‘__main__‘:
15     s=time.time()
16     foo(100000000)
17     bar(100000)
18     print(time.time()-s) 

计算密集型多进程计算:计算结果13秒左右

 1 import multiprocessing
 2 import time
 3
 4 def foo(n):
 5     ret=0
 6     for i in range(n):
 7         ret+=i
 8     print(ret)
 9
10 def bar(n):
11     ret=1
12     for i in range(1,n):
13         ret*=i
14     print(ret)
15
16 if __name__ == ‘__main__‘:
17     s=time.time()
18     p1 = multiprocessing.Process(target=foo,args=(100000000,))  #创建子进程,target: 要执行的方法;name: 进程名(可选);args/kwargs: 要传入方法的参数。
19     p1.start()  #同样调用的是类的run方法
20     p2 = multiprocessing.Process(target=bar,args=(100000,) )  #创建子进程
21     p2.start()
22     p1.join()
23     p2.join()
24     print(time.time()-s) 

继承类用法

 1 from multiprocessing import Process
 2 import time
 3
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7         # self.name = name
 8
 9     def run(self):
10         print (‘hello‘, self.name,time.ctime())
11         time.sleep(1)
12
13 if __name__ == ‘__main__‘:
14     p_list=[]
15     for i in range(3):
16         p = MyProcess()
17         p.start()
18         p_list.append(p)
19
20     for p in p_list:
21         p.join()
22
23     print(‘end‘)

方法示例

 1 from multiprocessing import Process
 2 import os
 3 import time
 4
 5 def info(name):
 6     print("name:",name)
 7     print(‘parent process:‘, os.getppid())    #获取父进程的id号
 8     print(‘process id:‘, os.getpid())    #获取当前进程pid
 9     print("------------------")
10     time.sleep(5)
11 if __name__ == ‘__main__‘:
12     info(‘main process‘)    #第一次获取的是ide工具的进程和该代码文件的进程
13     p1 = Process(target=info, args=(‘alvin‘,))    #该代码文件的进程和p1的进程
14     p1.start()
15     p1.join()

对象实例的方法

实例方法:
  is_alive():返回进程是否在运行。
  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
  start():进程准备就绪,等待CPU调度
  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
  terminate():不管任务是否完成,立即停止工作进程
属性:
  daemon:和线程的setDeamon功能一样
  name:进程名字。
  pid:进程号。

时间: 2024-08-29 07:24:50

python Event对象、队列和多进程基础的相关文章

Python开发基础--- Event对象、队列和多进程基础

Event对象 用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了event对象 event对象默认为假(Flase),即遇到event对象在等待就阻塞线程的执行. 示例1:主线程和子线程间通信,代码模拟连接服务器 1 import threading 2 import time 3 event=threading.Event() 4 5 def foo(): 6 print('wait server...') 7 event.wait() #括号里可

Python开发基础-Day31 Event对象、队列和多进程基础

Event对象 用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了event对象 event对象默认为假(Flase),即遇到event对象在等待就阻塞线程的执行. 示例1:主线程和子线程间通信,代码模拟连接服务器 1 import threading 2 import time 3 event=threading.Event() 4 5 def foo(): 6 print('wait server...') 7 event.wait() #括号里可

Day28:Event对象、队列、multiprocessing模块

一.Event对象 线程的一个关键特性是每个线程都是独立运行且状态不可预测.如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手.为了解决这些问题,我们需要使用threading库中的Event对象. 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生.在初始情况下,Event对象中的信号标志被设置为假.如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真.一个线程如果将一个E

Python - 面对对象(基础)

目录 Python - 面对对象(基础) 一. 概述 二. 创建类和对象 三. 面向对象三大特征 封装 继承 多态 Python - 面对对象(基础) 一. 概述 面向过程:根据业务逻辑从上到下写垒代码 函数式:将某功能代码封装到函数中,日后便无需重复编写,仅调用函数即可 面向对象:对函数进行分类和封装,让开发"更快更好更强..." 面向过程编程(Object Oriented Programming,OOP,面向对象程序设计) 最易被初学者接受,其往往用一长段代码来实现指定功能,开发

深入理解python(一)python语法总结:基础知识和对python中对象的理解

用python也用了两年了,趁这次疫情想好好整理下. 大概想法是先对python一些知识点进行总结,之后就是根据python内核源码来对python的实现方式进行学习,不会阅读整个源码,,,但是应该会把数据结构的实现.函数调用过程.以及python虚拟机的基本原理根据源码解释下. 当然限于笔者只是一个弱鸡,,,如内容有疏漏的地方或者是一些错误,希望看到的大佬不吝赐教. 第一部分 python语法总结 当然如果对python语法还是一无所知的同学请移步缪雪峰或者菜鸟教程等学习网站看一遍再过来,,,

11.python并发入门(part5 event对象)

一.引入event. 每个线程,都是一个独立运行的个体,并且每个线程的运行状态是无法预测的. 如果一个程序中有很多个线程,程序的其他线程需要判断某个线程的运行状态,来确定自己下一步要执行哪些操作. threading模块中的event对象恰好能做到这一点,event对象包含了一个可以通过线程设置的一个信号标志位,它允许线程一直等待某些事件的发生. 在初始化默认的情况下,event对象中的信号标识被设置为"假",如果这时,有一个线程等待这个event对象,而这个event对象的信号标志为

11.python并发入门(part10 多进程之间实现通信,以及进程之间的数据共享)

一.进程队列. 多个进程去操作一个队列中的数据,外观上看起来一个进程队列,只是一个队列而已,单实际上,你开了多少个进程,这些进程一旦去使用这个队列,那么这个队列就会被复制多少份. (队列=管道+锁) 这么做的主要原因就是,不同进程之间的数据是无法共享的. 下面是使用进程队列使多进程之间互相通信的示例: 下面这个例子,就是往进程队列里面put内容. #!/usr/local/bin/python2.7 # -*- coding:utf-8 -*- import multiprocessing de

Python开发Day9(多线程多进程)

python线程: 介绍: Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元. 使用: Threading方法 .start() : 激活线程 .getName(): 获取线程的名称 .setName() : 设置线程的名称 .name : 获取或设置线程的名称 .is_alive() : 判断线程是否为激活状态 .isAlive() :判断线程是否为激活状态 .setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必

python中的Queue与多进程(multiprocessing)

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程 一.先说说Queue(队列对象) Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的"先吃先拉"与"后吃先吐",其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多 import Queue q = Queue.Queue(10