Python:线程、进程与协程(5)——multiprocessing模块(2)

上篇博文介绍了Python的multiprocessing模块创建进程Process 类,进程间通信,进程间的同步三个部分,下面接着介绍学习进程共享。

(1)内存共享

在多进程情况下,由于每个进程有自己独立的内存空间,怎样能实现内存共享呢?multiprocessing模块提供了Value, Array,这两个是函数,详细定义在sharedctypes.py里,有兴趣的可以去看看(等了解了ctypes模块后回头再分享下我的理解,今天就先放放)

  Value

Value的初始化非常简单,直接类似Value(‘d‘, 0.0)即可,具体构造方法如下:

  multiprocessing.Value(typecode_or_type, *args[,lock])。

  返回从共享内存中分配的一个ctypes 对象,其中typecode_or_type定义了返回的类型。它要么是一个ctypes类型,要么是一个代表ctypes类型的code。

  ctypes是Python的一个外部函数库,它提供了和C语言兼任的数据类型,可以调用DLLs或者共享库的函数,能被用作在python中包裹这些库。

  *args是传递给ctypes的构造参数

对于共享整数或者单个字符,初始化比较简单,参照下图映射关系:

Type Code C Type Python Type
‘c‘ char character
‘b‘ signed char int
‘B‘ unsigned char int
‘u‘ Py_UNICODE unicode character
‘h‘ signed short int
‘H‘ unsigned short int
‘i‘ signed int int
‘I‘ unsigned int int
‘l‘ signed long int
‘L‘ unsigned long int
‘f‘ float float
‘d‘ double float

比如整数1,可用Value(‘h‘,1)

如果共享的是字符串,则在上表是找不到映射关系的,就是没有对应的Type code可用。所以我们需要使用原始的ctype类型,对应关系如下:

ctypes type C type Python type

c_bool

_Bool bool (1)
char  char 1-character string
c_wchar wchar_t 1-character unicode string
c_byte char int/long
c_ubyte unsigned char int/long
c_short short int/long
c_ushort unsigned short int/long
c_int int int/long
c_uint unsigned in int/long
c_long long int/long
c_ulong unsigned long int/long
c_longlong __int64 or long long int/long
c_ulonglong unsigned __int64 or unsigned long long int/long
c_float float float
c_double double float
c_longdouble long double float
c_char_p char * (NUL terminated) string or None

c_wchar_p

wchar_t * (NUL terminated) unicode or None
c_void_p void * int/long or None

比如上面的Value(‘h‘,1)也可以用Value(c_short,1),字符串的话,可以用Value(c_char_p,"hello"),很好理解的。

它返回的是个对象,所以,它也有一些属性和方法,而返回的对象是基于SynchronizedBase类,该类的定义如下:

class SynchronizedBase(object):

    def __init__(self, obj, lock=None):
        self._obj = obj
        self._lock = lock or RLock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __reduce__(self):
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        return self._obj

    def get_lock(self):
        return self._lock

    def __repr__(self):
        return ‘<%s wrapper for %s>‘ % (type(self).__name__, self._obj)

所以它的属性和方法有:

value:获取值

get_lock():获取锁对象

acquire/release:参考RLock对象的acquire方法,release方法,是一样的,一个是获取锁,一个是释放锁。很好理解的。

下面举个例子来体会一下这些方法

#coding=utf-8
import time
from multiprocessing import Value,Process
def fun(val):
    for i in range(10):
        time.sleep(0.5)
        val.value += 1

v = Value(‘i‘,0)
p_list = [Process(target=fun,args=(v,)) for i in range(10)]
for p in p_list:
    p.start()
for p in p_list:
    p.join()
print v.value

上述代码是多个进程修改v值,我们期待它输出的是100,但是实际上并输出的并不是100,Value的构造函数默认的lock是True,它会创建一个锁对象用于同步访问控制,这就容易造成一个错误的意识,认为Value在多进程中是安全的,但实际上并不是,要想真正的控制同步访问,需要实现获取这个锁。所以需要修改fun()函数。如下:

def fun(val):
    for i in range(10):
        time.sleep(0.5)
        with v.get_lock():
            val.value += 1

或者如下:

def fun(val):
    for i in range(10):
        time.sleep(0.5)
        if v.acquire():
            val.value += 1
        v.release()

Array

有了上面的基础,这个就比较好理解了,它返回从共享内存分配的ctypes数组,原型如下:

multiprocessing.Array(typecode_or_type, size_or_initializer, *,lock=True)

ypecode_or_type确定返回数组的元素的类型:它是一个ctypes类型或一个字符类型代码类型的数组模块使用的类型。

size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。

如果关键字参数中有lock的话,lock为True,则会创建一个新的锁对象,以同步对该值的访问。如果lock是Lock或RLock对象,那么它将用于同步对该值的访问。如果lock是False,那么对返回的对象的访问不会被锁自动保护,因此它不一定是“进程安全的”。

它返回值的属性和方法同Value差不多,有兴趣的可以自己写代码试试,在此不举例子。

(2)服务器进程

