Python:线程、进程与协程(4)——multiprocessing模块(1)

multiprocessing模块是Python提供的用于多进程开发的包,multiprocessing包提供本地和远程两种并发,通过使用子进程而非线程有效地回避了全局解释器锁。

(一)创建进程Process 类

创建进程的类,其源码在multiprocessing包的process.py里,有兴趣的可以对照着源码边理解边学习。它的用法同threading.Thread差不多,从它的类定义上就可以看的出来,如下:

class Process(object):
    ‘‘‘
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    ‘‘‘
    _Popen = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        assert group is None, ‘group argument must be None for now‘
        count = _current_process._counter.next()
        self._identity = _current_process._identity + (count,)
        self._authkey = _current_process._authkey
        self._daemonic = _current_process._daemonic
        self._tempdir = _current_process._tempdir
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + ‘-‘ +                      ‘:‘.join(str(i) for i in self._identity)

Process([group [, target [, name [, args [, kwargs]]]]])

group实质上不使用,是保留项,便于以后扩展。

target表示调用对象,

args表示调用对象的位置参数元组

kwargs表示调用对象的字典

name为别名,即进程的名字

它的方法/属性跟threading.Thread也有很多类似的地方,主要有:

start():开始进程活动。

run():表示进程的活动方法,可以在子类中覆盖它。

join([timeout]):是用来阻塞当前上下文,直至该进程运行结束,一个进程可以被join()多次,timeout单位是秒。

terminate():结束进程。在Unix上使用的是SIGTERM,在Windows平台上使用TerminateProcess

is_alive():判断进程是否还活着。

name:一个字符串,表示进程的名字,也可以通过赋值语句利用它来修改进程的名字

ident:进程的ID,如果进程没开始,结果是None

pid:同ident,大家可以看看ident和pid的实现,是利用了os模块的getpid()方法。

authkey:设置/获取进程的授权密码。当初始化多进程时,使用os.urandom()为主进程分配一个随机字符串。当创建一个Process对象时,它将继承其父进程的认证密钥, 但是可以通过将authkey设置为另一个字节字符串来改变。这里authkey为什么既可以设置授权密码又可以获取呢?那是因为它的定义使用了property装饰器,源码如下:

@property
def authkey(self):
   return self._authkey

@authkey.setter
def authkey(self, authkey):
   ‘‘‘
    Set authorization key of process
   ‘‘‘
   self._authkey = AuthenticationString(authkey)

这是property的一个高级用法,如果理解了其实也很简单,有兴趣的去查看其它资料。

daemon:一个布尔值,指示进程是(True)否(False)是一个守护进程。它必须在调用start()之前设置,否则会引发RuntimeError。它的初始值继承自创建它的进程;进程不是一个守护进程,所以在进程中创建的所有进程默认daemon = False。

exitcode:返回进程退出时的代码。进程运行时其值为None,如果为–N,表示被信号N结束。

(1)一个简单的单进程例子

#coding=utf-8
import multiprocessing
import datetime
import time

def worker(interval):
    n = 5
    while n > 0:
        print "The now is %s"% datetime.datetime.now()
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":

    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()#开始进程
    #p.terminate()#结束进程
    #p.join(9)#阻塞当前上下文
    print "p.authkey",p.authkey#获取进程的授权密码
    p.authkey = u"123"#设置进程的授权密码
    print "p.authkey", p.authkey#获取进程的授权密码
    print "p.pid:", p.pid,p.ident#进程ID
    p.name = ‘helloworld‘#修改进程名字
    print "p.name:", p.name#进程名字
    print "p.is_alive:", p.is_alive()#是否是活的

运行结果如下图:

上面的代码有两行注释掉的,大家可以把注释去掉,体会、理解这两个方法的用处,在此不贴我的运行结果了。

(2)自定义进程类,并开启多个进程

import multiprocessing
import datetime
import time

