线程Queue、定时器、进程池和线程池、同步异步

目录

  • 线程Queue、定时器、进程池和线程池、多线程socket通信

    • 一、Queue队列实现线程通信
    • 二、线程定时器(Timer)
    • 三、进程池和线程池
    • 四、同步和异步
      • 4.1、同步
      • 4.2 、异步
    • 五、多线程socket升级

线程Queue、定时器、进程池和线程池、多线程socket通信

一、Queue队列实现线程通信

queue模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在queue模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。

关于这三个队列类的简单介绍如下:

  1. queue.Queue(maxsize=0):代表先进先出(FIFO)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。
  2. queue.LifoQueue(maxsize=0):代表后进先出(LIFO)的队列,与 Queue 的区别就是出队列的顺序不同。
  3. queue.PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。

这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:

  • Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
  • Queue.empty():判断队列是否为空。
  • Queue.full():判断队列是否已满。
  • Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
  • Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
  • Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
  • Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。

下面看具体代码

# 先进先出
import queue
q = queue.Queue()
q.put('123')
q.put('qweqwe')
print(q.get())  # 先拿到的是123
print(q.get())
# print(q.get()) # 两个值都取光了,队列为空,这里再取就会阻塞,直到队列中有值
q.task_done() # 结束任务
# q.join()
# 先进后出
import queue
q = queue.LifoQueue()
q.put('杨蓬蓬吃饭')
q.put('杨蓬蓬上厕所')
q.put('杨蓬蓬睡觉')
print(q.get())  # 后进先出,先取到杨蓬蓬睡觉
print(q.get())
print(q.get())  # 先进后出,最后取到杨蓬蓬吃饭
# 优先级
import queue
q = queue.PriorityQueue() # 可以根据优先级取数据
# 通常这个元组的第一个值是int类型,第一个值越小,优先级越高
q.put((50,'吃饭'))
q.put((80,'睡觉'))
q.put((1,'敲代码'))
print(q.get())  # 1最小,先取到敲代码 (1, '敲代码')
print(q.get())  # (50, '吃饭')
print(q.get())  # (80, '睡觉')

二、线程定时器(Timer)

Thread 类有一个 Timer子类,该子类可用于控制指定函数在特定时间内执行一次。例如如下程序:

from threading import Thread,Timer
import time

def task():
    print('线程执行了')
    time.sleep(2)
    print('线程结束了')

t = Timer(4,task)  # 指定4s后开启一个线程执行task
t.start()

需要说明的是,Timer 只能控制函数在指定时间内执行一次,如果要使用 Timer 控制函数多次重复执行,则需要再执行下一次调度。

三、进程池和线程池

用池的功能限制进程数或线程数。

为什么要限制:当并发的任务数量远远大于计算机所能承受的范围,即无法一次性开启过多的任务数量,就应该考虑去限制进程数或线程数,从而保证服务器不崩。

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好的提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

使用线程池来执行线程任务:

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time

def task(i):
    time.sleep(1.5)
    print(f"{currentThread().name}在执行任务{i+1}")
    return i**2

if __name__ == '__main__':
    fu_list = []
    pool = ThreadPoolExecutor(4)  #规定线程池有4个线程
    for i in range(20):  #模拟20个线程,task要做20次,4个线程负责做这个事情
        future = pool.submit(task,i)   #for循环一次,提交一次
        fu_list.append(future)   #先把提交的数据意义放到这个列表里面
    pool.shutdown()   # 关闭池的入口,不让你往里面再放东西
    for fu in fu_list:  #依次循环列表里面的值
        print(fu.result())  #打印返回值

-----------------------------------------------------------------------------
ThreadPoolExecutor-0_0在执行任务1
ThreadPoolExecutor-0_2在执行任务3
ThreadPoolExecutor-0_1在执行任务2
ThreadPoolExecutor-0_3在执行任务4
ThreadPoolExecutor-0_0在执行任务5
ThreadPoolExecutor-0_1在执行任务7
ThreadPoolExecutor-0_2在执行任务6
ThreadPoolExecutor-0_3在执行任务8
ThreadPoolExecutor-0_0在执行任务9
ThreadPoolExecutor-0_2在执行任务11
ThreadPoolExecutor-0_1在执行任务10
ThreadPoolExecutor-0_3在执行任务12
ThreadPoolExecutor-0_0在执行任务13
ThreadPoolExecutor-0_2在执行任务14
ThreadPoolExecutor-0_1在执行任务15
ThreadPoolExecutor-0_3在执行任务16
ThreadPoolExecutor-0_0在执行任务17
ThreadPoolExecutor-0_2在执行任务18
ThreadPoolExecutor-0_1在执行任务19
ThreadPoolExecutor-0_3在执行任务20
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361

