线程queue、事件event及协程

线程queue、事件event及协程

线程queue

多线程抢占资源,让其保持串行的两种方式:

? 1、互斥锁

? 2、队列

线程队列分为以下三种:

1、Queue(先进先出)

import queue

q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
# q.put(4,block=False)  # 若不设置block参数,默认为True,大于队列长度进入阻塞状态,若设置block为False,大于对列长度直接报错
print(q.get())
print(q.get())
print(q.get())
# print(q.get(timeout=2)) 阻塞2s 还没有值直接报错
# 结果
1
2
3

2、LifoQueue(后进先出)

import queue

q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# 结果:
3
2
1

3、PriorityQueue(优先级队列)

import queue

q = queue.PriorityQueue(3)
q.put((-1,'awe'))   # 操作对象为元祖,第一个位置的数字越小,优先级越高
q.put((2,6))
q.put((0,3))
print(q.get())
print(q.get())
print(q.get())
# 结果:
(-1, 'awe')
(0, 3)
(2, 6)

事件event

开启两个线程,一个线程运行到中间的某个阶段,触发另个线程执行.两个线程增加了耦合性.

引入事件event的两个阶段:

版本1:(判断全局变量状态)

from threading import Thread
from threading import current_thread
import time

flag =False
def check():
    print(f'{current_thread().name}监测服务器是否开启')
    time.sleep(3)
    global flag
    flag = True
    print('服务器已开启')

def connect():
    while not flag:
        print(f'{current_thread().name}等待连接')
        time.sleep(0.5)
    else:
        print(f'{current_thread().name} 连接成功...')

t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start()
# 结果:
Thread-1监测服务器是否开启
Thread-2等待连接
Thread-2等待连接
Thread-2等待连接
Thread-2等待连接
Thread-2等待连接
Thread-2等待连接
服务器已开启
Thread-2 连接成功...

版本2:(事件Event)

from threading import Thread
from threading import current_thread
from threading import Event
import time

event = Event() # 创建事件对象

def check():
    print(f'{current_thread().name}监测服务器是否开启')
    time.sleep(3)
    print(event.is_set()) # 判断事件是否设置
    event.set() # 设置事件
    print(event.is_set())
    print('服务器已开启')

def connect():

    print(f'{current_thread().name}等待连接')
    event.wait() # 等待事件设置,阻塞状态
    print(f'{current_thread().name} 连接成功...')

t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start()

小练习:

将上述例子改为一个线程监测服务器状态,另一个线程判断服务器状态,如果服务器状态开启,则显示连接成功,此线程每1秒尝试连接服务器一次,一共连接3次,还没连接成功,则显示连接失败

from threading import Thread
from threading import current_thread
from threading import Event
import time

event = Event() # 创建事件对象

def check():
    print(f'{current_thread().name}监测服务器是否开启')
    time.sleep(3)
    event.set() # 设置事件
    print('服务器已开启')

def connect():
    count = 1
    while 1:
        print(f'{current_thread().name}等待连接')
        event.wait(1) # 等待事件设置,阻塞状态
        if count == 4:
            print(f'{current_thread().name}连接成功')
        count += 1
        print(f'{current_thread().name}尝试连接{count}次...')
    else:
        print(f'{current_thread().name}连接成功')

t1 = Thread(target=check,)
t2 = Thread(target=connect,)
t1.start()
t2.start()

协程

协程:简单的来说就是一个线程并发的处理任务.

串行:一个线程执行一个任务,执行完毕之后,执行下一个任务.

并行: 多个cpu执行多个任务, 4个cpu 执行4个任务.

并发: 一个cpu执行多个任务,看起来像是同时运行.

并发真正的核心:切换并且保持状态.

多线程的并发: 3个线程处理10个任务,如果线程1处理的这个任务,遇到阻塞,cpu被操作系统切换到另一个线程,

一个线程并发处理任务:以一个线程执行3个任务为例:

协程定义:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

单个cpu并发执行10个任务的三种方式:

? 1、方式一:开启多进程并发执行, 操作系统切换+保持状态.

