线程进阶之线程队列、线程池和协程

本节目录:

1.线程队列

2.线程池

3.协程

一、线程队列

  线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢?

  queue队列 :使用import queue,用法与进程Queue一样

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  class queue.Queue(maxsize=0) #先进先出

import queue #不需要通过threading模块里面导入,直接import queue就可以了,这是python自带的
#用法基本和我们进程multiprocess中的queue是一样的
q=queue.Queue()
q.put(‘first‘)
q.put(‘second‘)
q.put(‘third‘)
# q.put_nowait() #没有数据就报错,可以通过try来搞
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #没有数据就报错,可以通过try来搞
‘‘‘
结果(先进先出):
first
second
third
‘‘‘

先进先出示例代码

  class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue() #队列,类似于栈,栈我们提过吗,是不是先进后出的顺序啊
q.put(‘first‘)
q.put(‘second‘)
q.put(‘third‘)
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
‘‘‘
结果(后进先出):
third
second
first
‘‘‘

先进后出示例代码

  class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((-10,‘a‘))
q.put((-5,‘a‘))  #负数也可以
# q.put((20,‘ws‘))  #如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序
# q.put((20,‘wd‘))
# q.put((20,{‘a‘:11})) #TypeError: unorderable types: dict() < dict() 不能是字典
# q.put((20,(‘w‘,1)))  #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序

q.put((20,‘b‘))
q.put((20,‘a‘))
q.put((0,‘b‘))
q.put((30,‘c‘))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
‘‘‘
结果(数字越小优先级越高,优先级高的优先出队):
‘‘‘

优先级队列示例代码

  这三种队列都是线程安全的,不会出现多个线程抢占同一个资源或数据的情况。

二、线程池

  Python标准模块——concurrent.futures

  到这里就差我们的线程池没有遇到了,我们用一个新的模块给大家讲,早期的时候我们没有线程池,现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,他俩用法上是一样的。

  为什么要将进程池和线程池放到一起呢,是为了统一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一样,而且只要通过这个concurrent.futures导入就可以直接用他们两个了

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务

#map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作

#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

#result(timeout=None)
取得结果

#add_done_callback(fn)
回调函数

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print(‘%s打印的:‘%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
# tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只需要将上面的ThreadPoolExecutor改为ProcessPoolExecutor就行了,其他都不用改
#异步执行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs):  可以传任意形式的参数
    t_lst.append(t)  #
    # print(t.result())
    #这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果
tpool.shutdown() #起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕
print(‘主线程‘)
for ti in t_lst:
    print(‘>>>>‘,ti.result())

# 我们还可以不用shutdown(),用下面这种方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print(‘>>>>‘, ti.result(),n)
#     time.sleep(2) #每个两秒去去一次结果,哪个有结果了,就可以取出哪一个,想表达的意思就是说不用等到所有的结果都出来再去取,可以轮询着去取结果,因为你的任务需要执行的时间很长,那么你需要等很久才能拿到结果,通过这样的方式可以将快速出来的结果先拿出来。如果有的结果对象里面还没有执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪一个的结果,可以通过枚举enumerate来搞,记录你是哪一个位置的结果对象的结果已经被取过了,取过的就不再取了

#结果分析: 打印的结果是没有顺序的,因为到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,但是最后的我们通过结果对象取结果的时候拿到的是有序的,因为我们主线程进行for循环的时候,我们是按顺序将结果对象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主线程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16

ThreadPoolExecutor的简单使用

  ThreadPoolExecutor的使用:

只需要将这一行代码改为下面这一行就可以了,其他的代码都不用变
tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就会发现为什么将线程池和进程池都放到这一个模块里面了,用法一样

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print(‘%s is runing‘ %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == ‘__main__‘:

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])

map的使用

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print(‘结果为:%s‘%(m.result()))

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)