上述代码就是只有0,1,2,3四个线程来执行20次任务,同一时间只能有四个任务执行,只有能一个线程执行完一个任务,空出来了,才能执行下一个任务。

使用进程池来执行进程任务

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time

def task(i):
    time.sleep(1)
    print(f"{current_process().name}在执行任务{i+1}")
    time.sleep(1)
    return i**2

if __name__ == '__main__':
    fu_list = []
    pool = ProcessPoolExecutor(4)  #规定进程池有是个线程
    for i in range(20):  #模拟20个进程,task要做20次,4个进程负责做这个事情
        future = pool.submit(task,i)   #for循环一次,提交一次
        fu_list.append(future)   #先把提交的数据意义放到这个列表里面
    pool.shutdown()   # 关闭池的入口,不让你往里面再放东西
    for fu in fu_list:
        print(fu.result())

-----------------------------------------------------------------------------
Process-1在执行任务1
Process-2在执行任务2
Process-3在执行任务3
Process-4在执行任务4
Process-1在执行任务5
Process-2在执行任务6
Process-3在执行任务7
Process-4在执行任务8
Process-1在执行任务9
Process-2在执行任务10
Process-3在执行任务11
Process-4在执行任务12
Process-1在执行任务13
Process-2在执行任务14
Process-3在执行任务15
Process-4在执行任务16
Process-1在执行任务17
Process-2在执行任务18
Process-3在执行任务19
Process-4在执行任务20
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361

不要和信号量用混了,线程池里面始终没有产生新的线程,比如ThreadPoolExecutor(4),所有的任务始终是由这4个线程去执行。

四、同步和异步

是提交任务的两种方式

4.1、同步

提交了一个任务,必须等任务执行完 (拿到返回值)才能执行下一行代码

import os
import time
import random
from multiprocessing import Process

def work(n):
    print(f'{n}: {os.getpid()} is running' )
    time.sleep(random.randint(1,3))
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):  #这种就是同步了
        p=Process(target=work,args=(i,))
        p.start()

-----------------------------------------------------------------------------
1: 7504 is running
0: 15736 is running
2: 17896 is running
2:17896 is done
1:7504 is done
0:15736 is done

4.2 、异步

提交了一个任务,不要等任务执行完,可以直接执行下一行代码

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time

def task(i):
    time.sleep(1)
    print(f"{current_process().name}在执行任务{i+1}")
    time.sleep(1)
    return i**2

def parse(future):
    print(future.result())

if __name__ == '__main__':
    fu_list = []
    pool = ProcessPoolExecutor(4)  #规定进程池有是个线程
    for i in range(20):  #模拟20个进程,task要做20次,4个进程负责做这个事情
        future = pool.submit(task,i)   #for循环一次,提交一次
        future.add_done_callback(parse)
        # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数,
        # 会把future对象作为参数传给函数
        # 这个称之为回调函数,处理完了回来就调用这个函数.

-----------------------------------------------------------------------------
Process-1在执行任务1
Process-2在执行任务2
Process-3在执行任务3
Process-4在执行任务4
1
0
4
9
Process-2在执行任务5
Process-1在执行任务6
Process-3在执行任务7
Process-4在执行任务8
16
25
36
49
Process-2在执行任务9
Process-1在执行任务10
Process-3在执行任务11
Process-4在执行任务12
64
81
100
121
Process-2在执行任务13
Process-1在执行任务14
Process-3在执行任务15
Process-4在执行任务16
144
169
196
225
Process-2在执行任务17
Process-1在执行任务18
Process-3在执行任务19
Process-4在执行任务20
256
289
324
361

五、多线程socket升级

服务端

import socket
from threading import Thread

def talk(conn):
    while True:
        try:
            msg = conn.recv(1024)
            if len(msg) == 0: break
            conn.send(msg.upper())
        except connectionResetError
            print('客户端关闭了一个链接')
            break
    conn.close()   

def serve_demo():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('192.168.11.199', 8010))
    server.listen(5)

    while True:
        conn, addr = server.accept()
        t = Thread(target=talk, args(conn,))
        t.start()

if __name__ == '__main__':
    server_demo()

客户端

import socket
from threading import Thread

def client_demo():
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    client.connect(('192.168.11.199', 8010))
    while True:
        msg = f'{currentThread().name}'
        if len(msg) == 0: break
        client.send(msg.encode('utf-8'))
        feedback = client.recv(1024)
        print(feedback.decode('utf-8'))

    client.close()

if __name__ == '__main__':
    for i in range(5):
        t = Thread(target=client_demo)
        t.start()

原文地址:https://www.cnblogs.com/zhuangyl23/p/11553183.html

