协程,事件,队列,同步,异步,回调函数

协程

什么是协成?单个线程并发的处理多个任务,程序控制协成的切换+保持状态,协成的切换速度非常快,蒙蔽了操作系统的眼睛,让操作系统认为CPU一直在运行
进程或线程都是由操作系统控制CPU来回切换,遇到阻塞就切换执行其他任务,协成是程序控制的,霸占CPU执行任务,会在操作系统控制CPU之前来回切换,操作系统就认为CPU一直在运作

协程的优点:
    1.开销小
    2.运行速度快
    3.协程会长期霸占CPU只执行我程序里的所有任务
协程的缺点:
    1.协程属于微并发,处理任务不易过多
    2.协程的本质是单线程,无法利用多核,可以一个程序开启多个进程,每个进程开启多个线程.每个线程开启协程
协程处理io密集型比较好

协程的特点:
    1.必须在只有一个单线程里实现并发
    2.修改共享数据不需要加锁
    3.保持状态
    4.一个协程遇到io操作就自动切换到其他协程 

工作中:
?   一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个
#当一个任务没有阻塞的时候
import time
def task():
    res = 1
    for i in range(1,1000000):
        res += 1
def task1():
    res = 1
    for i in range(1,1000000):
        res -= 1
strt = time.time()
task()
task1()
end = time.time()
print(f"串行执行效率:{end - strt}") #串行执行效率:0.07783079147338867

#第一次接触协程是yield
#没有io阻塞
import time
def task():
    res = 1
    for i in range(1,1000000):
        res += 1
        yield res
def task1():
    g = task()
    res = 1
    for i in range(1,1000000):
        res -= 1
        next(g)
strt = time.time()
task()
task1()
end = time.time()
print(f"协成执行效率:{end - strt}")#协成执行效率:0.21143341064453125
纯计算密集型,没有io情况下,串行的执行效率高于协程

# 并不是真正的阻塞
import gevent
def eat(name):
    print(f"{name} eat 1")   #1
    gevent.sleep(2)    #模拟的是gevent可以识别的阻塞
    print(f"{name} eat 2")   #4
def play(name):
    print(f"{name} play 1")   #2
    gevent.sleep(1)
    print(f"{name} play 2")   #3
g1 = gevent.spawn(eat,'八戒')
g2 = gevent.spawn(play,name = '悟空')
g1.join()
g2.join()
print("主")                    #5

import threading
from gevent import monkey
import gevent
import time
monkey.patch_all()        # 打补丁:将下面的所有的任务的阻塞打上标记,遇到这个标记就切换

def eat():
    print(f"线程1:{threading.current_thread().getName()}")    #1
    print('eat food 1')                                       #2
    time.sleep(2)                                             #3阻塞,去执行别的任务
    print('eat food 2')                                       #8

def play():
    print(f"线程2:{threading.current_thread().getName()}")    #4
    print('play 1')                                           #5
    time.sleep(1)                                             #6阻塞,再去执行别的任务,上一个任务还在阻塞,继续向下执行
    print('play 2')                                           #7
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
print(f"{threading.current_thread().getName()}")              #9
注意在各个任务中的阻塞时间

Event事件

添加全局变量,修改全局变量,实现一个线程在某一个节点让下一个线程继续工作
import time
from threading import Thread
from threading import current_thread
flag = False
def task():
    print(f"{current_thread().name}检测服务器是否正常开启.....")
    time.sleep(2)
    global flag
    flag = True
def task1():
    while 1:
        time.sleep(1)
        print(f"{current_thread().name}正在连接服务器....")
        if flag:
            print(f"{current_thread().name}连接成功")
            return

if __name__ == '__main__':
    t1 = Thread(target=task1)
    t2 = Thread(target=task1)
    t3 = Thread(target=task1)
    t = Thread(target=task)

    t.start()
    t1.start()
    t2.start()
    t3.start()

import time
from threading import Thread
from threading import current_thread
from threading import Event
import random
event = Event()   #默认为False
def task1():
    print(f"{current_thread().getName()} 检测服务器知否开启...") #获取线程名字
    time.sleep(random.randint(1,3))
def task2():
    print(f"{current_thread().getName()}正在尝试连接服务器...")
    count = 1
    while count < 4:
        time.sleep(1)
        print(f"{current_thread().getName()}连接第{count}次")
        if count < 4:
            time.sleep(0.5)
            event.set() #将event状态修改为True
            print('连接成功')
        count += 1