通过Manager()返回的一个manager对象控制一个服务器进程,它保持住Python对象并允许其它进程使用代理操作它们。同时它用起来很方便,而且支持本地和远程内存共享。

Manager()返回的manager支持的类型有list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和Array。

该部分的实现在managers.py文件里,

Manager()的定义很简单,如下:

def Manager():
    ‘‘‘
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    ‘‘‘
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m

它返回一个已经启动的SyncManager对象,管理器进程将在垃圾收集或其父进程退出时立即关闭。SyncManager继承自BaseManager。BaseManager的定义也在managers.py文件里,有兴趣的可以看看,初始化如下:BaseManager([address[, authkey]])

address:是管理器进程侦听新连接的地址。 如果地址是无,则选择任意一个。

authkey:是将用于检查到服务器进程的传入连接的有效性的认证密钥。 如果authkey是None,那么使用当前进程current_process()的authkey; 否则使用的authkey,它必须是字符串。

一旦创建BaseManager对象,应调用start()或get_server()。serve_forever()以确保管理器对象引用已启动的管理器进程。

BaseManager对象的方法和属性有:

start([initializer [,initargs]])

启动子过程以启动管理器。 如果初始化程序不是None,那么子程序在启动时会调用initializer(*initargs)

get_server():

返回一个Server对象,它表示在Manager控制下的实际服务器。 Server对象支持serve_forever()方法,Server对象也定义在managers.py文件里,该类的作用用因为解释就是“Server class which runs in a process controlled by a manager object”,有兴趣的可以去看看,了解下。

connect():将本地管理器对象连接到远程管理器进程

shutdown():停止管理器在使用的进程。这仅在用start()已启动服务器进程时可用,可以被多次调用。

register(typeid [,callable [,proxytype [,exposed [,method_to_typeid [,create_method]]]]]):

可以用于向管理器类注册类型或可调用的类方法。

typeid是用于标识特定类型的共享对象的“类型标识符”。这必须是字符串。

callable是用于为该类型标识符创建可调用的对象。如果将使用from_address()类方法创建管理器实例,或者如果create_method参数为False,那么这可以保留为None。

proxytype是BaseProxy的子类,BaseProxy使用typeid来创建共享对象的代理。如果为None,那么会自动创建一个代理类。

exposed用于指定一个序列的方法名称,该名称可以允许使用typeid的代理对象BaseProxy的_callmethod()方法来访问,(如果exposed为None,则使用proxytype._exposed_,如果存在)。在没有指定公开列表的情况下,将可以访问共享对象的所有“公共方法”。(这里的“公共方法”是指具有__call __()方法并且名称不以“_”开头的任何属性。)

method_to_typeid是一个映射,用于指定返回代理的那些公开方法的返回类型。它将方法名映射到typeid字符串。 (如果method_to_typeid为None,则使用proxytype._method_to_typeid_,如果存在)。如果方法的名称不是此映射的键,或者映射为None,则方法返回的对象将按值复制。

create_method确定是否应该使用名称typeid创建一个方法,该方法可以用于告诉服务器进程创建一个新的共享对象并为其返回一个代理。默认情况下为True。

address:管理器使用的地址

join(timeout=None):阻塞

现在可以来看看,SyncManager类的定义了,其实很简单。

class SyncManager(BaseManager):
    ‘‘‘
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    ‘‘‘

