threading.local()源码分析

前段时间写了个多线程的程序,了解到Python中有个与众不同的thread.local()方法,可以创建一个全局对象,各个线程可以用这个全局对象保存各自的局部变量,而在使用时不受其他线程的影响。于是抽时间分析了一下thread.local()方法的源码。

相关知识储备:

__slots__变量:__slots__变量一个元组,可以限制该类可使用的成员变量,不在__slots__变量中的成员变量名不能被动态添加到该类中。

参考:https://www.cnblogs.com/zhaoshizi/p/9384430.html

Python上下文管理器contextmanager:引入contextlib模块中的contextmanager,使用contextmanager来注解一个函数,则可以用with语句来调用该函数,在函数中遇到yield时,在yield处停止,并把yield后的值返回。当执行完with的所有的语句后,再执行yield后面的语句。

参考:https://www.jb51.net/article/92387.htm

Python弱引用weakref:你可以通过调用weakref模块的ref(obj[,callback])来创建一个弱引用,obj是你想弱引用的对象,callback是一个可选的函数,当因没有引用导致Python要销毁这个对象时调用

参考:https://segmentfault.com/a/1190000005729873

下面进入正题,thread.local类原理分析(不同版本的Python该类的实现方式可以不同,本文以Python3.6为例):

thread.local类使用了一个_localimpl类,来作为各线程局部变量的管理类,该类的作用是保存各个线程的局部变量。那如何让各个线程的局部变量相互独立且不相互影响?没错,答案就是用字典类型变量。每个线程分配一个字典项,字典项的key为线程的id,value值保存的是该线程局部变量组成的另一个字典。_localimpl类中通过__slots__变量限制定义了几个成员变量,包括locallock(锁)和dicts(字典),dicts变量中就如上所述保存了各线程的局部变量构成的字典。所以dicts是字典的字典。

为了保证线程被删除时,对应的局部变量也会被删除,_localimpl类中还定义了一个该线程的弱引用wrthread,当该线路引用数为0时,调用回调函数thread_deleted删除dicts字典中该线程的记录项。

 1 class _localimpl:
 2     """A class managing thread-local dicts"""
 3     """管理线程局部字典的类"""
 4     __slots__ = ‘key‘, ‘dicts‘, ‘localargs‘, ‘locallock‘, ‘__weakref__‘
 5
 6     def __init__(self):
 7         # The key used in the Thread objects‘ attribute dicts.
 8         # We keep it a string for speed but make it unlikely to clash with
 9         # a "real" attribute.
10         self.key = ‘_threading_local._localimpl.‘ + str(id(self))
11         # { id(Thread) -> (ref(Thread), thread-local dict) }
12         #字典变量中的值中以id(Thread)为key,当前线程的弱引用ref(Thread)和
13         #当前线程的局部变量字典(thread-local dict)组成的元组
14         self.dicts = {}
15
16     def get_dict(self):
17         """Return the dict for the current thread. Raises KeyError if none
18         defined."""
19         """返回当前线程的局部变量字典,其是元组中的第二个元素"""
20         thread = current_thread()
21         return self.dicts[id(thread)][1]
22
23     def create_dict(self):
24         """Create a new dict for the current thread, and return it."""
25         """为当前线程造建一个局部变量字典,用来保存局部变量"""
26         localdict = {}
27         key = self.key
28         thread = current_thread()
29         idt = id(thread)
30         def local_deleted(_, key=key):
31             # When the localimpl is deleted, remove the thread attribute.
32             # 定义一个弱引用的回调函数,当线程局部变量的管理类localimpl被删除时,从线程中删除对应的变量
33             thread = wrthread()
34             if thread is not None:
35                 del thread.__dict__[key]
36         def thread_deleted(_, idt=idt):
37             # When the thread is deleted, remove the local dict.
38             # Note that this is suboptimal if the thread object gets
39             # caught in a reference loop. We would like to be called
40             # as soon as the OS-level thread ends instead.
41             # 定义一个弱引用的回调函数,当线程被删除时,在管理类localimpl对象的字典中删除该线程的字典项
42             local = wrlocal()
43             if local is not None:
44                 dct = local.dicts.pop(idt)
45         #定义两个弱引用
46         wrlocal = ref(self, local_deleted)
47         wrthread = ref(thread, thread_deleted)
48         #线程和局部变量管理类相互关联
49         thread.__dict__[key] = wrlocal
50         #在字典中以线程id为key,保存了弱引用和线程局部变量的字典
51         self.dicts[idt] = wrthread, localdict
52         return localdict            

local类中通过__slots__变量定义了两个变量_local__impl和__dict__,_local__impl用来保存上述局部变量管理类_localimpl,__dict__则保存local类自己的成员函数或变量,可以通过self.xxx来引用到。local类中重载了__getattribute__、__setattr__、__delattr__等函数,通过重载这些函数,将local类中的__dict__对象指向了_local__impl中保存的该线程的局部变量字典,对local类进行赋值和获取操作时,实际上是对_local__impl中保存的该线程的局部变量字典进行操作。

 1 #定义上下文管理器,用来做统一的预处理,把_local__impl类中的保存的该线程的字典值赋值为该线程的__dict__,local类就可以对局部变量进行操作了。
 2 @contextmanager
 3 def _patch(self):
 4     #获取_local__impl值
 5     impl = object.__getattribute__(self, ‘_local__impl‘)
 6     try:
 7         #获取该线程的局部变量字典
 8         dct = impl.get_dict()
 9     except KeyError:
