python解决线程同步方案有哪些

先提到线程同步是个什么,概念是什么,就是线程通讯中通过使用某种技术访问数据时,而一旦此线程访问到,其他线程也就不能访问了,直到该线程对数据完成操作才结束。
Event事件是一种实现方式:通过内部的标记看看是不是变化,也就是true or false了,
将set(),clear(),is_set(),为true,wait(timeout=None)此种设置true的时长,等到返回false,不等到超时返回false,无线等待为None,

来看一个wait的使用:

from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def A(event:Event, interval:int):
    while not event.wait(interval): # 要么true  or false
        logging.info(‘hello‘)

e = Event()
Thread(target=A, args=(e, 3)).start()

e.wait(8)
e.set()
print(‘end--------------‘)

使用锁Lock解决数据资源在争抢,从而使资源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默认阻塞,阻塞设置超时时间,非阻塞,timeout禁止使用,成功获取锁,返回True,否则False。
有阻塞就有释放 ,解开锁,release(),从线程释放锁,上锁的锁重置为unloced未上锁调用,抛出RuntimeError异常。

import threading
from threading import Thread, Lock
import logging
import time

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘

logging.basicConfig(format=FORMAT, level=logging.INFO)


cups = []
lock = Lock()

def worker(count=10):
    logging.info("I‘m working for U.")
    flag = False
    while True:
    lock.acquire() # 获取锁

    if len(cups) >= count:
        flag = True

    time.sleep(0.0001)
    if not flag:
        cups.append(1)

    if flag:
        break

logging.info(‘I finished. cups = {}‘.format(len(cups)))

for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

使用锁的过程中,总是不经意加上锁,出现死锁的产生,出现了死锁,如何解决呢?
使用try finally 将锁释放,另一种使用with上下文管理。
锁的使用场景在于应该少用锁,还要就是如若上锁,将锁的使用时间缩短,避免时间太长而出现无法释放锁的结果。

可重入锁Lock,


import threading
import time

lock = threading.RLock()
print(lock.acquire())
print(‘------------‘)
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
print(‘main thread {}‘.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意观察lock对象的信息
lock.release()
#lock.release() #多了一次
print(‘===============‘)
print()

print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨线程了,异常
lock.release()
print(‘~~~~~~~~~~~~~~~~~‘)
print()

# 测试多线程
print(lock.acquire())

def sub(l):
    print(‘{}: {}‘.format(threading.current_thread(), l.acquire())) # 阻塞
    print(‘{}: {}‘.format(threading.current_thread(), l.acquire(False)))
    print(‘lock in sub thread {}‘.format(lock))
    l.release()
    print(‘sub 1‘)
    l.release()
    print(‘sub 2‘)
    # l.release() # 多了一次

threading.Timer(2, sub, args=(lock,)).start() # 传入同一个lock对象
print(‘++++++++++++++++++++++‘)
print()

print(lock.acquire())

lock.release()
time.sleep(5)
print(‘释放主线程锁‘)
lock.release()

使用构造方法Condition(lock=None),默认是Rloc,
具体方法为;
acquire(*args),获取锁
wait(self,timeout=None),等待或超时
notify(n=1),唤醒线程,没有等待就没有任何操作,指线程
notify_all(),唤醒所有等待的线程。
Condition主要用于生产者和消费者模型中,解决匹配的问题。
使用方式:先获取acquire,使用完了要释放release,避免死锁最好使用with上下文;生产者和消费者可以使用notify and notify_all。

如下例子:

from threading import Thread, Event
import logging
import random

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是为了演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是为了使用方便,与逻辑无关

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
            self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("recieved {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name=‘producer‘)
c = Thread(target=d.consume, name=‘consumer‘)
c.start()
p.start()

这里代码会有缺陷:优化如下:

from threading import Thread, Event, Condition
import logging
import random

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是为了演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
                self.event.wait(1) # 模拟产生数据速度
                self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait() # 阻塞等通知
                logging.info("received {}".format(self.data))
            self.event.wait(0.5) # 模拟消费的速度

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name=‘producer‘)
# 增加消费者

for i in range(5):
    c = Thread(target=d.consume, name=‘consumer-{}‘.format(i))
    c.start()
p.start()

Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),构建barrier对象,timeout未指定的默认值;
n_waiting ,当前barrier等待的线程数。;
parties ,需要等待
wait(timeout=None),wait方法设置超时并超时发送,barrie处于broken状态。
而broken的状态方法:
broken,打开状态,返回true;
abort(),barrier在broken状态中,wait等待的线程会抛出BrokenBarrierError异常,直到reset恢复barrier;
reset(),恢复barrier,重新开始拦截。
barrier不做演示:

还有semaphore信号量,每次acquire时,都会减一,到0时的线程再到release后,大于0,恢复阻塞的线程。
方法:
Semaphore(value=1) 构造方法,alue小于0,抛ValueError异常;
acquire(blocking=True, timeout=None) 获取信号量,计数器减1,获取成功返回True;
release() 释放信号量,计数器加1。
使用信号量处理时,需要注意release超界问题,边界问题,其实,在使用中,python有GIL的存在,有的多线程就变成线程安全的,注意一点,但实际上它们并不是线程安全类型。因此我们在使用中要具体场景具体分析具体使用。