class MyProcess(multiprocessing.Process):
    """
    自定义进程类
    """
    def __init__(self,interval,group=None,target=None,name=None,args=(),kwargs={}):
        multiprocessing.Process.__init__(self,group,target,name,args,kwargs=kwargs)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is %s"%datetime.datetime.now())
            time.sleep(self.interval)
            n -= 1

def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"

def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"

def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"

if __name__ == "__main__":
    p1 = MyProcess(interval=2,target = worker_1, args = (2,))
    p2 = MyProcess(interval=2,target = worker_2, args = (3,))
    p3 = MyProcess(interval=2,target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()
    print "current process",multiprocessing.current_process(),multiprocessing.active_children()
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"

运行结果如下:

看看打印出来的时间,三个进程应该是并行执行的。

(二)进程间通信

multiprocessing模块支持两种进程间的通信方式:Queue(队列)和Pipe(管道)。

(1)Queue

multiprocessing中的Queue类的定义在queues.py文件里。和Queue.Queue差不多,multiprocessing中的Queue类实现了Queue.Queue的大部分方法(可以参考上篇博文Python:线程、进程与协程(3)——Queue模块及源码分析),但task_done()和join()没有实现,主要方法和属性有

qsize():返回Queue的大小

empty():返回一个布尔值,表示Queue是否为空

full():返回一个布尔值,表示Queue是否满

put(item[, block[, timeout]]):向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。

put_nowait(item):等价与put(item,False)。

get([block[, timeout]]):从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。

get_nowait():等价于get(False)

close():表示该Queue不在加入新的元素

join_thread():加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。默认情况下,如果进程不是队列的创建者,则退出, 它将尝试加入队列的后台线程。 该进程可以调用cancel_join_thread()来做

cancel_join_thread():在阻塞中阻止join_thread(),防止后台线程在进程退出时被自动连接 ,肯能会导致数据丢失。

(2)Pipe

Pipe不是类,是函数,该函数定义在 multiprocessing中的connection.py里,函数原型Pipe(duplex=True),

返回一对通过管道连接的连接对象conn1和conn2。

如果duplex是True(默认值),则管道是双向的。

如果duplex是False,则管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息。

Pipe()返回的两个连接对象表示管道的两端,每个连接对象都有send()和recv()方法(还有其它方法),分别是发送和接受消息。下面举个简单的例子,一个发送数据,一个接受数据

#coding=utf-8
import multiprocessing
import time

def proc1(pipe):
    """
    发送数据
    """
    while True:
        for i in xrange(100):
            print "send: %s" %(i)
            pipe.send(i)#发送数据
            time.sleep(1)

def proc2(pipe):
    """
    接收数据
    """
    while True:
        print "proc2 rev:", pipe.recv()#接受数据
        time.sleep(1)
if __name__ == "__main__":
    pipe1,pipe2 = multiprocessing.Pipe()#返回两个连接对象
    p1 = multiprocessing.Process(target=proc1, args=(pipe1,))
    p2 = multiprocessing.Process(target=proc2, args=(pipe2,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

运行结果如下:

(三)进程间的同步

multiprocessing包含与threading中所有同步原语等同的原语,它也有Lock,RLock,Even,Condition,Semaphore,BoundedSemaphore。用法都差不多,它们的定义在 multiprocessing包的synchronize.py文件里,在此不过多介绍,有兴趣的可以参考Python:线程、进程与协程(2)——threading模块里相关的概念理解。如果理解了相关概念,在 multiprocessing模块中使用是一样的,看下面这个简单的例子吧,有两个进程要向某个文件中写入内容,为了避免访问冲突,可以使用锁。

#coding=utf-8
import multiprocessing
def worker_with(lock, f):
    with lock:#Lock等对象也是支持上下文管理器协议的。
        fs = open(f, ‘a+‘)
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, ‘a+‘)
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()#定义锁
    f = "/home/liulonghua/files.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"

multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。

多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。下篇博文再接着讲进程共享和进程池等。

时间: 2024-08-02 02:48:02

Python:线程、进程与协程(4)——multiprocessing模块(1)的相关文章

Python:线程、进程与协程(5)——multiprocessing模块(2)

上篇博文介绍了Python的multiprocessing模块创建进程Process 类,进程间通信,进程间的同步三个部分,下面接着介绍学习进程共享. (1)内存共享 在多进程情况下,由于每个进程有自己独立的内存空间,怎样能实现内存共享呢?multiprocessing模块提供了Value, Array,这两个是函数,详细定义在sharedctypes.py里,有兴趣的可以去看看(等了解了ctypes模块后回头再分享下我的理解,今天就先放放)   Value Value的初始化非常简单,直接类似

Python:线程、进程与协程(3)——Queue模块及源码分析

Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式.该模块提供了三种队列: Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列 Queue.LifoQueue(maxsize):后进先出,相当于栈 Queue.PriorityQueue(maxsize):优先级队列. 其中LifoQueue,PriorityQueue是Queue的子类.三者拥有以下共同的方法: qsize():返回近似的队列大小.为什么要加"近似&q

Python:线程、进程与协程(2)——threading模块(1)

上一篇博文介绍了Python中线程.进程与协程的基本概念,通过这几天的学习总结,下面来讲讲Python的threading模块.首先来看看threading模块有哪些方法和类吧. 主要有: Thread :线程类,这是用的最多的一个类,可以指定线程函数执行或者继承自它都可以实现子线程功能. Timer:与Thread类似,但要等待一段时间后才开始运行,是Thread的子类. Lock :原锁,是一个同步原语,当它锁住时不归某个特定的线程所有,这个可以对全局变量互斥时使用. RLock :可重入锁

python全栈脱产第37天------进程池与线程池、协程、gevent模块、单线程下实现并发的套接字通信

一.进程池与线程池 调用concurrent.futures下的ThreadPoolExecutor,ProcessPoolExecutor来实现 提交任务有两种方式:同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,在执行下一段代码,是串行的 异步调用:提交完一个任务之后,不在原地等待,直接运行下一段代码,任务是并发的 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimp

多任务-python实现-进程,协程,线程总结(2.1.16)

目录 1.类比 2.总结 关于作者 @ 1.类比 一个生产玩具的工厂: 一个生产线成为一个进程,一个生产线有多个工人,所以工人为线程 单进程-多线程:一条生产线,多个工人 多进程-多线程:多条生产线,多个工人 协程:工人空闲的时候安排做其他事 2.总结 1.进程是资源分配的单位 2.线程为操作系统调度的单位 3.进程切换需要的资源很大,效率很低 4.线程需要的资源一般,效率一般(不考虑GIL) 5.协程切换的任务资源很小,效率高 6.多进程,多线程根据cpu核数不同可能是并行的,但协程是在一个线

Python入门学习-DAY37-进程池与线程池、协程、gevent模块

一.进程池与线程池 基本使用: 进程池和线程池操作一样 提交任务的两种方式: 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的 同步调用 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os

Python之路【第七篇】:线程、进程和协程

Python之路[第七篇]:线程.进程和协程 Python线程 Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 #!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time   def show(arg):     time.sleep(1)     print 'thread'+str(arg)   for i in

Python:线程、进程与协程(1)——概念

最近的业余时间主要放在了学习Python线程.进程和协程里,第一次用python的多线程和多进程是在两个月前,当时只是简单的看了几篇博文然后就跟着用,没有仔细去研究,第一次用的感觉它们其实挺简单的,最近这段时间通过看书, 看Python 中文官方文档等等相关资料,发现并没有想想中的那么简单,很多知识点需要仔细去理解,Python线程.进程和协程应该是Python的高级用法.Python的高级用法有很多,看看Python 中文官方文档就知道了,当然有时间看看这些模块是怎么实现的对自己的提高是很有帮

Python菜鸟之路:Python基础-线程、进程、协程

上节内容,简单的介绍了线程和进程,并且介绍了Python中的GIL机制.本节详细介绍线程.进程以及协程的概念及实现. 线程 基本使用 方法1: 创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入 import threading import time def worker(): time.sleep(2) print("test") for i in range(5): t = threading.Thread(target=