并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制

1.线程queue :会有锁    q=queue.Queue(3)    q.get()    q.put()

先进先出 队列后进先出 堆栈优先级队列
 1 """先进先出 队列"""
 2 import queue
 3 q=queue.Queue(3) #先进先出->队列
 4
 5 q.put(‘first‘)
 6 q.put(2)
 7 # q.put(‘third‘)
 8 # q.put(4)
 9 q.put(4,block=False) #q.put_nowait(4)
10 # q.put_nowait(4)
11 # q.put(4,block=True)  # True 阻塞 False 不阻塞 直接告诉你 队列满了
12 # q.put(4,block=True,timeout=3) # 阻塞等待3秒 还没有拿走数据就抛异常
13 #
14 print(q.get())
15 print(q.get())
16 print(q.get())
17 print(q.get(block=True,timeout=2))    # false 不阻塞没有数据就抛异常  默认是阻塞 block=True
18 print(q.get_nowait()) # 相当于block=false
19 # def get(self, block=True, timeout=None):
20
21
22 """后进先出 堆栈"""
23 import queue
24 q=queue.LifoQueue(3)  #后进先出->堆栈
25 q.put(‘first‘)
26 q.put(2)
27 q.put(‘third‘)
28
29 print(q.get())
30 print(q.get())
31 print(q.get())
32
33 """优先级队列 """
34 import queue
35 q=queue.PriorityQueue(3) #优先级队列
36
37 q.put((10,{‘alice‘:12}))  # 数字越小 优先级越高 优先拿出来
38 q.put((40,‘two‘))
39 q.put((30,‘three‘))
40
41 print(q.get())
42 print(q.get())
43 print(q.get())
2.线程池进程池:client server 是IO 操作应该用多线程计算密集型: 用多进程io密集型:用多线程

池:对数目加以限制,保证机器正常运行
 1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 import os,time,random
 3
 4 def task(name):
 5     print(‘name:%s pid:%s run‘ %(name,os.getpid()))
 6     time.sleep(random.randint(1,3))
 7
 8
 9 if __name__ == ‘__main__‘:
10     pool=ProcessPoolExecutor(4)  # 不指定 默认是cpu的核数
11     # pool=ThreadPoolExecutor(5)
12
13     for i in range(10):
14         pool.submit(task,‘egon%s‘ %i)  # 异步调用池子收了10个任务,但同一时间只有4个任务在进行
15
16     pool.shutdown(wait=True)  # 类似join  代表往池子里面丢任务的入口封死了 计数器-1
17
18
19     print(‘主‘)
20 """
21 主                         # # 异步调用池子收了10个任务,但同一时间只有4个任务在进行
22 name:egon0 pid:60056 run     # 只有4个pid
23 name:egon1 pid:64700 run
24 name:egon2 pid:59940 run
25 name:egon3 pid:60888 run
26
27 name:egon4 pid:60888 run
28
29 name:egon5 pid:60056 run
30 name:egon6 pid:60888 run
31
32 name:egon7 pid:60056 run
33 name:egon8 pid:64700 run
34 name:egon9 pid:59940 run
35 """
36 # pool.shutdown(wait=True) # 代表往池子里面丢任务的入口封死了 计数器-1
37 """
38 name:egon0 pid:57124 run
39 name:egon1 pid:62252 run
40 name:egon2 pid:55736 run
41 name:egon3 pid:62060 run
42 name:egon4 pid:57124 run
43 name:egon5 pid:62252 run
44 name:egon6 pid:55736 run
45 name:egon7 pid:55736 run
46 name:egon8 pid:62060 run
47 name:egon9 pid:55736 run
48 主
49 """
50
51 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
52 from threading import currentThread
53 import os,time,random
54
55 def task():
56     print(‘name:%s pid:%s run‘ %(currentThread().getName(),os.getpid()))
57     time.sleep(random.randint(1,3))
58
59
60 if __name__ == ‘__main__‘:
61     pool=ThreadPoolExecutor(5)
62
63     for i in range(10):
64         pool.submit(task)
65
66     pool.shutdown(wait=True)
67
68
69     print(‘主‘)
70 """
71 name:ThreadPoolExecutor-0_0 pid:61508 run
72 name:ThreadPoolExecutor-0_1 pid:61508 run
73 name:ThreadPoolExecutor-0_2 pid:61508 run
74 name:ThreadPoolExecutor-0_3 pid:61508 run
75 name:ThreadPoolExecutor-0_4 pid:61508 run
76 name:ThreadPoolExecutor-0_2 pid:61508 run
77 name:ThreadPoolExecutor-0_4 pid:61508 run
78 name:ThreadPoolExecutor-0_0 pid:61508 run
79 name:ThreadPoolExecutor-0_3 pid:61508 run
80 name:ThreadPoolExecutor-0_1 pid:61508 run
81 主
82 """
3.异步调用与回调机制:提交任务的两种方式:    同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行,效率低    异步调用:提交完任务后,不等待任务执行完毕。异步调用+回调机制  自动触发叫回调
 1 """同步调用"""
 2 from concurrent.futures import ThreadPoolExecutor
 3 import time
 4 import random
 5
 6 def la(name):
 7     print(‘%s is laing‘ %name)
 8     time.sleep(random.randint(3,5))
 9     res=random.randint(7,13)*‘#‘
