信号量
其实本质上是锁,Lock是单锁,信号量是指定多把锁,也就是说通过信号量指定多个数线程可以访问相同资源,一般情况下读操作可以有多个,但写操作同时只有一个
信号量模块 semaphore
# 使用起来和普通锁没 什么区别,但这个是比锁更加粗粒度锁,锁的是线程
# 在线程实例前加锁,把锁传递进线程,在线程结束时候释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
from threading import Thread, Semaphore
from queue import Queue
def add(chan, sem_lock):
for i in range ( 10 ):
chan.put(i)
# 释放锁
sem_lock.release()
if __name__ = = ‘__main__‘ :
numbers = Queue()
# 申明信号量
sem_lock = Semaphore( 4 )
sem_lock.acquire()
# 把锁传递进线程
tasks = {Thread(target = add, args = (numbers, sem_lock), name = "北门吹雪 %s" % i) for i in range ( 10 )}
for task in tasks:
task.start()
for task in tasks:
task.join()
print (numbers.get())
|
线程池
不仅仅是数量控制,可以获取线程状态、任务状态、线程返回值等信息
线程池模块 ThreadPollExecutor
线程池使用过程
1. 实例化线程池
2. 提交任务,会有个返回对象,submit是不会堵塞,立即返回
3. 让主线程等待线程执行完成
4. 关闭线程池
获取状态信息 线程对象
1. 判断是否执行完 .done()
2. 获取任务执行结果,堵塞 .result()
3. 取消任务 .cancle()
对多个线程列表获取结果 线程对象
1. as_complated 获取已经执行完成的线程结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
def add(number, name):
sum = 0
for i in range (number):
sum + = i
# 模拟个线程执行堵塞情况
time.sleep(random())
# 返回线程执行结果
return sum
if __name__ = = ‘__main__‘ :
thread_pool = ThreadPoolExecutor(max_workers = 3 )
print ( "北门吹雪:http://www.cnblogs.com/2bjiujiu/" )
name = "北门吹雪"
tasks = {thread_pool.submit(add, randint( 10 , 20 ), name) for _ in range ( 20 )}
# map方法和as_completed最大区别在于map变化的只是参数线程是同一个线程,而as_completed可以执行不同的线程任务
for data in thread_pool. map (add, {randint( 10 , 20 ) for _ in range ( 20 )}):
print (data)
|
2. map 直接返回线程执行结果,保持传递进去顺序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def add(number):
sum = 0
for i in range (number):
sum + = i
# 模拟个线程执行堵塞情况
time.sleep(random())
# 返回线程执行结果
return sum
if __name__ = = ‘__main__‘ :
print ( "北门吹雪" )
thread_pool = ThreadPoolExecutor(max_workers = 3 )
tasks = {thread_pool.submit(add, randint( 10 , 20 )) for _ in range ( 20 )}
# map方法和as_completed最大区别在于map变化的只是参数线程是同一个线程,而as_completed可以执行不同的线程任务
for data in thread_pool. map (add, {randint( 10 , 20 ) for _ in range ( 20 )}):
print (data)
|
3. wait 等待所有线程执行完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from random import randint, random
import time
def add(number):
sum = 0
for i in range (number):
sum + = i
# 模拟个线程执行堵塞情况
time.sleep(random())
# 返回线程执行结果
return sum
if __name__ = = ‘__main__‘ :
thread_pool = ThreadPoolExecutor(max_workers = 3 )
tasks = {thread_pool.submit(add, randint( 10 , 20 )) for _ in range ( 20 )} print ( "北门吹雪" )
# 主线程等待所有子线程执行完,不需要结果
# wait(tasks)
|
1
|
<span style = "font-family: ‘Microsoft YaHei‘;" >北门吹雪:http: / / www.cnblogs.com / 2bjiujiu / < / span>
|
经验:
1. 线程池和信号量在某种程度如允许执行的线程数效果上是一样,但线程池可以获取线程执行结果得到线程执行状态
2. 使用线程池需要首先实例化,然后提交线程,返回线程对象,然后在主线程中选择获取结果或者不需要结果,也可以选择堵塞等待线程执行完或不等待线程执行完
3. 获取线程执行结果,可以参照Go语言中CSP通信模式,个人觉得这是个非常好的解决方案,这样的线程池接口提交远比CSP通信来的复杂
原文地址:https://www.cnblogs.com/LucasSong/p/12262879.html
时间: 2024-11-07 04:38:24