回调函数简单应用

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print(‘<进程%s> get %s‘ %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {‘url‘:url,‘text‘:respone.text}

def parse_page(res):
    res=res.result()
    print(‘<进程%s> parse %s‘ %(os.getpid(),res[‘url‘]))
    parse_res=‘url:<%s> size:[%s]\n‘ %(res[‘url‘],len(res[‘text‘]))
    with open(‘db.txt‘,‘a‘) as f:
        f.write(parse_res)

if __name__ == ‘__main__‘:
    urls=[
        ‘https://www.baidu.com‘,
        ‘https://www.python.org‘,
        ‘https://www.openstack.org‘,
        ‘https://help.github.com/‘,
        ‘http://www.sina.com.cn/‘
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

回调函数的应用

三、协程

  线程实现并发的最小单位

  并发:记录状态+切换

1.生成器版生成器(仅仅是模仿了下大概的思路,实质没有节省资源)

import time

def f1():
    for i in range(10):
        time.sleep(0.5)
        print(‘f1>>‘,i)
        yield

def f2():
    g = f1()
    for i in range(10):
        time.sleep(0.5)
        print(‘f2>>‘, i)
        next(g)

f1()
f2()

生成器版协程

2.greenlet版协程

大概和生成器版差不多,两个方法来回切换。伪协程!

import time
from greenlet import greenlet
def f1(s):
    print(‘第一次f1‘+s)
    g2.switch(‘taibai‘)  #切换到g2这个对象的任务去执行
    time.sleep(1)
    print(‘第二次f1‘+s)
    g2.switch()
def f2(s):
    print(‘第一次f2‘+s)
    g1.switch()
    time.sleep(1)
    print(‘第二次f2‘+s)
g1 = greenlet(f1)  #实例化一个greenlet对象,并将任务名称作为参数参进去
g2 = greenlet(f2)
g1.switch(‘alex‘) #执行g1对象里面的任务

greenlet版的协程

3.gevent版协程(真正的协程)

import gevent
import time

def f1():
    print("第一次f1")
    gevent.sleep(1)
    print("第二次f1")

def f2():
    print("第一次f2")
    gevent.sleep(2)
    print("第二次f2")

s = time.time()
g1 = gevent.spawn(f1) #异步提交了f1任务
g2 = gevent.spawn(f2) #异步提交了f2任务
g1.join()
g2.join()
e = time.time()
print("执行时间:",e-s)
print("主程序任务")

gevent版协程(不完美版)

大家会发现一个问题就是只能使用gevent.sleep来代替time.sleep。还有就是要g1.join()和g2.join()有些麻烦对不对,下面就是协程gevent版的升级版。

import gevent
import time
from gevent import monkey;monkey.patch_all() #可以接收所有的I/O
def f1():
    print("第一次f1")
    time.sleep(1)
    print("第二次f1")

def f2():
    print("第一次f2")
    time.sleep(2)
    print("第二次f2")

s = time.time()
g1 = gevent.spawn(f1) #异步提交了f1任务
g2 = gevent.spawn(f2) #异步提交了f2任务

gevent.joinall([g1,g2]) #一个列表里面是任务名等同于g1.join()和g2.join()
e = time.time()
print("执行时间:",e-s)
print("主程序任务")

gevent版协程(升级版)

原文地址:https://www.cnblogs.com/guchenxu/p/10268769.html

时间: 2024-10-07 20:18:17

线程进阶之线程队列、线程池和协程的相关文章

线程队列、事件以及协程

线程的几个队列 都是从queue这个模块中导入 1.Queue队列(先进先出的队列) from queue import Queue q = Queue(maxsize=3) # 实例化产生队列对象 # maxsize 设置队列里能容纳的最大的数据个数 q.put("first") q.put("second") q.put("third") # 如果队列满了,put会阻塞住,等到空了再放进去 print(q.get()) # first pri

进程池与线程池、协程、协程实现TCP服务端并发、IO模型

进程池与线程池.协程.协程实现TCP服务端并发.IO模型 一.进程池与线程池 1.线程池 ''' 开进程开线程都需要消耗资源,只不过两者比较的情况下线程消耗的资源比较少 在计算机能够承受范围内最大限度的利用计算机 什么是池? 在保证计算机硬件安全的情况下最大限度的利用计算机 池其实是降低了程序的运行效率,但是保证了计算机硬件的安全 (硬件的发展跟不上软件的速度) ''' from concurrent.futures import ThreadPoolExecutor import time p

老男孩学习DAY11-1 进程、进程池、协程

python 进程 优点:可以处理大量的并发操作,使用IO计算型 缺点:由于进程之间的数据都是独立,所以创建一个进程,就得消耗一份内存 (进程和cpu核数相同的情况最好) Process :进程 (让我想到了40个人,要烧40壶水,要弄40个炉子,但是效率高) 进程中有 join (2)   阻塞住啦,最多阻塞2秒钟:demaon(true)  可以设置不阻塞,直接运行. 都说进程之间的数据是独立,那么我们你能将进程之间的数据共享吗,聪明的人类,当然可以,那就用到了mange和array arr

并发编程 之 线程的队列, 线程池; 以及协程 (四)

线程: 队列:Queue from queue import Queue 特点: 先进先出 自带锁, 数据安全 方法: put() 添加 get() 获取 put_nowait() get_nowait() full() empty() qsize() 和普通的队列用法一样, 详情 请看 进程之队列 栈:LifoQueue from queue import LifoQueue 后进先出 自带锁, 数据安全 优先级队列:PriorityQueue from queue import Priori

线程队列,线程池和协程

线程的其他方法: threading.current_thread() #当前线程对象 getName() # 获取线程名 ident  # 获取线程id    threading.enumerate()   # 当前正在运行的线程对象的一个列表 threading.active_count()  # 当前正在运行的线程数量 import time from threading import Thread,current_thread def f1(n): print(f"{n}号线程正在运行&

线程queue、线程进程池,协程

线程queue import queue q = queue.Queue() #先进先出 q = queue.LifoQueue() #先进后出 t = queue.PriorityQueue() #优先级取数据,通常这个元组的第一个值是int类型 q.put('123') q.put('qweqwe') print(q.get()) print(q.get()) t.put('100', 'tank') t.put('10', 'nick') t.put('1', 'jason') print

Python入门学习-DAY37-进程池与线程池、协程、gevent模块

一.进程池与线程池 基本使用: 进程池和线程池操作一样 提交任务的两种方式: 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的 同步调用 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os

python全栈脱产第37天------进程池与线程池、协程、gevent模块、单线程下实现并发的套接字通信

一.进程池与线程池 调用concurrent.futures下的ThreadPoolExecutor,ProcessPoolExecutor来实现 提交任务有两种方式:同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,在执行下一段代码,是串行的 异步调用:提交完一个任务之后,不在原地等待,直接运行下一段代码,任务是并发的 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimp

Python 37 进程池与线程池 、 协程

一:进程池与线程池 提交任务的两种方式: 1.同步调用:提交完一个任务之后,就在原地等待,等任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行 2.异步调用:提交完一个任务之后,不是原地等待,而是直接执行下一行代码,会导致任务是并发执行的,结果future对象会在任务运行完毕后自动传给回调函数 二:协程 基于单线程下实现并发,只有一个主线程(如下图:可利用的CPU只有一个)的情况下实现并发,并发的本质:切换+保存状态 CPU正在运行一个任务,会在两种情况下自习其他任务(切换