时间: 2024-11-09 06:24:18

线程Queue、定时器、进程池和线程池、同步异步的相关文章

GIL、定时器、线程queue、进程池和线程池

一.GIL1.什么是GIL(这是Cpython解释器) GIL本质就是一把互斥锁,那既然是互斥锁,原理都一样,都是让多个并发线程同一时间只能 有一个执行 即:有了GIL的存在,同一进程内的多个线程同一时刻只能有一个在运行,意味着在Cpython中 一个进程下的多个线程无法实现并行===>意味着无法利用多核优势 但不影响并发的实现 GIL可以被比喻成执行权限,同一进程下的所以线程 要想执行都需要先抢执行权限 2.为何要有GIL 因为Cpython解释器自带垃圾回收机制不是线程安全的(对共享数据修改

一个线程oom,进程里其他线程还能运行吗?

线程之间互相不影响:守护线程生活周期相同 引言 这题是一个网友@大脸猫爱吃鱼给我的提问,出自今年校招美团三面的一个真题.大致如下 一个进程有3个线程,如果一个线程抛出oom,其他两个线程还能运行么? 先说一下答案,答案是还能运行 不瞒大家说,真在面试中,我遇到这一题,我估计也是答错.因为我初看到这一题,内心嘿嘿一笑,觉得这题是在考察JVM的内存结构.我第一反应是OOM的常见情况堆溢出,也就是下面的这种异常 java.lang.OutOfMemoryError: Java heap space 先

进程池、线程池及回调函数使用

一.线程池与进程池 池表示容器 线程就是装线程的容器 为什么要装到容器中 可以避免频繁的创建和销毁(进程/线程)来的资源开销 可以限制同时存在的线程数量 以保证服务器不会应为资源不足而导致崩溃 帮我们管理了线程的生命周期 管理了任务的分配 import os import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import activeCount,enum

GIL锁,线程池,同步异步

1.GIL 是一个全局解释器锁 ,是一种互斥锁 为什么需要GIL:因为一个python.exe进程中只有一分解释器,如果这个进程开启了多个线程都要执行代码 多线程之间要竞争解释器,一旦竞争就有可能出现问题 带来的问题:同一时间只有一个线程可以访问解释器 好处:保证了多线程的数据完全 thread-safe 线程安全的 多个线程同时访问也不会出问题 not thread-safe 非线程安全的 多个线程同时访问可能会出问题 (加锁) 默认情况下一个进程只有一个线程 是不会出现问题的 ,但是不要忘记

进程、线程、轻量级进程、协程和go中的Goroutine

一.进程 操作系统中最核心的概念是进程,分布式系统中最重要的问题是进程间通信. 进程是“程序执行的一个实例” ,担当分配系统资源的实体.进程创建必须分配一个完整的独立地址空间. 进程切换只发生在内核态,两步:1 切换页全局目录以安装一个新的地址空间 2 切换内核态堆栈和硬件上下文.  另一种说法类似:1 保存CPU环境(寄存器值.程序计数器.堆栈指针)2修改内存管理单元MMU的寄存器 3 转换后备缓冲器TLB中的地址转换缓存内容标记为无效. 二.线程 书中的定义:线程是进程的一个执行流,独立执行

Linux下的进程类别(内核线程、轻量级进程和用户进程)以及其创建方式--Linux进程的管理与调度(四)

本文声明 日期 内核版本 架构 作者 GitHub CSDN 2016-05-12 Linux-4.5 X86 & arm gatieme LinuxDeviceDrivers Linux进程管理与调度-之-进程的创建 本文中出现的,内核线程,轻量级进程,用户进程,用户线程等概念,如果不太熟悉, 可以参见 内核线程.轻量级进程.用户线程三种线程概念解惑(线程≠轻量级进程) Linux进程类别 虽然我们在区分Linux进程类别, 但是我还是想说Linux下只有一种类型的进程,那就是task_str

线程queue、线程进程池,协程

线程queue import queue q = queue.Queue() #先进先出 q = queue.LifoQueue() #先进后出 t = queue.PriorityQueue() #优先级取数据,通常这个元组的第一个值是int类型 q.put('123') q.put('qweqwe') print(q.get()) print(q.get()) t.put('100', 'tank') t.put('10', 'nick') t.put('1', 'jason') print

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制

1.线程queue :会有锁 q=queue.Queue(3) q.get() q.put() 先进先出 队列后进先出 堆栈优先级队列 1 """先进先出 队列""" 2 import queue 3 q=queue.Queue(3) #先进先出->队列 4 5 q.put('first') 6 q.put(2) 7 # q.put('third') 8 # q.put(4) 9 q.put(4,block=False) #q.put_no