t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()

import time
from threading import Thread
from threading import current_thread
from threading import Event
import random
event = Event()
def check():
    print(f"{current_thread().name}检测服务器是否开启...")
    time.sleep(random.randint(1,3))
    event.set()
    print("连接成功")
def connect():
    count = 1
    while not event.is_set(): #event.is_set()判断状态是True或False
        if count == 4:
            print('连接次数过多,已断开')
            break
        event.wait(1) #轮询检测event状态,如果为真,就向下执行,是个阻塞, # 只阻塞1秒,1秒之后如果还没有进行set 直接进行下一步操作.
        print(f"{current_thread().name}尝试连接{count}次")
        count += 1
    else:
        print(f"{current_thread().name}连接成功")
t1 = Thread(target=check)
t2 = Thread(target=connect)
t1.start()
t2.start()

线程队列

import queue
# 先进先出原则
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False)) #设置多取会报错
q.get(timeout=2)#阻塞两秒,后报错
# print(q.get())多去一个会阻塞

# 后进先出
q = queue.LifoQueue(4)
q.put(1)
q.put(2)
q.put('alex')
q.put('太白')

print(q.get())
print(q.get())
print(q.get())
print(q.get())
# 太白
# alex
# 2
# 1

优先级队列
最小的先取出
q = queue.PriorityQueue(4)
q.put((1,'qq'))
q.put((-2,'qq1'))
q.put((0,'qq2'))

print(q.get())
print(q.get())
print(q.get())

#(-2, 'qq1')
#(0, 'qq2')
#(1, 'qq')

同步

同步调用(提交完任务后,就在原地等待任务执行完后,拿到结果,再执行下一行代码)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
    print(f"{os.getpid()}开始任务")
    time.sleep(random.randint(1,3))
    print(f"{os.getpid()}结束任务")
    return i
if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    for i in range(10):                #同时开启10个进程
        obj = pool.submit(task,i)      #类似于发布任务
        print(f"任务结果:{obj.result()}")  # 对象加result()就变成同步,获取的是运行状态.obj就为动态对象, (running,pending,finish)  动态对象.result()获取结果
                                       #obj.result() 必须等到这个任务完成后,返回了结果,在执行下一个任务
    pool.shutdown(wait=True)           #shutdown 让我的主进程等待进程池中所有子进程结束后,在执行,类似于join
                                       #在上一个进程池没有完成任务之前,不允许添加新的任务,一个任务通过一个函数实现,任务完成了他的返回值就是函数的返回值
    print("主")
  

异步

异步调用(提交完任务后,不再原地等待任务执行完)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
    print(f"{os.getpid()}开始任务")
    time.sleep(random.randint(1,3))
    print(f"{os.getpid()}结束任务")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    for i in range(10):
        pool.submit(task,i)
    pool.shutdown(wait=True) #等待进程池中所有子进程结束后在向下执行,类似于join
    print("主")

存在一个列表中,统一回收结果 ,把动态对象放在列表中,(容器)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
    print(f"{os.getpid()}开始任务")
    time.sleep(random.randint(1,3))
    print(f"{os.getpid()}结束任务")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    l = []
    for i in range(10):
        obj = pool.submit(task,i)
        l.append(obj)
    pool.shutdown(wait=True)
    for i in l:
        print(i.result())
    print("主")
虽然是同时发布了任务,但是在回收结果的时候,不能马上收到一个结束任务的返回值,只能等所有任务结束后统一收到结果 

异步调用的如何取值?
1.
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
import requests
# 模拟的就是爬取多个源代码 一定有IO操作
def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text
def parse(content):
    return len(content)

if __name__ == '__main__':
    ret = task('http://www.baidu.com')
    print(parse(ret))

    ret = task('http://www.JD.com')
    print(parse(ret))

    ret = task('http://www.taobao.com')
    print(parse(ret))

    ret = task('https://www.cnblogs.com/jin-xin/articles/7459977.html')
    print(parse(ret))
#这样写为执行一个task函数,在执行一个parse函数,执行效率低,一个任务执行2秒,20个任务就是40秒,耗时
#这两个函数为一个任务,当一个任务执行完成后再执行下一个任务,串行

