《Python》线程池、携程

一、线程池(concurrent.futures模块)

#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务

#map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作

#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

#result(timeout=None)
取得结果

#add_done_callback(fn)
回调函数
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def func(i):
    print(‘thread‘, i)
    time.sleep(1)
    print(‘thread %s end‘ % i)

# 线程池,提供异步调用
tp = ThreadPoolExecutor(5)
tp.submit(func, 1)
tp.shutdown()
print(‘主线程‘)

# 进程池,提供异步调用
if __name__ == ‘__main__‘:
    tp = ProcessPoolExecutor(5)
    tp.submit(func, 1)
    tp.shutdown()
    print(‘主线程‘)
import time
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

def func(i):
    print(‘thread‘, i, current_thread().ident)
    time.sleep(1)
    print(‘thread %s end‘ % i)

tp = ThreadPoolExecutor(5)
for i in range(20):
    tp.submit(func, i)
tp.shutdown()
print(‘主线程‘)

# 获取返回值
import time
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

def func(i):
    print(‘thread‘, i, current_thread().ident)
    time.sleep(1)
    print(‘thread %s end‘ % i)
    return i * ‘*‘

tp = ThreadPoolExecutor(5)
ret_l = []
for i in range(20):
    ret = tp.submit(func, i)
    ret_l.append(ret)
for ret in ret_l:
    print(ret.result())
print(‘主线程‘)
# map的用法
import time
from concurrent.futures import ThreadPoolExecutor

def func(i):
    print(‘thread‘, i)
    time.sleep(1)
    print(‘thread %s end‘ % i)
    return i * ‘*‘

tp = ThreadPoolExecutor(5)
res = tp.map(func, range(20))
for i in res:
    print(i)
# 回调函数
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread

def func(i):
    print(‘thread‘, i, current_thread().ident, os.getpid())
    time.sleep(1)
    print(‘thread %s end‘ % i)
    return i * ‘*‘

def call_back(arg):
    print(‘call back : ‘, current_thread().ident, os.getpid())
    print(‘ret : ‘, arg.result())
# 线程池的回调函数是由子线程完成的
tp = ThreadPoolExecutor(5)
ret_l = []
for i in range(20):
    tp.submit(func, i).add_done_callback(call_back)
print(‘主线程‘, current_thread().ident, os.getpid())
# 进程池的回调函数是由主进程完成的
if __name__ == ‘__main__‘:
    tp = ProcessPoolExecutor(5)
    ret_l = []
    for i in range(20):
        tp.submit(func, i).add_done_callback(call_back)
    print(‘主线程‘, current_thread().ident, os.getpid())

二、协程

  之前我们学习了线程、进程的概念,了解了在操作系统中进程是资源分配的最小单位,线程是CPU调度的最小单位。按道理来说我们已经算是把cpu的利用率提高很多了。但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程、创建线程、以及管理他们之间的切换。

  随着我们对于效率的追求不断提高,基于单线程来实现并发又成为一个新的课题,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。

  为此我们需要先回顾下并发的本质:切换+保存状态

  cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长

    

  ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态

   一:其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。

  为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

# 1. yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
# 2. send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换

# 串行执行
import time
def consumer(res):
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    pass

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
# 串行执行
res=producer()
consumer(res)     # 写成consumer(producer())会降低执行效率
stop=time.time()
print(stop-start)  # 1.5536692142486572

#基于yield并发执行
import time
def consumer():
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

start=time.time()
# 基于yield保存状态,实现两个任务直接来回切换,即并发的效果
# PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()

stop=time.time()
print(stop-start) #2.0272178649902344

单纯地切换反而会降低运行效率

  二:第一种情况的切换。在任务一遇到io情况下,切到任务二去执行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。

import time
def consumer():
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
        time.sleep(2)

start=time.time()
producer()     # 并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行

stop=time.time()
print(stop-start)

yield无法做到遇到io阻塞

  对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

# 1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
# 2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

协程介绍:  

  协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

需要强调的是:

#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

对比操作系统控制线程的切换,用户在单线程内控制协程的切换

优点如下:

#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点如下:

#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

总结协程特点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

Greenlet 模块

安装 :pip3 install greenlet

# greenlet实现状态切换

import time
from greenlet import greenlet

def eat():
    print(‘eating 1‘)
    g2.switch()
    time.sleep(1)
    print(‘eating 2‘)