? 2、方式二:开启多线程并发执行,操作系统切换+保持状态.

? 3、方式三:开启协程并发的执行, 自己的程序 把控着cpu 在3个任务之间来回切换+保持状态.

以上三种实现方式,协程最好,这是因为:

? 1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

? 2.协程的运行速度更快

? 3.协程会长期霸占cpu只执行我程序里面的所有任务.

协程的特点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈(保持状态)
  4. 附加:一个协程遇到IO操作自动切换到其它协程

Greenlet

Greenlet是python中的一个第三方模块,真正的协程模块就是使用greenlet完成的切换

并发的两个核心:切换并且保持状态.接下来我们从一个例子慢慢引入此模块的用法

# 版本一:单切换
def func1():
    print('in func1')

def func2():
    print('in func2')
    func1()
    print('end')

func2()

# 版本二:切换+保持状态
import time
def gen():
    while 1:
        yield 1
        time.sleep(0.5)  # 手动设置IO,遇到IO无法自动切换

def func():
    obj = gen()
    for i in range(10):
        next(obj)
func()

# 版本三:切换+保持状态,遇到IO自动切换
from greenlet import greenlet
import time
def eat(name):
    print('%s eat 1' %name)  # 2
    g2.switch('taibai')  # 3
    time.sleep(3)
    print('%s eat 2' %name)  # 6
    g2.switch()  # 7

def play(name):
    print('%s play 1' %name)  # 4
    g1.switch()  # 5
    print('%s play 2' %name)  # 8

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('taibai')  # 1 切换到eat任务

协程模块gevent

gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

# gevent模块的几个用法
# 用法:
g1=gevent.spawn(func,1,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务

g2=gevent.spawn(func2)

g1.join() # 等待g1结束

g2.join() # 等待g2结束  有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,但是你会发现,如果g2里面的任务执行的时间长,但是不写join的话,就不会执行完等到g2剩下的任务了

# 或者上述两步合作一步:gevent.joinall([g1,g2])

使用time.sleep模拟程序中遇到的阻塞:

import gevent
import time
from threading import current_thread
def eat(name):
    print('%s eat 1' %name)
    print(current_thread().name)
    # gevent.sleep(2)
    time.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    print(current_thread().name)
    # gevent.sleep(1) # gevent.sleep(1)模拟的是gevent可以识别的io阻塞
    time.sleep(1)
    # time.sleep(1)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
    print('%s play 2' %name)

g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,name='egon')
print(f'主{current_thread().name}')
g1.join()
g2.join()
# 结果:
主MainThread
egon eat 1
MainThread
egon eat 2
egon play 1
MainThread
egon play 2

最终版本:

import gevent
from gevent import monkey
monkey.patch_all()  # 打补丁: 将下面的所有的任务的阻塞都打上标记
def eat(name):
    print('%s eat 1' %name)
    time.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    time.sleep(1)
    print('%s play 2' %name)

g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,name='egon')

# g1.join()
# g2.join()
gevent.joinall([g1,g2])
# 结果:
egon eat 1
egon play 1
egon play 2
egon eat 2

负载均衡:就是指将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行

Nginx:Nginx是一款轻量级的Web服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器,其特点是占有内存少,并发能力强。

一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个

单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。

原文地址:https://www.cnblogs.com/lifangzheng/p/11420036.html

时间: 2024-10-07 18:02:45

线程queue、事件event及协程的相关文章

python38 1.线程一堆队列 2.事件Event 3.协程 4.断点续传

复习 1.GIL锁 2.如何避免GIL锁给程序带来的效率影响 3.与自定义锁的区别 4. 线程池进程池 5 同步  异步 6.异步回调 1.GIL锁 ? 全局解释器锁,   用来锁住解释器的互斥锁 ? 为啥加: CPython 中内存管理是非线程安全的,  GIL是为了   保护解释器的数据不被并发修改 ? 加锁后的问题:导致多个线程无法并行执行, 降低了效率 ? 当然 是可以并发的 ? 2.如何避免GIL锁给程序带来的效率影响 ? 什么时候会影响效率 ?  如果是计算密集型任务,开多线程不能提

