先提到线程同步是个什么,概念是什么,就是线程通讯中通过使用某种技术访问数据时,而一旦此线程访问到,其他线程也就不能访问了,直到该线程对数据完成操作才结束。
Event事件是一种实现方式:通过内部的标记看看是不是变化,也就是true or false了,
将set(),clear(),is_set(),为true,wait(timeout=None)此种设置true的时长,等到返回false,不等到超时返回false,无线等待为None,
来看一个wait的使用:
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)
def A(event:Event, interval:int):
while not event.wait(interval): # 要么true or false
logging.info(‘hello‘)
e = Event()
Thread(target=A, args=(e, 3)).start()
e.wait(8)
e.set()
print(‘end--------------‘)
使用锁Lock解决数据资源在争抢,从而使资源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默认阻塞,阻塞设置超时时间,非阻塞,timeout禁止使用,成功获取锁,返回True,否则False。
有阻塞就有释放 ,解开锁,release(),从线程释放锁,上锁的锁重置为unloced未上锁调用,抛出RuntimeError异常。
import threading
from threading import Thread, Lock
import logging
import time
FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
lock = Lock()
def worker(count=10):
logging.info("I‘m working for U.")
flag = False
while True:
lock.acquire() # 获取锁
if len(cups) >= count:
flag = True
time.sleep(0.0001)
if not flag:
cups.append(1)
if flag:
break
logging.info(‘I finished. cups = {}‘.format(len(cups)))
for _ in range(10):
Thread(target=worker, args=(1000,)).start()
使用锁的过程中,总是不经意加上锁,出现死锁的产生,出现了死锁,如何解决呢?
使用try finally 将锁释放,另一种使用with上下文管理。
锁的使用场景在于应该少用锁,还要就是如若上锁,将锁的使用时间缩短,避免时间太长而出现无法释放锁的结果。
可重入锁Lock,
import threading
import time
lock = threading.RLock()
print(lock.acquire())
print(‘------------‘)
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
print(‘main thread {}‘.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意观察lock对象的信息
lock.release()
#lock.release() #多了一次
print(‘===============‘)
print()
print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨线程了,异常
lock.release()
print(‘~~~~~~~~~~~~~~~~~‘)
print()
# 测试多线程
print(lock.acquire())
def sub(l):
print(‘{}: {}‘.format(threading.current_thread(), l.acquire())) # 阻塞
print(‘{}: {}‘.format(threading.current_thread(), l.acquire(False)))
print(‘lock in sub thread {}‘.format(lock))
l.release()
print(‘sub 1‘)
l.release()
print(‘sub 2‘)
# l.release() # 多了一次
threading.Timer(2, sub, args=(lock,)).start() # 传入同一个lock对象
print(‘++++++++++++++++++++++‘)
print()
print(lock.acquire())
lock.release()
time.sleep(5)
print(‘释放主线程锁‘)
lock.release()
使用构造方法Condition(lock=None),默认是Rloc,
具体方法为;
acquire(*args),获取锁
wait(self,timeout=None),等待或超时
notify(n=1),唤醒线程,没有等待就没有任何操作,指线程
notify_all(),唤醒所有等待的线程。
Condition主要用于生产者和消费者模型中,解决匹配的问题。
使用方式:先获取acquire,使用完了要释放release,避免死锁最好使用with上下文;生产者和消费者可以使用notify and notify_all。
如下例子:
from threading import Thread, Event
import logging
import random
FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是为了演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是为了使用方便,与逻辑无关
def produce(self, total):
for _ in range(total):
data = random.randint(0,100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
data = self.data
logging.info("recieved {}".format(data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name=‘producer‘)
c = Thread(target=d.consume, name=‘consumer‘)
c.start()
p.start()
这里代码会有缺陷:优化如下:
from threading import Thread, Event, Condition
import logging
import random
FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是为了演示,不考虑线程安全问题
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是为了使用方便,与逻辑无关
self.cond = Condition()
def produce(self, total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) # 模拟产生数据速度
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait() # 阻塞等通知
logging.info("received {}".format(self.data))
self.event.wait(0.5) # 模拟消费的速度
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name=‘producer‘)
# 增加消费者
for i in range(5):
c = Thread(target=d.consume, name=‘consumer-{}‘.format(i))
c.start()
p.start()
Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),构建barrier对象,timeout未指定的默认值;
n_waiting ,当前barrier等待的线程数。;
parties ,需要等待
wait(timeout=None),wait方法设置超时并超时发送,barrie处于broken状态。
而broken的状态方法:
broken,打开状态,返回true;
abort(),barrier在broken状态中,wait等待的线程会抛出BrokenBarrierError异常,直到reset恢复barrier;
reset(),恢复barrier,重新开始拦截。
barrier不做演示:
还有semaphore信号量,每次acquire时,都会减一,到0时的线程再到release后,大于0,恢复阻塞的线程。
方法:
Semaphore(value=1) 构造方法,alue小于0,抛ValueError异常;
acquire(blocking=True, timeout=None) 获取信号量,计数器减1,获取成功返回True;
release() 释放信号量,计数器加1。
使用信号量处理时,需要注意release超界问题,边界问题,其实,在使用中,python有GIL的存在,有的多线程就变成线程安全的,注意一点,但实际上它们并不是线程安全类型。因此我们在使用中要具体场景具体分析具体使用。
原文地址:http://blog.51cto.com/13890970/2327927