进程池、线程池及回调函数使用

一、线程池与进程池

池表示容器

线程就是装线程的容器

为什么要装到容器中

  1. 可以避免频繁的创建和销毁(进程/线程)来的资源开销
  2. 可以限制同时存在的线程数量 以保证服务器不会应为资源不足而导致崩溃
  3. 帮我们管理了线程的生命周期
  4. 管理了任务的分配
    import os
    import time
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import activeCount,enumerate,currentThread
    
    # # 创建一个线程池   指定最多可以容纳两个线程
    # pool = ThreadPoolExecutor(20)
    #
    # def task():
    #     print(currentThread().name)
    #
    # # 提交任务到池子中
    # pool.submit(task)
    # pool.submit(task)
    #
    # print(enumerate())
    
    # 进程池的使用
    
    def task():
        time.sleep(1)
        print(os.getpid())
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(2)
        pool.submit(task)
        pool.submit(task)
        pool.submit(task)

如果进程不结束 池子里面的进程或线程 也是一直存活的

二、同步异步

阻塞 非阻塞 程序的状态

并行 并发 串行 处理任务的方式

1、同步

指的是 提交任务后必须在原地等待 直到任务结束 ===阻塞????

2、异步

提交任务后不需要在原地等待 可以继续往下执行代码

异步效率高于同步 ,异步任务将导致一个问题 就是 任务的发起方不知道任务何时 处理完毕

异步同步指的是提交任务的方式

3、解决方法:

? 1.轮询 重复的隔一段时间就问一次

        效率低 无法及时获取结果  不推荐

? 2.让任务的执行方主动通知 (异步回调)

? 可以及时拿到任务的结果 推荐方式

异步回调使用案例:

# 异步回调
from threading import Thread
# 具体的任务
def task(callback):
    print("run")
    for i in range(100000000):
        1+1
    callback("ok")

#回调函数 参数为任务的结果
def finished(res):
    print("任务完成!",res)

print("start")
t = Thread(target=task,args=(finished,))
t.start()  #执行task时 没有导致主线程卡主 而是继续运行
print("over")

线程池中回调的使用

# 使用案例:
def task(num):
    time.sleep(1)
    print(num)
    return "hello python"

def callback(obj):
    print(obj.result())

pool = ThreadPoolExecutor()
res = pool.submit(task,123)
res.add_done_callback(callback)
print("over") 

三、线程池回到函数案例(爬取校花网)

import requests
import re
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(50)

# 爬虫三部曲

# 一 发送请求
def get_page(url):
    print('%s GET start ...' % url)
    index_res = requests.get(url)
    return index_res.text

# 二 解析数据
# 解析主页
def parse_index(index_page):
    # 拿到主页的返回结果
    res = index_page.result()
    detail_urls = re.findall('<div class="items">.*?href="(.*?)"', res, re.S)
    # print(detail_urls)

    for detail_url in detail_urls:
        if not detail_url.startswith('http'):
            detail_url = 'http://www.xiaohuar.com' + detail_url

        pool.submit(get_page, detail_url).add_done_callback(parse_detail)
        # yield detail_url

# 解析详情页
def parse_detail(detail_page):
    res = detail_page.result()

    video_urls = re.findall('<source src="(.*?.mp4)">', res, re.S)
    print("==",video_urls)

    if video_urls:
        video_urls = video_urls[0]
        pool.submit(save_video, video_urls)

    # print(video_urls)

# 三 保存数据
import uuid
def save_video(video_url):
    try:
        print("---",video_url)
        res = requests.get(video_url)
        with open(r'/Users/haiyuanyang/Desktop/python9/movie/%s.mp4' % uuid.uuid4(), 'wb') as f:
            f.write(res.content)
            f.flush()
            print('%s done ...' % video_url)

    except Exception:
        pass

if __name__ == '__main__':
    base_url = 'http://www.xiaohuar.com/list-3-{}.html'
    for line in range(10):
        index_url = base_url.format(line)
        pool.submit(get_page, index_url).add_done_callback(parse_index)

原文地址:https://www.cnblogs.com/chuwanliu/p/11176977.html

时间: 2024-11-09 10:24:56

进程池、线程池及回调函数使用的相关文章

python 管道 事件 信号量 进程池(map/同步/异步)回调函数