10     return {‘name‘:name,‘res‘:res}
11
12 def weigh(shit):
13     name=shit[‘name‘]
14     size=len(shit[‘res‘])
15     print(‘%s 拉了 《%s》kg‘ %(name,size))
16
17
18 if __name__ == ‘__main__‘:
19     pool=ThreadPoolExecutor(13)
20
21     shit1=pool.submit(la,‘alex‘).result()
22     weigh(shit1)
23
24     shit2=pool.submit(la,‘wupeiqi‘).result()
25     weigh(shit2)
26
27     shit3=pool.submit(la,‘yuanhao‘).result()
28     weigh(shit3)
29
30
31 """异步调用 + 回调机制  自动触发叫回调"""
32 from concurrent.futures import ThreadPoolExecutor
33 import time
34 import random
35
36 def la(name):
37     print(‘%s is laing‘ %name)
38     time.sleep(random.randint(3,5))
39     res=random.randint(7,13)*‘#‘
40     return {‘name‘:name,‘res‘:res}
41     # weigh({‘name‘:name,‘res‘:res})  # 这样写不好  所有功能 写在一起了
42
43
44 def weigh(shit):
45     shit=shit.result()  # 拿到是 对象 需要result()
46     name=shit[‘name‘]
47     size=len(shit[‘res‘])
48     print(‘%s 拉了 《%s》kg‘ %(name,size))
49
50
51 if __name__ == ‘__main__‘:
52     pool=ThreadPoolExecutor(13)
53
54     # pool.submit(la, ‘alex‘)
55     # pool.submit(la, ‘wupeiqi‘)
56     # pool.submit(la, ‘yuanhao‘)
57
58     pool.submit(la,‘alex‘).add_done_callback(weigh) # 实现了程序的解耦合
59     pool.submit(la,‘wupeiqi‘).add_done_callback(weigh)
60     pool.submit(la,‘yuanhao‘).add_done_callback(weigh)
4.异步调用与回调机制应用:pip3 install requests    requests

异步调用+回调机制的 应用场景:
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):   # io操作  基于线程 数目有限 用线程池
    print(‘GET %s‘ %url)
    response=requests.get(url)
    time.sleep(3)
    return {‘url‘:url,‘content‘:response.text}

def parse(res):
    res=res.result()
    print(‘%s parse res is %s‘ %(res[‘url‘],len(res[‘content‘])))

if __name__ == ‘__main__‘:
    urls=[
        ‘http://www.cnblogs.com/linhaifeng‘,
        ‘https://www.python.org‘,
        ‘https://www.openstack.org‘,
    ]

    pool=ThreadPoolExecutor(2)

    for url in urls:
        pool.submit(get,url).add_done_callback(parse)

原文地址:https://www.cnblogs.com/alice-bj/p/8716704.html

时间: 2024-10-09 19:36:48

并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制的相关文章

Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池