原文地址:http://blog.51cto.com/13890970/2327927

时间: 2024-10-31 07:09:42

python解决线程同步方案有哪些的相关文章

python多线程--线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步. 使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间.如下: 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在数据不同步的问题. 考虑这样一种情况:一个列表里所有元素都是0,线程"se

线程同步的情景之二

情景一中,我主要介绍了用于解决资源争用时各种方式的区别,本篇文章我们将进一步介绍线程同步的第二种场景. 情景二:数量有限,先到先得   情景简介:与情景一类似,但是这次茅坑的数量不只一个.如果有需求的人数少于茅坑数量,那一切都很和谐.但是人数超过茅坑数量的时候该怎么办?多个人占用一个坑? 解决办法:当所有茅坑都客满的时候,其他人必须乖乖等在外面,只有当有人从里面出来的时候,下一个人才能进去. 问题抽象:当某一资源同一时刻允许一定数量的线程使用的时候,需要有个机制来阻塞多余的线程,直到资源再次变得

线程同步的情景之一

从本篇文章开始,我将陆续介绍多线程中会遇到的三种情况. 情景一:此茅坑有主了 大锤:“我擦,居然一个茅坑有两个人在用.” 大锤:“啊,忍不住了,一起挤挤吧~~~” 叫兽:“舒坦了,先走了.” 叫兽按下了冲水开关.... "哗啦啦....." 大锤:“你妹啊,冲什么水啊,冲得我一身 shit ” 解决方案:为了解决这种混乱的情况,管理员给茅坑加了道门,一次只允许一个人使用,其他人只能在外面等待.而且只要有人占着,就算不拉屎,其他人也只能乖乖排队. 问题抽象:当某一资源可能同时被多个线程读

从生产者消费者窥探线程同步(下)

欢迎转载,转载请注明出处.尊重他人的一丢丢努力,谢谢啦! 阅读本篇之前,如果你还没有看过从生产者消费者窥探线程同步(上) ,那不妨先戳一下,两篇一起嚼才更好呢. 上一篇分析了使用BlockQueue和synchronized来实现生产者消费者模式.这一篇来看一下其他的实现,闲言少叙. (3)Lock实现 核心:Lock的用法中规中矩,有点类似于非静态同步方法,只是前者是对lock对象显式加锁,而后者是对当前对象隐式加锁. 我相信大多数人在第一次接触Lock锁的时候,内心都会有这样的疑惑:明明提供

经典线程同步 信号量Semaphore

阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇一个经典的多线程同步问题> <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event> <秒杀多线程第七篇经典线程同步互斥量Mutex> 前面介绍了关键段CS.事件Event.互斥量Mutex在经典线程同步问题中的使用.本篇介绍用信号量Semaphore来解决这个问题. 首先也来看看如何使用信号量,信号量Semaphore常用有三个函数,使用很方便.下面是这几个函数的原型和使

秒杀多线程第八篇 经典线程同步 信号量Semaphore

版权声明:本文为博主原创文章,未经博主允许不得转载. 阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇一个经典的多线程同步问题> <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event> <秒杀多线程第七篇经典线程同步互斥量Mutex> 前面介绍了关键段CS.事件Event.互斥量Mutex在经典线程同步问题中的使用.本篇介绍用信号量Semaphore来解决这个问题. 首先也来看看如何使用信号量,信号量Semaphore

Delphi 线程同步技术(转)

上次跟大家分享了线程的标准代码,其实在线程的使用中最重要的是线程的同步问题,如果你在使用线程后,发现你的界面经常被卡死,或者无法显示出来,显示混乱,你的使用的变量值老是不按预想的变化,结果往往出乎意料,那么你很有可能是忽略了线程同步的问题. 当有多个线程的时候,经常需要去同步这些线程以访问同一个数据或资源.例如,假设有一个程序,其中一个线程用于把文件读到内存,而另一个线程用于统计文件中的字符数.当然,在把整个文件调入内存之前,统计它的计数是没有意义的.但是,由于每个操作都有自己的 线程,操作系统

线程同步(一)

解决方案:一个资源,一次只允许一个线程使用,其他线程只能等待.直到资源被释放. 问题抽象:当某一资源可能同时被多个线程读取和修改时,资源的状态将变得难以预料. 线程同步方案:volatile.lock.Interlocked.Moniter.SpinLock.ReadWriteLockSlim.Mutex 方案特性:除所有者外,其他人无条件等待:先到先得 各方案间的区别:这些方案从它们各自的实现方式可分为三种:用户模式构造.内核模式构造 和 混合模式构造. 应该尽量使用用户模式构造,它们的速度要

delphi线程同步

本文完全摘自网络,仅供自己查询 上次跟大家分享了线程的标准代码,其实在线程的使用中最重要的是线程的同步问题,如果你在使用线程后,发现你的界面经常被卡死,或者无法显示出来,显示混乱,你的使用的变量值老是不按预想的变化,结果往往出乎意料,那么你很有可能是忽略了线程同步的问题. 当有多个线程的时候,经常需要去同步这些线程以访问同一个数据或资源.例如,假设有一个程序,其中一个线程用于把文件读到内存,而另一个线程用于统计文件中的字符数.当然,在把整个文件调入内存之前,统计它的计数是没有意义的.但是,由于每