1 . 线程队列
from multiprocessing Queue , JoinableQueue #进程IPC队列
from queue import Queue #线程队列 先进先出
from queue import LifoQueue #后进先出的
方法都是一样的 : put , get , put_nowait , get_nowait , full , empty , qsize
队列 Queue : 先进先出 , 自带锁 , 数据安全
栈 LifoQueue : 后进先出 , 自带锁 , 数据安全
q = LifoQueue(5)
q.put(3)
q.put_nowait(4)
print(q.get()) #谁最后进的,就会先取谁
q.get_nowait()
print(q.full())
pirnt(q.empty())
print(q.qsize())
from queue import PriorityQueue #优先级队列
q = PriorityQueue()
q.put((10,"aaa"))
q.put((4,"bbb"))
print(q.get())
线程池
Threading 没有线程池的
Multiprocessing Pool
concurrent.futures 帮助管理线程池和进程池
import time
from threading import currentThread,get_ident
from concurrent.futures import ThreadPoolExecutor #帮助启动线程池
from concurrent.futures import ProcessPoolExecutor #帮助启动进程池
def func(i):
time.sleep(1)
print("in%s%s"%(i,currentThread()))
return i**2
def back(fn):
print(fn.result(),currentThread())
#map启动多线程任务
t = ThreadPoolExecutor(5)
t.map(func,range(20)) == for i in range(20):
t.submit(func,i)
#submit异步提交任务
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(func,i)
t.shutdown()
print("main:",currentThread()) #起多少个线程池 , 5*CPU的个数
#获取任务结果
t = ThreadPoolExecutor(5)
li = []
for i in range(20):
ret = t.submit(func,i)
li.append(ret)
t.shutdown()
for i in li:
print(i.result())
print("main:",currentThread())
#回调函数
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(func,i).add_done_callback(back)
#回调函数(进程版)
import os,time
from concurrent.futures import ProcessPoolExecutor
def func(i):
print("in%s%s"%(i,os.getpid()))
return i**2
def back(fn):
print(fn.result(),os.getpid())
if __name__ == "__main__":
p = ProcessPoolExecutor(5)
for i in range(20):
p.submit(func,i).add_done_callback(back)
print("main:",os.getpid())
multiprocessing模块自带进程池的
threading模块是没有线程池的
concurrent.futures 进程池和线程池 : 高度封装 , 进程池/线程池的统一的使用方式
创建线程池/进程池 ProcessPoolExecutor ThreadPoolExecutor
ret .result()获取结果,如果想实现异步效果,应该使用列表
shutdown == close + join 同步控制
add_done_callback 回调函数,在回调函数内接收的参数是一个对象,需要通过result来获取返回值. 进程池的回调函数仍然在主进程中执行,但是线程池的回调函数是在线程中执行.
进程 : 资源分配的最小单位 , 班级
线程 : CPU调度最小单位 , 人
Cpython线程不能利用多核的 ,多线程无法利用多核 ,一个线程能同时执行多个任务.
协程 : 能在一条线程的基础上 ,再过个任务之间互相切换 .节省了线程开启的消耗.
协程是从python代码的级别调度的 , 正常的线程是CPU调度的最小单位 ; 协程的调度并不是由操作系统来完成的.
之前学过的协程在两个任务之间相互切换的是生成器函数:yield
def func():
print(1)
x = yield "aaa"
print(x)
yield "bbb"
g = func()
print(next(g))
print(g.send("***"))
在多个函数之间互相切换的功能 --- 协程
def consumer():
while True:
x = yield
print(x)
def producer():
g = consumer()
next(g) #预激
for i in range(20):
g.send(i)
producer()
yield只有程序之间的切换,没有重利用任何IO操作的时间
模块的安装 :
pip3 install 要安装的模块的名字
使用协程减少IO操作带来的时间消耗
from gevent import monkey ; monkey.patch_all()
import gevent,time
def eat():
print("吃")
time.sleep(2)
print("吃完了")
def play():
print("玩")
time.sleep(1)
print("玩完了")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
没有执行 , 为什么没执行???是需要开启么?
没有开启但是切换了
gevent帮我们做了切换,做切换是有条件的,遇到IO才切换
gevent不认识除了gevent这个模块内以外的IO操作
使用join可以一直阻塞直到协程任务完成
帮助gevent来认识其他模块中的阻塞
from gevent import monkey;monkey.patch_all() 写在其他模块导入之前.
使用协程来实现TCP协议 :
服务器 :
from gevent import monkey;monkey.patch_all()
import gevent,socket
def func(conn):
while 1:
conn.send(b"hello")
print(conn.recv(1024))
sk = socket.socket()
sk.bind(("127.0.0.1",9090))
sk.listen()
while 1:
conn,addr = sk.accept()
gevent.spawn(func,conn)
客户端 :
import socket
from threading import Thread
def client():
sk = socket.socket()
sk.connect(("127.0.0.1",9090))
while 1:
print(sk.recv(1024))
sk.send(b"bye")
for i in range(20):
Thread(target=client).start()
4c 并发50000 qps
5个进程
20个线程
500个协程
协程能够在单核的情况极大地提高CPU的利用率
协程不存在数据不安全 , 也不存在线程切换/创造的时间开销 ; 切换时用户级别的,程序不会因为协程中某一个任务进入阻塞状态而使整条线程阻塞
线程的切换 :
时间片到了 降低了CPU的效率
IO会切 提高了CPU的效率
原文地址:https://www.cnblogs.com/fengkun125/p/9403360.html