目录 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 2.死锁现象与递归锁 2.1死锁现象 2.2递归锁 3.信号量 4.GIL全局解释器锁 4.1背景 4.2为什么加锁 5.GIL与Lock锁的区别 6.验证计算密集型IO密集型的效率 6.1 IO密集型 6.2 计算密集型 7.多线程实现socket通信 7.1服务端 7.2客户端 8.进程池,线程池 Python并发编程05/ 死锁/递归锁/信号量/GIL锁/进程池/线程池 1.昨日回顾 #生产者消

并发编程—— 任务取消 之 停止基于线程的服务

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 并发编程—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程—— Callable和Future 并发编程—— CompletionService : Executor 和 BlockingQueue 并发编程—— 任务取消 并发编程—— 任务取消 之 中断 并发编程—— 任务取消 之 停止基于线程的服务 概述 第1 部分 问题描述 第2 部分

Python并发编程之线程池/进程池--concurrent.futures模块

h2 { color: #fff; background-color: #f7af0d; padding: 3px; margin: 10px 0px } 一.关于concurrent.futures模块 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间.但从Python3.2开始,标准库为我们提供了conc

Java并发编程学习笔记(一)线程安全性 1

什么是线程安全性: 要编写线程安全的代码,其核心在于要对状态访问操作进行管理,特别是对共享的和可变的状态的访问."共享"意味着变量可以由多个线程同时访问,而"可变"则意味着变量的值在其生命周期内可以发生变化. 一个对象是否需要线程安全的,取决于他是否被多个线程访问.这指的是在程序中访问对象的方式,而不是对象要实现的功能.要使得对象时线程安全的,需要采用同步机制来协同对对象可变状态的访问.如果无法实现协同,那么可能导致数据破坏以及其他不该出现的结果. 如果当多个线程访

线程Queue、定时器、进程池和线程池、同步异步

目录 线程Queue.定时器.进程池和线程池.多线程socket通信 一.Queue队列实现线程通信 二.线程定时器(Timer) 三.进程池和线程池 四.同步和异步 4.1.同步 4.2 .异步 五.多线程socket升级 线程Queue.定时器.进程池和线程池.多线程socket通信 一.Queue队列实现线程通信 queue模块下提供了几个阻塞队列,这些队列主要用于实现线程通信.在queue模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列.出队列的不同. 关于这三个队列

GIL 线程池 进程池 同步 异步

1.GIL(理论 重点)2.线程池 进程池3.同步 异步 GIL 是一个全局解释器锁,是一个互斥锁 为了防止竞争解释器资源而产生的 为何需要gil:因为一个python.exe进程中只有一份解释器,如果这个进程开启了多个线程 都要执行代码 多线程之间要竞争解释器 一旦竞争就有可能出现问题 带来的问题:同一时间只有一个线程可以访问解释器 好处:保证了多线程的数据安全 thread-safe 线程安全的 多个线程同时访问也不会出问题 not thread-safe 非线程安全的 多个线程同时访问可能

线程池&进程池

线程池&进程池 池子解决什么问题? 1.创建/销毁线程伴随着系统开销,如果过于频繁会影响系统运行效率 2.线程并发数量过多,抢占系统资源,从而导致系统阻塞甚至死机 3.能够刚好的控制和管理池子里面的线程和进程 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor:进程池,提供异步调用 常用方法 submit(fn, *args, **kwargs):异步提交任务 map(func,

[笔记][Java7并发编程实战手册]第三章-线程同步辅助类-概要

[笔记][Java7并发编程实战手册]系列目录 有点着急了,没有太注重质量,自己也没有理解透,从本章起,读书和随笔笔记的质量会更好. 第三章 在本章中,我们将学习: 资源的并发访问控制 资源的多副本的并发访问控制 等待多个并发事件的完成 在集合点的同步 并发阶段任务的运行 并发阶段任务中的阶段交换 并发任务间的数据交换 回顾 在第二章中主要学习了以下接口 synchronized关键字 Lock接口以及实现类,如ReentrantLock.ReentrantReadWriteLock中的Read

Java并发编程(十二):线程池的使用(转载)

本文转载自:http://www.cnblogs.com/dolphin0520/p/3932921.html 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.