python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)

9.11 进程池与线程池

池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务

池子内什么时候装进程:并发的任务属于计算密集型 池子内什么时候装线程:并发的任务属于IO密集型

进程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,os,random
?
def task(x):
    print(‘%s 接客‘ %os.getpid())
    time.sleep(random.randint(2,5))
    return x**2
?
if __name__ == ‘__main__‘:  # ProcessPoolExecutor创建并开启指定数目的进程
    p=ProcessPoolExecutor() # 默认开启的进程数是cpu的核数
?
    for i in range(20):
        p.submit(task,i)    # 一下并行执行四个任务,等其中一个任务执行完后再执行下一个

线程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,os,random
?
def task(x):
    print(‘%s 接客‘ %x)
    time.sleep(random.randint(2,5))
    return x**2
?
if __name__ == ‘__main__‘:  # ThreadPoolExecutor创建并开启指定数目的线程
    p=ThreadPoolExecutor(4) # 默认开启的线程数是cpu的核数*5
?
    for i in range(20):
        p.submit(task,i)    # 一下并发执行四个任务,等其中一个任务执行完后再并发执行下一个

9.112 基于多线程实现并发的套接字通信(使用线程池)

服务端:

from socket import *
from threading import Thread
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
?
tpool=ThreadPoolExecutor(3)         #ThreadPoolExecutor创建并开启指定数目的线程
def communicate(conn,client_addr):
    while True:  # 通讯循环
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
?
def server():
    server=socket(AF_INET,SOCK_STREAM)
    server.bind((‘127.0.0.1‘,8080))
    server.listen(5)
?
    while True: # 链接循环
        conn,client_addr=server.accept()
        print(client_addr)
        # t=Thread(target=communicate,args=(conn,client_addr))
        # t.start()
        tpool.submit(communicate,conn,client_addr)#一下并发执行3个任务,等其中一个任务执行完后再并发执行下一个
    server.close()
?
if __name__ == ‘__main__‘:
    server()

客户端:

from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))
?
while True:
    msg=input(‘>>>: ‘).strip()
    if not msg:continue
    client.send(msg.encode(‘utf-8‘))
    data=client.recv(1024)
    print(data.decode(‘utf-8‘))
?
client.close()

9.12 同步异步阻塞非阻塞

阻塞与非阻塞指的是程序的两种运行状态:

阻塞:遇到 I/O 就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源

非阻塞(就绪态或运行态):没有遇到 I/O 操作,或者通过某种手段让程序即便是遇到 I/O 操作也不会停在原地,执行其他操作,力求尽可能多的占有CPU

同步与异步指的是提交任务的两种方式:

同步调用:提交完任务后,就在原地等待,直到任务运行完毕后,拿到任务的返回值,才继续执行下一行代码

异步调用:提交完任务后,不在原地等待,直接执行下一行代码

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,os,random
#from multiprocessing import Pool
def task(x):
    print(‘%s 接客‘ %x)
    time.sleep(random.randint(1,3))
    return x**2
?
if __name__ == ‘__main__‘:
    # 异步调用
    p=ThreadPoolExecutor(4) # 默认开启的线程数是cpu的核数*5
    obj_l=[]
    for i in range(10):
        obj=p.submit(task,i)
        obj_l.append(obj)
?
    # p.close()
    # p.join()
    p.shutdown(wait=True)# shutdown指的是不能再往进程池内提交任务,wait=True指等待进程池或线程池内所有的任务都运行完毕
    print(obj_l[3].result()) # 9    #最后拿结果
    print(‘主‘)
?
    # 同步调用
    p=ThreadPoolExecutor(4) # 默认开启的线程数是cpu的核数*5
    for i in range(10):
        print(p.submit(task,i).result())
    print(‘主‘)

9.121 异步调用+回调机制

问题:

1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理

2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s

基于进程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random
?
def get(url):
    print(‘%s GET %s‘ %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
    if response.status_code == 200:
        return response.text
?
def pasrse(obj):            # 干解析的活
    res=obj.result()        # 回调拿结果
    print(‘%s 解析结果为:%s‘ %(os.getpid(),len(res))) # 4108 解析结果为:2443
?
if __name__ == ‘__main__‘:
    urls=[
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.python.org‘,
    ]
?
    pool=ProcessPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)    #parse函数会在obj对应的任务执行完毕后自动执行,会把obj自动传给parse
        obj.add_done_callback(pasrse)   #四个进程并发爬取信息,主进程在执行解析操作
?
    print(‘主进程‘,os.getpid())         # 主进程 4108