10         dct = impl.create_dict()
11         args, kw = impl.localargs
12         self.__init__(*args, **kw)
13     #操作时加锁
14     with impl.locallock:
15         #将局部变量dict值赋值给__dict__
16         object.__setattr__(self, ‘__dict__‘, dct)
17         #跳过_patch继续执行with语句块,执行完成后再执行yield后面的语句(退出)
18         yield
19
20
21 class local:
22    #只定义了两个变量,局部变量存在__dict__指向的字典里
23     __slots__ = ‘_local__impl‘, ‘__dict__‘
24
25     def __new__(cls, *args, **kw):
26         if (args or kw) and (cls.__init__ is object.__init__):
27             raise TypeError("Initialization arguments are not supported")
28         self = object.__new__(cls)
29         impl = _localimpl()
30         impl.localargs = (args, kw)
31         impl.locallock = RLock()
32         object.__setattr__(self, ‘_local__impl‘, impl)
33         # We need to create the thread dict in anticipation of
34         # __init__ being called, to make sure we don‘t call it
35         # again ourselves.
36         impl.create_dict()
37         return self
38
39     #每个取值操作都用调用该函数
40     def __getattribute__(self, name):
41         #从局部变量管理类中获取该线程的局部变量字典,赋值给__dict__
42         with _patch(self):
43             #从__dict__中获取局部变量值
44             return object.__getattribute__(self, name)
45
46     def __setattr__(self, name, value):
47         if name == ‘__dict__‘:
48             raise AttributeError(
49                 "%r object attribute ‘__dict__‘ is read-only"
50                 % self.__class__.__name__)
51         with _patch(self):
52             return object.__setattr__(self, name, value)
53
54     def __delattr__(self, name):
55         if name == ‘__dict__‘:
56             raise AttributeError(
57                 "%r object attribute ‘__dict__‘ is read-only"
58                 % self.__class__.__name__)
59         with _patch(self):
60             return object.__delattr__(self, name)
61
62
63 from threading import current_thread, RLock  

原文地址:https://www.cnblogs.com/zhaoshizi/p/9380385.html

时间: 2024-07-31 10:25:32

threading.local()源码分析的相关文章

python语言线程标准库threading.local源码解读

本段源码可以学习的地方: 1. 考虑到效率问题,可以通过上下文的机制,在属性被访问的时候临时构建: 2. 可以重写一些魔术方法,比如 __new__ 方法,在调用 object.__new__(cls) 前后进行属性的一些小设置: 3. 在本库中使用的重写魔术方法,上下文这两种基础之上,我们可以想到函数装饰器,类装饰器,异常捕获,以及两种上下文的结构: 灵活运用这些手法,可以让我们在代码架构上更上一层,能够更加省时省力. 1 from weakref import ref # ref用在了构造大

python线程threading.Timer源码解读

threading.Timer的作用 官方给的定义是: """Call a function after a specified number of seconds: t = Timer(30.0, f, args=None, kwargs=None) t.start() t.cancel() # stop the timer's action if it's still waiting """ 意思是: 在一个特定的秒数之后调用一个函数,使用方

SDN实验---Ryu的源码分析

一:安装Pycharm https://www.cnblogs.com/huozf/p/9304396.html(有可取之处) https://www.jetbrains.com/idea/buy/#discounts?billing=yearly(学生注册,免费) 二:推文 https://www.cnblogs.com/ssyfj/p/11730362.html(含目录介绍) 三:源码分析流程 四:找入口函数main (一)我们编写的应用:全部继承于app_manager.RyuApp---

Spark的Master和Worker集群启动的源码分析

基于spark1.3.1的源码进行分析 spark master启动源码分析 1.在start-master.sh调用master的main方法,main方法调用 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _, _) =

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三) 本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略.本文以及后续的文章将重点介绍Replication策略.Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步.本文先介绍在SolrCloud的leader到replica的数据同步,下一篇

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

转:Mongodb源码分析之Replication模式

原文出处:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_rep mongodb中提供了复制(Replication)机制,通过该机制可以帮助我们很容易实现读写分离方案,并支持灾难恢复(服务器断电)等意外情况下的数据安全. 在老版本(1.6)中,Mongo提供了两种方式的复制:master-slave及replica pair模式(注:mongodb最新支持的replset复制集方式可看成是pair的升级版,

Solr4.8.0源码分析(4)之Eclipse Solr调试环境搭建

Solr4.8.0源码分析(4)之Eclipse Solr调试环境搭建 由于公司里的Solr调试都是用远程jpda进行的,但是家里只有一台电脑所以不能jpda进行调试,这是因为jpda的端口冲突.所以只能在Eclipse 搭建Solr的环境,折腾了一小时终于完成了. 1. JDPA远程调试 搭建换完成Solr环境后,对${TOMCAT_HOME}/bin/startup.sh 最后一行进行修改,如下所示: 1 set JPDA_ADDRESS=7070 2 exec "$PRGDIR"

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种