def play():
    print(‘playing 1‘)
    time.sleep(1)
    print(‘playing 2‘)
    g1.switch()

g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()

单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度

# 顺序执行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print(‘run time is %s‘ %(stop-start)) #10.985628366470337

# 切换
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print(‘run time is %s‘ %(stop-start)) # 52.763017892837524

效率对比

  greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

  单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。

Gevent 模块

安装:pip3 install gevent

  Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

g1=gevent.spawn(func,1,,2,3,x=4,y=5) # 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

原文地址:https://www.cnblogs.com/yzh2857/p/9708648.html

时间: 2024-10-12 13:08:41

《Python》线程池、携程的相关文章

我对python线程池的理解

#!/usr/bin/env pythonfrom Queue import Queuefrom threading import Threadimport randomimport time def person(i,q):    while True:  #这个人一直处与可以接活干的状态        q.get()        print "Thread",i,"do_job"        time.sleep(random.randint(1,5))#每

python全栈开发 * 线程队列 线程池 协程 * 180731

一.线程队列 队列:1.Queue 先进先出 自带锁 数据安全 from queue import Queue from multiprocessing import Queue (IPC队列)2.LifoQueue后进先出 后进先出 自带锁 数据安全 from queue import LifoQueue lq=LifoQueue(5) lq.put(123) lq.put(666) lq.put(888) lq.put(999) lq.put("love") print(lq.pu

Python线程和协程-day10

写在前面 上课第10天,打卡: 感谢Egon老师细致入微的讲解,的确有学到东西! 一.线程 1.关于线程的补充 线程:就是一条流水线的执行过程,一条流水线必须属于一个车间: 那这个车间的运行过程就是一个进程: 即一个进程内,至少有一个线程: 进程是一个资源单位,真正干活的是进程里面的线程: 线程是一个执行单位: 多线程:一个车间内有多条流水线,多个流水线共享该车间的资源: 一个进程内有多个线程,多线程共享一个进程的资源: 线程创建的开销要远远小于创建进程的开销: 进程之间更多的是一种竞争关系:

线程队列 线程池 协程

1 . 线程队列 from multiprocessing Queue , JoinableQueue  #进程IPC队列 from queue import Queue  #线程队列  先进先出 from queue import LifoQueue  #后进先出的 方法都是一样的 : put , get , put_nowait , get_nowait , full , empty , qsize 队列 Queue : 先进先出 , 自带锁 , 数据安全 栈 LifoQueue : 后进先

[python] ThreadPoolExecutor线程池 python 线程池

初识 Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行. 这就是线程池的思想(当然没这么简单),但是自己编

进程池线程池 协程

socket服务端实现并发 服务端需要满足以下3点: 1 固定的ip和port 2 24小时提供服务 3 能够实现并发 多线程实现并发: 服务端: import socket from threading import Thread import os server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) #半连接池 def communicate(conn): while True: try: dat

python线程池

线程池: 版本一: #!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = Queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_thread

Java线程池主线程等待子线程执行完成

今天讨论一个入门级的话题, 不然没东西更新对不起空间和域名~~ 工作总往往会遇到异步去执行某段逻辑, 然后先处理其他事情, 处理完后再把那段逻辑的处理结果进行汇总的产景, 这时候就需要使用线程了. 一个线程启动之后, 是异步的去执行需要执行的内容的, 不会影响主线程的流程,  往往需要让主线程指定后, 等待子线程的完成. 这里有几种方式. 站在 主线程的角度, 我们可以分为主动式和被动式. 主动式指主线主动去检测某个标志位, 判断子线程是否已经完成. 被动式指主线程被动的等待子线程的结束, 很明

Python 线程池

python默认没有提供线程池的功能,所以要想使用线程池,就必要使用第三方的模块或者自定义线程 线程并不是越多越好,线程的上下文切换会影响到服务器的性能 线程池:一个容器,有最大数,取一个少一个,无线程时等待,线程执行完毕,交还线程 __author__ = 'alex' #coding:utf-8 import queue import threading import time class ThreadPool: def __init__(self,maxsize=5): self.maxs

一个简单的python线程池框架

初学python,实现了一个简单的线程池框架,线程池中除Wokers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信. 代码如下: 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 __author__ = "pandaychen" 5 6 import Queue 7 import sys 8 import os 9 import threading 10 import time 11