python 进程/线程详解
进程定义:以一个整体的形式暴露给操作系统管理,它里面包含对各种资源的调用,内存的管理,网络接口的调用等等,对各种资源管理的集合,就可以叫做一个进程。
线程定义:线程是操作系统能够进行运算调度的最小单位(是一串指令的集合)。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
另说明:进程是不能直接操作CPU执行的,每个进程的执行都是默认创建一个主线程来操作CPU进行执行指令集合。
进程和线程的区别:
1:同一进程内的多个线程之间是共享内存空间,每个进程的内存是独立的。
2:同一个进程内的线程之间可以直接交流,而俩个进程如果想通讯,必须通过一个中间代理来实现,如(进程队列,管道,manage)。
3:创建新线程很简单,创建新进程需要对其父进程进行一次克隆。
4:一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程.
举例一个最简单的多线程实现并发效果:
1 import threading,time 2 def fun(n): # 定义一个函数,每次打印后等待2秒钟退出 3 print(n) 4 time.sleep(2) 5 for i in range(5): 6 t = ‘t%d‘ % i 7 # 通过线程并发执行5次 8 t = threading.Thread(target=fun,args=(‘jack‘,)) 9 t.start()
举例:一个继承式多线程写法(只是一种方法而已)
1 import threading,time 2 class MyThread(threading.Thread): 3 def __init__(self,n): 4 # 重写父类中的属性值 5 super(MyThread,self).__init__() 6 self.n = n 7 def run(self): # 这里的run为固定模式,就是重写父类中的run方法 8 print(‘输出的字符串为:‘,self.n) 9 time.sleep(2) 10 for i in range(50): # 一次执行了50个线程 11 n = ‘n‘+str(i) 12 t = MyThread(n) 13 t.start()
举例:判断每个线程执行的时间
1 class MyThread(threading.Thread): 2 ‘‘‘继承线程类并重写run方法‘‘‘ 3 def __init__(self,n): 4 super(MyThread,self).__init__() 5 self.n = n 6 # 函数为启动的子线程执行的 7 def run(self): 8 print(‘输出的字符串为:‘,self.n) 9 time.sleep(2) 10 # 以下为主线程的执行 11 obj_list = [] 12 for i in range(50): 13 n = ‘n‘+str(i) 14 t = MyThread(‘t1‘) 15 t.start() 16 # 将每个线程内存添加到列表中 17 obj_list.append(t) 18 new_time = time.time() 19 for j in obj_list: 20 # 等待每个线程执行完成后,计算线程的用时 21 j.join() 22 out_time = time.time() - new_time 23 print(‘用时:‘,out_time) 24 print(‘所有线程一共用时:%s‘ % (time.time()-new_time))
守护线程:主线程退出时不会等待守护线程是否执行完毕,只会等待非守护线程执行完才退出。
使用情况:一般用于socketserver端,启动多个守护线程,当主线程关闭了,子线程就自动关闭,否则关闭主线程需等待子线程关闭了才可以关闭。
举例:将子线程修改为守护线程
1 class MyThread(threading.Thread): 2 def __init__(self,n): 3 super(MyThread,self).__init__() 4 self.n = n 5 def run(self): 6 print(‘输出的字符串为:‘,self.n) 7 time.sleep(2) 8 new_time = time.time() 9 for i in range(50): 10 n = ‘n‘+str(i) 11 t = MyThread(‘t1‘) 12 # 如下将线程t 通过setDaemon即声明为一个守护线程 13 t.setDaemon(True) 14 t.start() 15 print(‘所有线程一共用时:%s‘ % (time.time()-new_time))
GIL (全局解释器锁)
对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:
1. 设置GIL
2. 切换到一个线程去运行
3. 运行:
a. 指定数量的字节码指令,或者
b. 线程主动让出控制(可以调用time.sleep(0))
4. 把线程设置为睡眠状态
5. 解锁GIL
6. 再次重复以上所有步骤
举例:线程手工加锁方法
1 import threading 2 lock = threading.Lock() # 实例化一个锁 3 num = 0 4 def fun(): 5 lock.acquire() #加锁 6 global num 7 num += 1 8 lock.release() #解锁 这样就在并发线程中将多个线程进行串行,也实现了对同一块内存进行操作 9 for i in range(50): 10 n = threading.Thread(target=fun,args=()) 11 n.start() 12 print(‘num:‘,num) 13 14 输出:num: 50
递归锁:一个外部锁内还有个子锁。(就是锁中有锁)
当程序中多个地方用到锁的时候,要使用递归锁: RLock()互斥锁(Mutex):一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,
如果2个线程同时修改同一份数据就会出现问题,这时候就要使用互斥锁以保证数据在操作时候是加锁的其他线程无法修改。
信号量(Semaphore)
互斥锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的线程更改数据。
功能:就是用来控制同时运行的线程数量
比如;我们启动100个线程,通过信号量可以控制同一时刻有几个线程运行。
举例:
1 import threading 2 def fun(n): 3 semaph.acquire() #加锁 4 print(n) 5 time.sleep(2) 6 semaph.release() #解锁 7 8 semaph = threading.BoundedSemaphore(5) # 定义同时有5个线程运行 9 for i in range(23): 10 t = threading.Thread(target=fun,args=(i,)) 11 t.start()
events 事件
通过设置的标志位实现2个或多个线程间的交互
event = threading.Event()
event.wait() 等待设定标志位
event.set() 设定标志位
event.clear() 清空标志位
event.is_set() 判断是否设定标志位
举例:红绿灯
1 import threading 2 event = threading.Event() 3 4 def light(): # 定义一个红绿灯(也就是一个规则) 5 count = 0 # 默认0秒 6 event.set() # 首先设置开始默认标志位(绿灯) 7 while True: 8 if count > 5 and count < 10: 9 print(‘\033[31;1m当前是红灯,所有车停止等待\033[0m‘) 10 event.clear() # 当满足条件清空标志位(红灯) 11 elif count > 10: 12 print(‘变灯了‘) 13 event.set() # 进行设置标志位 14 count = 0 # 重新计数 15 else: 16 print(‘\033[32;1m当前是绿灯,所有车可以经过\033[0m‘) 17 count += 1 18 time.sleep(1) 19 def car(arg): 20 while True: 21 # 通过判断标志为执行代码 22 if event.is_set(): # 判断如果设置了标志位,则执行 23 print(‘汽车:%s 可以通行‘ % arg) 24 time.sleep(1) 25 else: 26 print(‘红灯了。汽车要停车‘) 27 event.wait() # 没有设置标志位,则进入等待 28 print(‘变灯了,可以开车‘) 29 lit = threading.Thread(target=light) # 启动灯 30 lit.start() 31 ca = threading.Thread(target=car,args=(‘雪铁龙‘,)) # 启动车 32 ca.start()
queue 队列:类似于一条管道,只应用于当前进程内的线程相互数据访问使用,跳出当前进程就会失效。
注意:队列都是在内存中操作,进程退出,队列清空,另队列也是一个阻塞的形态。
功能:解耦,提高效率
常用方法:
queue.Queue(maxsize=0) :先入先出
queue.LifoQueue(maxsize=0): 后进先出
queue.PriorityQueue(maxsize=0):存储数据时可设置优先级的队列(元组格式:优先级,数据)
queue.deque 双线队列
队列的方法
put:放数据,Queue.put()默认有block=True和timeout两个参数。当block=True时,写入是阻塞式的,阻塞时间由timeout确定。当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常
get:取数据(默认阻塞),Queue.get([block[, timeout]])获取队列,timeout等待时间
empty:如果队列为空,返回True,否则返回False
qsize:显示队列中真实存在的元素长度
maxsize:最大支持的队列长度,使用时无括号
join:实际上意味着等到队列为空,再执行别的操作
task_done:在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
full:如果队列满了,返回True
举例:先入先出
1 import queue 2 q = queue.Queue(5) #如果不设置长度,默认为无限长 3 print(q.maxsize) # 打印最大长度 4 q.put(‘a‘) 5 q.put(‘b‘) 6 q.put(‘c‘) 7 print(q.qsize()) # 打印队列元素长度 8 print(q.get()) 9 print(q.get())
举例:后进先出
1 import queue 2 q = queue.LifoQueue() 3 q.put(‘a‘) 4 q.put(‘b‘) 5 print(q.get()) 6 打印:b
举例:优先级队列
需要注意的是,优先级队列put的是一个元组,格式为:(优先级,数据),优先级数越小,级别越高
1 import queue 2 q = queue.PriorityQueue() 3 q.put((2,‘a‘)) 4 q.put((1,‘b‘)) 5 q.put((3,‘c‘)) 6 print(q.get()) 7 print(q.get()) 8 print(q.get()) 9 输出为: 10 (1, ‘b‘) 11 (2, ‘a‘) 12 (3, ‘c‘)
生产者 消费者模型
在 工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。
产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,
生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型。结构如下:
生产者 --》 缓存区 --》消费者
优点:
1:解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。
如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
2:支持并发:由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,
就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
3:支持忙闲不均:缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,
消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉
举例:利用线程队列技术 吃包子
1 import queue,time,threading 2 q = queue.Queue(10) # 默认定义最大队列长度为10 3 def P(name): 4 while True: 5 if q.qsize() <= 5: # 当队列中长度小于等于5时候,开始生产包子 6 print(‘目前剩余%d个包子,不足5个,开始生产,保持随时有10个包子出售‘ % (q.qsize())) 7 for i in range(10): 8 baozi = ‘%s开始生产包子,生产了%d个包子‘ % (name,i) 9 print(baozi) 10 q.put(i) # 添加 11 else: 12 print(‘当前剩余包子数量:%d个,先不生产‘ % (q.qsize())) 13 time.sleep(2) 14 def C(name): 15 while True: 16 if q.qsize() > 5: # 当队列长度大于5,开始买包子 17 for l in name: 18 print(‘%s 来了,买5个包子‘ % l) 19 for i in range(5): 20 print(‘交给%s %d个包子‘ % (l,i)) 21 q.get() 22 print(‘5个包子已交给%s‘ % l) 23 else: 24 print(‘包子不够,等待生产包子‘) 25 time.sleep(1) 26 27 P1 = threading.Thread(target=P,args=(‘jack chen‘,)) # 并发线程的生产者 28 P1.start() # 启动生产者 29 C1 = threading.Thread(target=C,args=([‘bard‘,‘vivi‘,‘anne‘],)) # 并发线程的消费者 30 C1.start() # 启动消费者
---------------------------以上为线程功能
举例:进程和线程共存
1 import multiprocessing,time,threading 2 def thread_fun(): 3 print(‘当前线程ID:%d‘ % threading.get_ident()) 4 def fun(n): 5 time.sleep(2) 6 print(n) 7 # 线程启动 8 t = threading.Thread(target=thread_fun) 9 t.start() 10 if __name__ == ‘__main__‘: 11 for i in range(10): 12 # 进程启动 13 pro = multiprocessing.Process(target=fun,args=(‘进程并发:%d‘ % i,)) 14 pro.start()
进程间数据交互解释:进程内存各自独立,如何进行数据交互了。找中间件,有3种方法:(进程队列,管道,Managers数据共享)
举例1:实现数据传递(进程队列)
from multiprocessing import Queue,Process def fun(qq): # 这里的参数qq是启动进程时候传入的队列 qq.put([‘jack‘,‘bard‘,‘vivi‘]) if __name__ == ‘__main__‘: q = Queue() # 实例化进程队列 P = Process(target=fun,args=(q,)) # 将队列传入子进程 P.start() print(q.get()) # 打印队列数据 P.join()
第二种实现进程间数据传输方法:Pipes (管道)
举例2:实现数据传递(socket技术)
1 from multiprocessing import Process,Pipe 2 def fun(conn): 3 conn.send([‘jack‘,‘bard‘,‘vivi‘]) 4 conn.send([‘jack1‘,‘bard1‘,‘vivi1‘]) 5 print(conn.recv()) 6 if __name__ == ‘__main__‘: 7 # 这里pipe管道有2个头,一头定义变量person,另一头定义变量child 8 person,child = Pipe() 9 # 将child传给子进程fun函数,进行数据的传递 10 P = Process(target=fun,args=(child,)) 11 P.start() 12 # 这里主进程使用person打印子进程发来的数据 13 print(person.recv()) 14 print(person.recv()) 15 # 主进程也可以发送数据给子进程 16 person.send(‘父进程发消息给子进程‘) 17 P.join()
第三种:Managers 可以实现2个进程间数据共享,并可以修改共享数据
举例3:实现数据传递(Managers)
1 from multiprocessing import Process,Manager 2 import os 3 def manag(d,l): # 这个子进程函数功能实现修改共享数据 4 d[os.getpid()] = os.getpid() 5 l.append(os.getpid()) 6 7 def out(d,l): # 这个子进程函数功能实现提取打印共享数据 8 print(d) 9 for line in l: 10 print(line) 11 12 if __name__ == ‘__main__‘: 13 manager = Manager() 14 d = manager.dict() # 生成一个字典,可实现在多个进程间共享和传递数据 15 l = manager.list(range(5)) # 生成一个列表,可实现在多个进程间共享和传递数据 16 id_list = [] # 临时存放进程内存地址 17 for i in range(10): 18 # 先执行manag函数进行数据的添加修改 19 P = Process(target=manag,args=(d,l,)) 20 P.start() 21 id_list.append(P) 22 for res in id_list: 23 res.join() 24 print(d) 25 print(l) 26 # 再调用out函数对数据进行打印输出 27 P1 = Process(target=out,args=(d,l)) # 再将字典,列表传入另一个子进程,实现数据共享和传递 28 P1.start() 29 P1.join()
进程锁:就是防止多个进程打印数据时候同时抢占屏幕,导致混乱输出。
举例:
from multiprocessing import Process,Lock def fun(lk,mess): lk.acquire() # 加锁 print(‘输出:‘,mess) # 屏幕输出 lk.release() # 解锁 if __name__ == ‘__main__‘: lock = Lock() for i in range(20): P = Process(target=fun,args=(lock,i)) P.start()
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止,
进程池的2个方法:
1:apply (同步,串行)
2:apply_async (异步,并行)
举例:
1 from multiprocessing import Process,Pool 2 import os,time 3 def fun(i): 4 time.sleep(2) 5 print(i+100) 6 def bar(i): 7 print(‘finish:%s‘ % (str(os.getppid()))) 8 if __name__ == ‘__main__‘: 9 pool = Pool(processes=5) # 允许进程池同时放入5个进程 10 for i in range(10): # 启动10个进程 11 # out = pool.apply(func=fun,args=(i,)) # 串行写法 12 # pool.apply_async(func=fun,args=(i,)) # 并行写法 13 pool.apply_async(func=fun,args=(i,),callback=bar) # 并行,callback表示执行完子进程的fun函数后,再由主进程再执行bar函数收尾 14 print(‘end‘) 15 pool.close() # 注意这里 close,join的顺序是固定了。不能变动 16 pool.join() # 注意顺序,必须有,进程池中的进程执行完毕后,再关闭