SyncManager.register(‘Queue‘, Queue.Queue)
SyncManager.register(‘JoinableQueue‘, Queue.Queue)
SyncManager.register(‘Event‘, threading.Event, EventProxy)
SyncManager.register(‘Lock‘, threading.Lock, AcquirerProxy)
SyncManager.register(‘RLock‘, threading.RLock, AcquirerProxy)
SyncManager.register(‘Semaphore‘, threading.Semaphore, AcquirerProxy)
SyncManager.register(‘BoundedSemaphore‘, threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register(‘Condition‘, threading.Condition, ConditionProxy)
SyncManager.register(‘Pool‘, Pool, PoolProxy)
SyncManager.register(‘list‘, list, ListProxy)
SyncManager.register(‘dict‘, dict, DictProxy)
SyncManager.register(‘Value‘, Value, ValueProxy)
SyncManager.register(‘Array‘, Array, ArrayProxy)
SyncManager.register(‘Namespace‘, Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register(‘Iterator‘, proxytype=IteratorProxy, create_method=False)
SyncManager.register(‘AsyncResult‘, create_method=False)

上面的Queue()、Event()等等都是该类的方法,比如Event(),它是创建一个共享的threading.Event对象并返回一个代理。当然除了上面这些外,其实我们也可以用register()向管理器注册新的类型,如下:

#coding=utf-8
from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register(‘Maths‘, MathsClass)

if __name__ == ‘__main__‘:
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56

下面看个简单的例子

#coding=utf-8
import multiprocessing

def fun(ns):
    ns.x.append(1)
    ns.y.append(‘x‘)
   

if __name__ == ‘__main__‘:
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = []
    ns.y = []
    print "before",ns
    p = multiprocessing.Process(target=fun,args=(ns))
    p.start()
    p.join()
    print "after",ns

本程序的目的是想得到x=[1],y=[‘x‘],但是没有得到,这是为什么呢?这是因为manager对象仅能传播一个可变对象本身所做的修改,如果一个manager.list()对象,管理列表本身的任何更改会传播到所有其他进程,但是如果容器对象内部还包括可修改对象,则内部可修改对象的任何更改都不会传播到其他进程。上面例子中,ns是一个容器,它本身的改变会传播到所有进程,但是它的内部对象x,y是可变对象,它们的改变不会传播到其他进程,所有没有得到我们所要的结果。可以作如下修改:

#coding=utf-8
import multiprocessing

def fun(ns,x,y):
    x.append(1)
    y.append(‘x‘)
    ns.x = x
    ns.y = y

if __name__ == ‘__main__‘:
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = []
    ns.y = []
    print "before",ns
    p = multiprocessing.Process(target=fun,args=(ns,ns.x,ns.y,))
    p.start()
    p.join()
    print "after",ns

这个例子比较简单,以后碰到好的例子,再跟大家分享。另外Python官方手册上有很多帮助大家理解这些概念的例子,有兴趣的可以去看看,今天就写到这儿了,不正之处欢迎批评指正!

时间: 2024-10-18 07:55:44

Python:线程、进程与协程(5)——multiprocessing模块(2)的相关文章

Python:线程、进程与协程(4)——multiprocessing模块(1)

multiprocessing模块是Python提供的用于多进程开发的包,multiprocessing包提供本地和远程两种并发,通过使用子进程而非线程有效地回避了全局解释器锁. (一)创建进程Process 类 创建进程的类,其源码在multiprocessing包的process.py里,有兴趣的可以对照着源码边理解边学习.它的用法同threading.Thread差不多,从它的类定义上就可以看的出来,如下: class Process(object):     '''     Proces

Python:线程、进程与协程(3)——Queue模块及源码分析

Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式.该模块提供了三种队列: Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列 Queue.LifoQueue(maxsize):后进先出,相当于栈 Queue.PriorityQueue(maxsize):优先级队列. 其中LifoQueue,PriorityQueue是Queue的子类.三者拥有以下共同的方法: qsize():返回近似的队列大小.为什么要加"近似&q

Python:线程、进程与协程(2)——threading模块(1)

上一篇博文介绍了Python中线程.进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块.首先来看看threading模块有哪些方法和类吧. 主要有: Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能. Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类. Lock :原锁,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用. RLock :可重入锁

python全栈脱产第37天------进程池与线程池、协程、gevent模块、单线程下实现并发的套接字通信

一.进程池与线程池 调用concurrent.futures下的ThreadPoolExecutor,ProcessPoolExecutor来实现 提交任务有两种方式:同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,在执行下一段代码,是串行的 异步调用:提交完一个任务之后,不在原地等待,直接运行下一段代码,任务是并发的 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimp

多任务-python实现-进程,协程,线程总结(2.1.16)

目录 1.类比 2.总结 关于作者 @ 1.类比 一个生产玩具的工厂: 一个生产线成为一个进程,一个生产线有多个工人,所以工人为线程 单进程-多线程:一条生产线,多个工人 多进程-多线程:多条生产线,多个工人 协程:工人空闲的时候安排做其他事 2.总结 1.进程是资源分配的单位 2.线程为操作系统调度的单位 3.进程切换需要的资源很大,效率很低 4.线程需要的资源一般,效率一般(不考虑GIL) 5.协程切换的任务资源很小,效率高 6.多进程,多线程根据cpu核数不同可能是并行的,但协程是在一个线

Python入门学习-DAY37-进程池与线程池、协程、gevent模块

一.进程池与线程池 基本使用: 进程池和线程池操作一样 提交任务的两种方式: 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的 同步调用 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os

Python之路【第七篇】:线程、进程和协程

Python之路[第七篇]:线程.进程和协程 Python线程 Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 #!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time   def show(arg):     time.sleep(1)     print 'thread'+str(arg)   for i in

Python:线程、进程与协程(1)——概念

最近的业余时间主要放在了学习Python线程.进程和协程里,第一次用python的多线程和多进程是在两个月前,当时只是简单的看了几篇博文然后就跟着用,没有仔细去研究,第一次用的感觉它们其实挺简单的,最近这段时间通过看书, 看Python 中文官方文档等等相关资料,发现并没有想想中的那么简单,很多知识点需要仔细去理解,Python线程.进程和协程应该是Python的高级用法.Python的高级用法有很多,看看Python 中文官方文档就知道了,当然有时间看看这些模块是怎么实现的对自己的提高是很有帮

Python菜鸟之路:Python基础-线程、进程、协程

上节内容,简单的介绍了线程和进程,并且介绍了Python中的GIL机制.本节详细介绍线程.进程以及协程的概念及实现. 线程 基本使用 方法1: 创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入 import threading import time def worker(): time.sleep(2) print("test") for i in range(5): t = threading.Thread(target=