基于线程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import requests
import os
import time
import random
?
def get(url):
    print(‘%s GET %s‘ %(current_thread().name,url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
    if response.status_code == 200:
        return response.text
?
def pasrse(obj):     # 干解析的活
    res=obj.result()
    print(‘%s 解析结果为:%s‘ %(current_thread().name,len(res)))#ThreadPoolExecutor-0_1 解析结果为:
                                                         #2443
if __name__ == ‘__main__‘:                              #ThreadPoolExecutor-0_3 解析结果为:2443
    urls=[
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.baidu.com‘,
        ‘https://www.python.org‘,
    ]
?
    pool=ThreadPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)    #parse函数会在obj对应的任务执行完毕后自动执行,会把obj自动传给parse
        obj.add_done_callback(pasrse)     #四个线程并发爬取信息,空闲者执行解析操作
    print(‘主线程‘,current_thread().name)  #主线程 MainThread

9.13 线程queue

队列:先进先出 queue.Queue()

import queue
q=queue.Queue(3)
?
q.put(1)
q.put(2)
q.put(3)
# q.put(4)   阻塞
?
print(q.get())  #1
print(q.get())  #2
print(q.get())  #3

堆栈:后进先出 queue.LifoQueue()

import queue
q=queue.LifoQueue(3)
?
q.put(‘a‘)
q.put(‘b‘)
q.put(‘c‘)
?
print(q.get())  #c
print(q.get())  #b
print(q.get())  #a

优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高

PriorityQueue()

import queue
q=queue.PriorityQueue(3)
q.put((10,‘user1‘))
q.put((-3,‘user2‘))
q.put((-2,‘user3‘))
?
print(q.get())  #(-3, ‘user2‘)
print(q.get())  #(-2, ‘user3‘)
print(q.get())  #(10, ‘user1‘)

原文地址:https://www.cnblogs.com/mylu/p/11247125.html

时间: 2024-11-10 06:49:47

python 之 并发编程(进程池与线程池、同步异步阻塞非阻塞、线程queue)的相关文章

并发编程--一堆锁,GIL,同步异步,Event事件

目录 一堆锁 死锁现象(*****) 递归锁 RLock (了解) 信号量 (了解) GIL(*****) 什么时GIL锁 为什么需要GIL锁 Cpython解释器与GC的问题 GIL锁带来的问题 多线程与多进程性能对比 进程池与线程池 同步异步(*****) Event事件 一堆锁 死锁现象(*****) ? 死锁指的是,某个资源被占用之后,一直得不到释放,导致其他需要这个资源的线程进入阻塞状态 产生死锁的情况 对同一把互斥锁,进行了多次加锁 一个共享资源,在访问时必须具备多把锁,但是这些锁被

python语法基础-并发编程-进程-长期维护

###############    进程的启动方式1    ############## """ 并发编程: 进程 1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了, 操作系统可以管理进程,进程是操作系统基本的执行单元, 2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间, 操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因, ####################################### 进

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

操作系统发展史(主要的几个阶段) 初始系统 1946年第一台计算机诞生,采用手工操作的方式(用穿孔卡片操作) 同一个房间同一时刻只能运行一个程序,效率极低(操作一两个小时,CPU一两秒可能就运算完了) 联机批处理系统 脱机批处理系统 多道程序系统 1.空间上的复用 ? 多个程序公用一套计算机硬件 2.时间上的复用 ? 切换+保存状态 ? 保存状态:保存当前的运行状态,下次接着该状态继续执行 ? 切换的两种情况 ? (1) 当一个程序遇到 I/O 操作(不需要使用CPU),操作系统会剥夺该程序的C

# 进程/线程/协程 # IO:同步/异步/阻塞/非阻塞 # greenlet gevent # 事件驱动与异步IO # Select\Poll\Epoll异步IO 以及selectors模块 # Python队列/RabbitMQ队列

1 # 进程/线程/协程 2 # IO:同步/异步/阻塞/非阻塞 3 # greenlet gevent 4 # 事件驱动与异步IO 5 # Select\Poll\Epoll异步IO 以及selectors模块 6 # Python队列/RabbitMQ队列 7 8 ############################################################################################## 9 1.什么是进程?进程和程序之间有什么

python之并发编程

一.操作系统 1.概念 操作系统就是一个协调.管理和控制计算机硬件资源和软件资源的控制程序,操作系统位于计算机硬件与应用软件之间,本质也是一个软件. 操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序员写的应用程序提供系统调用接口)两部分组成 现代计算机或者网络都是多用户的,多个用户不仅共享硬件,而且共享文件,数据库等信息,共享意味着冲突和无序. 2.操作系统功能 1.记录哪个程序使用什么资源 2.对资源请求进行分配 3.为不同的程序和用户调解互相冲突的

Python 3 并发编程多进程之队列(推荐使用)

Python 3 并发编程多进程之队列(推荐使用) 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 可以往队列里放任意类型的数据 创建队列的类(底层就是以管道和锁定的方式实现): 1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: 1 maxsize是队列中允许最大项数,省略则无大小限制. 方法介绍: 1.主要

Python并发编程之同步\异步and阻塞\非阻塞

一.什么是进程 进程: 正在进行的一个过程或者说一个任务.而负责执行任务则是cpu. 进程和程序的区别: 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程. 需要强调的是:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播郭德纲,一个可以播高晓松. 二.并行和并发 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务 (一)并发:是伪并行,即

Python 3 并发编程多进程之进程同步(锁)

Python 3 并发编程多进程之进程同步(锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,竞争带来的结果就是错乱,如何控制,就是加锁处理. 1.多个进程共享同一打印终端 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.g

30分钟读懂进程线程、同步异步、阻塞非阻塞、并发并行

基本概念 1 进程和线程 进程(Process): 是Windows系统中的一个基本概念,它包含着一个运行程序所需要的资源.一个正在运行的应用程序在操作系统中被视为一个进程,进程可以包括一个或多个线程.线程是操作系统分配处理器时间的基本单元,在进程中可以有多个线程同时执行代码.进程之间是相对独立的,一个进程无法访问另一个进程的数据(除非利用分布式计算方式),一个进程运行的失败也不会影响其他进程的运行,Windows系统就是利用进程把工作划分为多个独立的区域的.进程可以理解为一个程序的基本边界.是