####################总结######################## 管道:是进程间通信的第二种方式,但是不推荐使用,因为管道会导致数据不安全的情况出现 事件:当我运行主进程的时候 需要子执行某个进程后 需要的返回值时 可以使用 信号量:互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 . 内部维护了一个计数器,acquire-1,release+1,为0的时候,其他的进程都要在acquire之前等待 进程池:  进程的创建和销

进程池/线程池/协程

导入进程池线程池模块@@ from concurrent import ProcessPoolExecutor,ThreadPoolExecutor 同步提交,异步提交@@ from concurrent import ProcessPoolExecutor,ThreadPoolExecutorimport osdef foo(name): print('进程%s,进程号为%s'%(name,os.getpid())) return os.getpid() if __nmae__=='__mai

Python3 从零单排28_线程队列&amp;进程池&amp;线程池

1.线程队列 线程队列有三种:先进先出,后进先出,按优先级进出,具体如下: 1 import queue 2 3 # 先进先出 4 q = queue.Queue(3) 5 6 q.put(1) 7 q.put(2) 8 q.put(3) 9 # q.put(4) # 再放阻塞,等待队列消费 10 # q.put(4,block = False) # 不阻塞,强制放数据,如果满的情况下直接报错 等价与 q.put_nowait(4) 11 # q.put(4,block = True) # 阻塞

进程池线程池 协程

socket服务端实现并发 服务端需要满足以下3点: 1 固定的ip和port 2 24小时提供服务 3 能够实现并发 多线程实现并发: 服务端: import socket from threading import Thread import os server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) #半连接池 def communicate(conn): while True: try: dat

并发编程---线程queue---进程池线程池---异部调用(回调机制)

线程 队列:先进先出 堆栈:后进先出 优先级:数字越小优先级越大,越先输出 import queue q = queue.Queue(3) # 先进先出-->队列 q.put('first') q.put(2) # q.put('third') # q.put(4) #由于没有人取走,就会卡主 q.put(4,block=False) #等同于q.get_nowait(), Ture 阻塞,Flase不阻塞,报异常满了 # # q.put(4,block=True,timeout=3) prin

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

目录 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 2.死锁现象与递归锁 2.1死锁现象 2.2递归锁 3.信号量 4.GIL全局解释器锁 4.1背景 4.2为什么加锁 5.GIL与Lock锁的区别 6.验证计算密集型IO密集型的效率 6.1 IO密集型 6.2 计算密集型 7.多线程实现socket通信 7.1服务端 7.2客户端 8.进程池,线程池 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 #生产者消

进程池线程池

进程池,线程池 什么是池 # 要在程序开始的时候,还没提交任务先创建固定数量的进程或线程 放在一个池子里,这就是池 为什么要用池? # 如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的数据了 # 并且开好的线程或者进程会一直存在在池中 处理完毕进程并不关闭,可以被多个任务反复利用 # 这样极大的减少了开启\关闭\调度线程/进程的时间开销 # 池中的线程/进程个数控制了操作系统需要调度的任务个数,控制池中的单位 # 有利于提高操作系统的效率,减轻操作系统的负担 开启一个池 from c

android线程与线程池-----线程池(二)《android开发艺术与探索》

android 中的线程池 线程池的优点: 1 重用线程池中的线程,避免了线程的创建和销毁带来的性能开销 2 能有效的控制最大并发数,避免大量线程之间因为喜欢抢资源而导致阻塞 3 能够对线程进行简单的管理,提供定时执行以及指定间隔时间循环执行等 android 中的线程池源自java 中的Executor,Executor是一个接口,正真的实现是ThreadPoolExecutor. ThreadPoolExecutor 提供参数配置线程池. 下面是一个常用的构造方法: public Threa

(并发编程)进程池线程池--提交任务的2种方式、协程--yield greenlet,gevent模块

一:进程池与线程池(同步,异步+回调函数)先造个池子,然后放任务为什么要用"池":池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务池子内什么时候装进程:并发的任务属于计算密集型池子内什么时候装线程:并发的任务属于IO密集型 #提交任务的两种方式:    # 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的    # 异步调用:提交完一个任务之后,不在原地等待,结果???,而是

9-[多线程] 进程池线程池

1 # 进程池,线程池 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import os import time def task(name): print('%s is running <pid: %s>' % (name, os.getpid())) time.sleep(2) if __name__ == '__main__': # p = Process(target=task, args=