2.

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
import requests
# 模拟的就是爬取多个源代码 一定有IO操作
def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text
def parse(content):
    return len(content)

if __name__ == '__main__':
#     开启一个线程池,并发执行
    url_list = ['http://www.baidu.com',
                'http://www.JD.com',
                'http://www.taobao.com',
                'https://www.cnblogs.com/jin-xin/articles/7459977.html'
    ]
    pool = ThreadPoolExecutor(4)
    obj_list = []

    for url in url_list:
        obj = pool.submit(task,url)
        obj_list.append(obj)
    pool.shutdown(wait=True)
#这个for循环把爬取的源码添加到一个类表中,对象,执行task函数
    for i in obj_list:
        print(parse(i.result()))
循环取出类表中的元素,当做参数传入parse函数中,进行数据分析
以上版本缺点:异步的发出10个任务,并发的执行任务,但是分析结果的流程是串行,没有做到结束一个任务就返回一个返回值,效率低

3.

def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return parse(ret.text)    #更改函数,直接返回并调用parse函数
def parse(content):
    return len(content)

if __name__ == '__main__':
#     开启一个线程池,并发执行
    url_list = [
        'http://www.baidu.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.taobao.com',
        'https://www.cnblogs.com/jin-xin/articles/7459977.html',
        'https://www.luffycity.com/',
        'https://www.cnblogs.com/jin-xin/articles/9811379.html',
        'https://www.cnblogs.com/jin-xin/articles/11245654.html',
        'https://www.sina.com.cn/'
    ]
    pool = ThreadPoolExecutor(4)
    obj_list = []

    for url in url_list:
        obj = pool.submit(task,url)
        obj_list.append(obj)
    pool.shutdown(wait=True)

    for i in obj_list:
        print(i.result())#直接返回结果,不用再调用函数,直接全部打印出来结果

4.异步调用+回调函数**************************
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests
def task(url):
    '''模拟的就是爬取多个源代码 一定有IO操作'''
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text
def parse(obj):
    '''模拟对数据进行分析 一般没有IO'''
    print(len(obj.result()))
if __name__ == '__main__':
    # 开启线程池,并发并行的执行
    url_list = [
        'http://www.baidu.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.taobao.com',
        'https://www.cnblogs.com/jin-xin/articles/7459977.html',
        'https://www.luffycity.com/',
        'https://www.cnblogs.com/jin-xin/articles/9811379.html',
        'https://www.cnblogs.com/jin-xin/articles/11245654.html',
        'https://www.sina.com.cn/'
    ]
    pool = ThreadPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(task, url)
        obj.add_done_callback(parse)

        #add_done_callback回调函数,执行一个任务就返回一个函数的返回值,增加回调函数,发布完任务后直接执行本行,当同一进程执行完第一次任务后,主进程立即对其结果进行分析,不必等所有进程全部执行完再分析,提高了效率
线程池设置4个线程,异步发起10个任务,每次执行4个任务,执行完任务的时间肯定有先后顺序,先执行完的,将parse分析代码的任务
交给空闲线程去执行,然后这个线程再去执行其他任务,比爬取源码的任务就和分析数据的任务共同并发执行

异步回收收取结果的两种方式?
1.将所有任务的结果同意回收
2.完成一个任务,返回一个结果

异步+回调函数
回调函数:按顺序接收每个任务的结果,进行下一步处理
前提:
    异步处理的多个任务(io任务),回调函数处理的非io
区别:
    多进程,由主进程执行回调函数的代码
    多线程,由空闲线程执行回调函数的代码

原文地址:https://www.cnblogs.com/tangjian219/p/11426743.html

时间: 2024-09-30 07:05:39

协程,事件,队列,同步,异步,回调函数的相关文章

同步异步 + 回调函数

重点记忆 异步回调函数 如果进程池+回调: 回调函数由主进程去执行. 如果线程池+回调: 回到函数由空闲的线程去执行.(比如有4个线程,10个任务,第一轮完成4个任务,交由主线程处理结果,第二轮同样如此,但是第三轮将会空闲出2个子进程,则这2个子进程将会和主进程一同处理结果,以此类推,当所有的任务完成时,所有的子进程和主进程一起处理结果,增加效率) 回调函数不管有没有返回数据,返回值都是None,回调函数内部加函数名是调用此函数,obj隐形传参 1.概念 1. 从执行的角度 阻塞: 程序运行时,