转--- 秒杀多线程第六篇 经典线程同步 事件Event

阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇 一个经典的多线程同步问题> <秒杀多线程第五篇 经典线程同步关键段CS> 上一篇中使用关键段来解决经典的多线程同步互斥问题,由于关键段的“线程所有权”特性所以关键段只能用于线程的互斥而不能用于同步.本篇介绍用事件Event来尝试解决这个线程同步问题. 首先介绍下如何使用事件.事件Event实际上是个内核对象,它的使用非常方便.下面列出一些常用的函数. 第一个 CreateEvent 函数功能:创建事件 函数原型: HANDLEC

秒杀多线程第六篇 经典线程同步 事件Event

阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇 一个经典的多线程同步问题> <秒杀多线程第五篇 经典线程同步关键段CS> 上一篇中使用关键段来解决经典的多线程同步互斥问题,由于关键段的"线程所有权"特性所以关键段只能用于线程的互斥而不能用于同步.本篇介绍用事件Event来尝试解决这个线程同步问题. 首先介绍下如何使用事件.事件Event实际上是个内核对象,它的使用非常方便.下面列出一些常用的函数. 第一个 CreateEvent 函数功能:创建事件 函数原

day 32异步+回调、线程queue、线程Event、协程、单线程下实现遇到IO切换

一.异步+回调:线程是谁空谁调,进程是主进程调用 from concurrent.futures import ProcessPoolExcutor,ThreadPoolExecutor from threading import current_thread import requests,os,time,random def get(url): print('%s GET %s'%(current_thread().name,url)) response=requests.get(url)

python开发线程:死锁和递归锁&amp;信号量&amp;定时器&amp;线程queue&amp;事件evevt

一 死锁现象与递归锁 进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁 from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread):

进程、线程、轻量级进程、协程和go中的Goroutine

一.进程 操作系统中最核心的概念是进程,分布式系统中最重要的问题是进程间通信. 进程是“程序执行的一个实例” ,担当分配系统资源的实体.进程创建必须分配一个完整的独立地址空间. 进程切换只发生在内核态,两步:1 切换页全局目录以安装一个新的地址空间 2 切换内核态堆栈和硬件上下文.  另一种说法类似:1 保存CPU环境(寄存器值.程序计数器.堆栈指针)2修改内存管理单元MMU的寄存器 3 转换后备缓冲器TLB中的地址转换缓存内容标记为无效. 二.线程 书中的定义:线程是进程的一个执行流,独立执行

MFC线程(三):线程同步事件(event)与互斥(mutex)

前面讲了临界区可以用来达到线程同步.而事件(event)与互斥(mutex)也同样可以做到. Win32 API中的线程事件 HANDLE hEvent = NULL; void MainTestFun{ hEvent = CreateEvent(NULL,FALSE,FALSE,NULL); SetEvent(hEvent); char g_charArray[4]; CString szResult; //下面三个线程中的任意一个访问g_charArray的时候其他线程都不能访问 AfxBe

IO多路复用, 基于IO多路复用+socket实现并发请求(一个线程100个请求), 协程

一. IO多路复用 IO多路复用作用:检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可读/可写) 二. 基于IO多路复用+socket实现并发请求(一个线程100个请求) IO多路复用 socket非阻塞 基于事件循环实现的异步非阻塞框架:aaaa.py 非阻塞:不等待 异步:执行完某个任务后自动调用我给他的函数. Python中开源 基于事件循环实现的异步非阻塞框架 Twisted # aaaa.py import socket import select cla

python 38 线程队列与协程

目录 1. 线程队列 1.1 先进先出(FIFO) 1.2 后进先出(LIFO)堆栈 1.3 优先级队列 2. 事件event 3. 协程 4. Greenlet 模块 5. Gevent模块 1. 线程队列 1.1 先进先出(FIFO) import queue q = queue.Queue(3) q.put('a') q.put('b') q.put('c') print(q.qsize()) # 队列大小 print(q.get()) print(q.get()) print(q.get