python 管道 事件 信号量 进程池(map/同步/异步)回调函数

####################总结######################## 管道:是进程间通信的第二种方式,但是不推荐使用,因为管道会导致数据不安全的情况出现 事件:当我运行主进程的时候 需要子执行某个进程后 需要的返回值时 可以使用 信号量:互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 . 内部维护了一个计数器,acquire-1,release+1,为0的时候,其他的进程都要在acquire之前等待 进程池:  进程的创建和销

2015/01/23 – 不要对异步回调函数进行同步调用

绝对不能对异步回调函数(即使在数据已经就绪)进行同步调用. 如果对异步回调函数进行同步调用的话,处理顺序可能会与预期不符,可能带来意料之外的后果. 对异步回调函数进行同步调用,还可能导致栈溢出或异常处理错乱等问题. 如果想在将来某时刻调用异步回调函数的话,可以使用 setTimeout 等异步API. function onReady(fn) { var readyState = document.readyState; if (readyState == 'interactive' || re

Python的异步编程[0] -&gt; 协程[1] -&gt; 使用协程建立自己的异步非阻塞模型

使用协程建立自己的异步非阻塞模型 接下来例子中,将使用纯粹的Python编码搭建一个异步模型,相当于自己构建的一个asyncio模块,这也许能对asyncio模块底层实现的理解有更大的帮助.主要参考为文末的链接,以及自己的补充理解. 完整代码 1 #!/usr/bin/python 2 # ============================================================= 3 # File Name: async_base.py 4 # Author: L

js同步-异步-回调

出处:https://blog.csdn.net/u010297791/article/details/71158212(1)上面主要讲了同步和回调执行顺序的问题,接着我就举一个包含同步.异步.回调的例子. let a = new Promise(//声明了一个Promise回调函数,能够使用then function(resolve, reject) { console.log(1) setTimeout(() => console.log(2), 0) console.log(3) cons

jQuery 源码解析(八) 异步队列模块 Callbacks 回调函数详解

异步队列用于实现异步任务和回调函数的解耦,为ajax模块.队列模块.ready事件提供基础功能,包含三个部分:Query.Callbacks(flags).jQuery.Deferred(funct)和jQuery.when().本节讲解Callbacks,也就是回调函数列表 回调函数用于管理一组回调函数,支持添加.移除.触发.锁定和禁用回调函数,为jQuery.ajax.jQuery.Deferred()和ready()事件提供基础功能,我们也可以基于它编写新的组件. 使用方法:$.Callb

同步回调函数和异步回调函数

回调函数 回调函数一般是在封装接口的时候,回调显得特别重要,我们首先假设有两个程序员在写代码,A程序员写底层驱动接口,B程序员写上层应用程序,然而此时底层驱动接口A有一个数据d需要传输给B,此时有两种方式: 1.A将数据d存储好放在接口函数中,B自己想什么时候去读就什么时候去读,这就是我们经常使用的函数调用,此时主动权是B. 2.A实现回调机制,当数据变化的时候才将通知B,你可以来读取数据了,然后B在用户层的回调函数中读取速度d,完成OK.此时主动权是A. 很明显第一种方法太低效了,B根本就不知

难得二逼的协程,事件驱动和异步IO

先来回顾一下多线程和多进程把.多线程像是在一个国家内,由A点往B点搬运东西,一条线程就是一条路,多条线程就是开启多条路,然后每条路上可以运输东西.多进程就像多个国家,每个国家里面在执行自己的事情. 然后轮到今天的主角:协程出场 1.携程 corotine, 是一种用户态的轻量级线程,被称为微线程.是自己控制的,cpu不知道其存在.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈. 协程的好处:? 无需线程上下文切换的开销? 无需原子操作锁定及同步的

线程、进程、协程和队列

一.线程.进程 1.简述 进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动.它是操作系统动态执行的基本单元,通俗讲就是自定义一段程序的执行过程,即一个正在运行的程序.线程是进程的基本单位,又称为轻量级进程. * 不同的进程在内存中会开辟独立的地址空间,默认进程之间的数据是不共享,线程是由进程创建,所以处在同一个进程中的所有线程都可以访问该进程所包含的地址空间,当然也包含存储在该空间中的所有资源. 应用场景: IO密集型操作由于不占用CPU资源,所以一般使用线程来完成 计算密集型操作