基础入门_Python-进线协程.分分钟玩转multiprocessing多进程编程?

简单介绍:

此模块主要为了解决PYTHON非真正多线程导致无法充分利用多核CPU资源问题,提供了Process,Lock,Semaphore,Event,Queue,Pipe,Pool等组件实现子进程,通信,共享数据,同步方式等

快速安装:

pip install multiprocessing

公共属性:

multiprocessing.current_process() -> Process

说明: 返回当前运行的子进程对象

multiprocessing.cpu_count() -> int

说明: 返回宿主机CPU核心数

multiprocessing.active_children() -> list

说明: 返回存活的子进程列表

多线程类:

1. Process类,主要用于创建管理子进程

p = multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}) -> Process

说明: 创建子进程对象,target表示调用对象,name表示子进程名称,args表示调用对象的位置参数元组,kwargs表示调用对象的参数字典

p.daemon -> boolean

说明: 设置或返回子进程是否随主进程结束,默认为false,主进程必须等待所有子进程结束后才结束,一旦设置为true,则一旦主进程执行完毕后,即使子进程还没执行完毕也强制结束,必须在start之前设置,可设置p.join来强制主进程等待子进程执行完毕

p.join(timeout=None)

说明: 等待此子进程返回后再执行其它子进程/主进程,timeout为等待时间

p.pid -> int/None

说明: 返回子进程pid

p.exitcode -> int/None

说明: 运行时为None,-N表示信号N结束

p.is_alive() -> boolean

说明: 返回进程是否存活

p.start() -> None

说明: 启动子进程,会自动调用子类中的run方法

p.terminate() -> None

说明: 终止子进程


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import multiprocessing
# 说明: 导入其它模块
# 方式一: 任务处理类
class TaskHandler(multiprocessing.Process):
    def __init__(self, interval, *args, **kwargs):
        super(TaskHandler, self).__init__(*args, **kwargs)
        self.interval = interval
    # 调用p.start()时自动调用子类run方法
    def run(self):
        for _ in xrange(10):
            time.sleep(self.interval)
# 方式二: 任务处理函数
def taskhandler(interval):
    for _ in xrange(10):
        time.sleep(interval)
if __name__ == ‘__main__‘:
    processes = []
    for _ in xrange(5):
        processes.append(TaskHandler(1))
    for p in processes:
        p.start()
    print ‘cpu number is:‘, multiprocessing.cpu_count()
    for p in multiprocessing.active_children():
        print ‘process pid:‘, p.pid
        print ‘process name:‘, p.name
    time.sleep(10)
    print ‘-----------------------------------------‘
    processes = []
    for _ in xrange(5):
        processes.append(multiprocessing.Process(target=taskhandler, args=(1,)))
    for p in processes:
        p.daemon = True
        p.start()
        p.join()
    # 思考: 此处为何没有打印任何子进程信息?
    for p in multiprocessing.active_children():
        print ‘process pid:‘, p.pid
        print ‘process name:‘, p.name


2. Lock类,主要用于多个进程互斥访问共享资源,避免冲突

l = multiprocessing.Lock() -> Lock

说明: 创建互斥锁对象,推荐使用with写法来代替acquire()和release()来手动创建释放锁.


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
import time
import multiprocessing
# 说明: 导入其它模块
class TaskHandler(multiprocessing.Process):
    def __init__(self, lock, fpath, *args, **kwargs):
        super(TaskHandler, self).__init__(*args, **kwargs)
        self.lock = lock
        self.fpath = fpath
    def run(self):
        with self.lock:
            with open(self.fpath, ‘a+b‘) as f:
                data = ‘‘.join([str(time.time()), os.linesep])
                f.write(data)
if __name__ == ‘__main__‘:
    proceses = []
    lock = multiprocessing.Lock()
    fpath = ‘multiprocessing.log‘
    for _ in xrange(10):
        proceses.append(TaskHandler(lock, fpath))
    for p in proceses:
        p.start()


3. Semaphore类,主要用于控制同时对共享资源访问子进程数,如池的最大连接数限定

s = multiprocessing.Semaphore(value=1) -> Semaphore

说明: 创建信号量对象,value表示同时对共享资源访问的子进程数


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
import time
import multiprocessing
# 说明: 导入其它模块
class TaskHandler(multiprocessing.Process):
    def __init__(self, s, *args, **kwargs):
        super(TaskHandler, self).__init__(*args, **kwargs)
        self.semaphore = s
    def run(self):
        # 限制同时只能有5个子进程访问共享资源
        with self.semaphore:
            time.sleep(5)
if __name__ == ‘__main__‘:
    s = multiprocessing.Semaphore(5)
    for _ in xrange(20):
        p = TaskHandler(s)
        p.daemon = True
        p.start()
    while True:
        processes = multiprocessing.active_children()
        if not len(processes):
            break
        print ‘running process => num: %s list: %s‘ % (len(processes), processes)
        time.sleep(1)


4. Event类,主要用于控制进程间同步通信

e = multiprocessing.Event() -> Event

说明: 创建信号对象,主要用于子进程之间同步通信

e.set() -> None

说明: 设置标志位

e.clear() -> None

说明: 清除标志位

e.is_set() -> boolean

说明: 判断是否设置了标志位

e.wait(self, timeout=None) -> None

说明: 阻塞当前子进程直到标志位被设置


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
import time
import multiprocessing
# 说明: 导入其它模块
def task001(e):
    for _ in xrange(0, 100):
        print _
    e.set()
def task002(e):
    e.wait()
    print ‘found notice: event is set...‘
    for _ in xrange(100, 200):
        print _
if __name__ == ‘__main__‘:
    e = multiprocessing.Event()
    p001 = multiprocessing.Process(target=task001, args=(e,))
    p002 = multiprocessing.Process(target=task002, args=(e,))
    p001.start()
    p002.start()


说明: 通过Evant类可以实现很方便的实现子进程与子进程,子进程与主进程之间的通信,甚至可以将所有子进程daemon设置为True,最后e.wait()阻塞,子进程中去设置此标识位来控制主进程的执行流程.

5. Pipe类,主要用于两个子进程之间的数据传递

p = multiprocessing.Pipe(duplex=True) -> tuple

说明: 创建通道对象,主要用于两个子进程之间的数据传递,返回管道的两个端对象1/2,如果duplex为true则全双工可以互相收发,否则1端只能接受消息,2端只能发送消息

p[0/1].send(picklable) -> None

说明: 发送数据支持任意可序列化对象

p[0/1].recv() -> picklable

说明: 如果没有消息可接收,recv会一直阻塞直至管道被关闭抛出EOFError异常


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import multiprocessing
from Queue import Empty, Full
# 说明: 导入其它模块
def producer(pipe):
    while True:
        data = {
            ‘thread‘: multiprocessing.current_process().name,
            ‘value‘: time.time()
        }
        try:
            pipe.send(data)
        except EOFError, e:
            break
        time.sleep(1)
def consumer(pipe):
    while True:
        try:
            print ‘producer: %(thread)s current value: %(value)s‘ % pipe.recv()
        except EOFError, e:
            break
        time.sleep(1)
if __name__ == ‘__main__‘:
    # 半双工模式下pipe[0]负责接收消息,pipe[1]负责发送消息
    pipe = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=producer, args=(pipe[1],))
    c = multiprocessing.Process(target=consumer, args=(pipe[0],))
    p.start()
    c.start()


6. Queue类,主要用于多个子进程之间的数据传递

q = multiprocessing.Queue(maxsize=0) -> Queue

说明: 创建队列对象,主要用于多个进程之间的数据传递

q.full() -> boolean

说明: 判断队列是否已满

q.close() -> None

说明: 关闭队列

q.empty() -> boolean

说明: 判断队列是否已空

q.put(obj, block=True, timeout=None) -> None

说明: 插入队列,block为False会立即抛出Queue.Full异常,否则会阻塞timeout时间,直到队列有剩余的空间,如果超时会抛出Queue.Full异常,还有一个同类方法q.put_nowait(obj)非阻塞插入立即抛Queue.Full异常

q.get(block=True, timeout=None) -> None

说明: 取出队列,block为false会立即抛出Queue.Empty异常,否则会阻塞timeout时间,直到队列有新对象插入,如果超时会抛出Queue.Empty异常,还有一个同类方法q.get_nowait()非阻塞读取立抛Queue.Empty异常


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import multiprocessing
from Queue import Empty, Full
# 说明: 导入其它模块
def producer(q):
    while True:
        data = {
            ‘thread‘: multiprocessing.current_process().name,
            ‘value‘: time.time()
        }
        try:
            q.put(data, block=False)
        except Full, e:
            continue
        time.sleep(1)
def consumer(q):
    while True:
        try:
            print ‘producer: %(thread)s current value: %(value)s‘ % q.get(block=False)
        except Empty, e:
            continue
        time.sleep(1)
if __name__ == ‘__main__‘:
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    c = multiprocessing.Process(target=consumer, args=(q,))
    p.start()
    c.start()


7. Pool类,主要用于以进程池的形式自动管控进程池内子进程数目

p = multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) -> Pool

说明: 创建包含规定数目子进程池对象,并向这些工作进程传递作业,直到没有更多作业为止,processes表示初始化状态下的子进程数,maxtasksperchild表示为每个进程执行N个作业数后重新启动一个工作子进程防止运行时间过长导致消耗太多系统资源.

p.close() -> None

说明: 禁止新的子进程加入,所以必须放在p.join()前面

p.join() -> None

说明: 主进程阻塞等待子进程退出,必须出现在p.close()和p.terminate() 的后面

p.terminate() -> None

说明: 结束工作进程,不再处理未处理的任务.

p.apply(self, func, args=(), kwds={}) -> obj

说明: 同内置函数apply,默认等待进程池中子进程返回结果

p.apply_async(self, func, args=(), kwds={}, callback=None) -> ApplyResult

说明: 同内置函数apply,默认不等待子进程返回结果直接返回,结果使用返回对象get()方法回调获取

p.map(self, func, iterable, chunksize=None) -> list

p.map_async(self, func, iterable, chunksize=None, callback=None) -> MapResult

说明: 同上,但是支持接受iterable序列化对象,简化进程池调用,而且速度更快,推荐使用,结果使用返回对象get()方法回调获取


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
import pprint
import multiprocessing
# 说明: 导入其它模块
def read_filelist(p):
    result = []
    if not os.path.isdir(p):
        result.append(p)
        return result
    for root, dirs, files in os.walk(p):
        for item in files:
            fpath = os.path.join(root, item)
            result.append(fpath)
    return result
def read_filesize(f):
    return os.path.getsize(f), f
if __name__ == ‘__main__‘:
    file_list = read_filelist(‘C:\Users\Administrator\Desktop‘)
    pool = multiprocessing.Pool(20)
    file_size = pool.map_async(read_filesize, file_list)
    pool.close()
    pool.join()
    pprint.pprint(file_size.get())


说明: 如上例子先获取文件列表,然后通过异步回调获取所有文件大小,相对于使用apply或是apply_async需要每次append到一个列表中,此方法更加简化了多进程池的使用.推荐使用

时间: 2024-10-26 09:45:12

基础入门_Python-进线协程.分分钟玩转multiprocessing多进程编程?的相关文章

FreeRTOS基础以及UIP之协程--C语言剑走偏锋

在FreeRTOS中和UIP中,都使用到了一种C语言实现的多任务计数,专业的定义叫做协程(coroutine),顾名思义,这是一种协作的例程, 跟具有操作系统概念的线程不一样,协程是在用户空间利用程序语言的语法语义就能实现逻辑上类似多任务的编程技巧. 意思就是说协程不需要每次调用的时候都为任务准备一次空间,我们知道像ucos这种操作系统,它内置的多任务是需要在中断过程中切换堆栈的,开销较大,而协程的功能就是在尽量降低开销的情况下,实现能够保存函数上下文快速切换的办法,用操作系统的概念来说,一千个

协程(Coroutine)与多线程,多进程

执行多个任务可以使用多线程或多进程. 多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响 多线程中,所有变量都由所有线程共享.而线程间的切换是系统进行调度,无法控制,所以可能 一个进程中的多个线程可能会同时调用某个变量的值,造成变量值的混乱,这时就引进了线程锁,但是线程锁又容易造成死锁,也阻止了多线程的并发. 另外Python 解释器由于设计时有GIL全局锁,导致了多线程无法利用多核.多线程的并发在Python中就是一个美丽的梦. 在Thread和Process中,应当优选Proce

项目实战_Python.子进程/协程在固件检测更新升级程序中的正确姿势?

项目简介: 说明: 主要用于对接OA审批流程后自动下载固件生成更新后自动上传,具体实现代码请阅读代码 项目思路: 项目结构: firmwareupload/ ├── app │   ├── conf │   │   ├── config.py │   │   └── __init__.py │   ├── core │   │   ├── __init__.py │   │   ├── main.py │   │   └── task.py │   ├── __init__.py │   └──

FastRPC 3.2 发布,高性能 C++ 协程 RPC 框架

用过go erlang gevent的亲们应该都会知道协程在应用中带来的方便. 如果对协程不理解的同学,通过阅读下面例子可以快速了解我们框架的协程的意义,已了解的可以跳过这部分. 协程例子:假设我们要发个Get请求获取百度首页内容: php同步方式:$result = file_get_contents("http://www.baidu.com"), php果然是世界上最好的语言,多么简洁. 然后java和c++的同学开始不屑了: "呵呵, 同步,鄙视你不解释."

[转载]协程-cooperative multitasking

[转载]协程三讲 http://ravenw.com/blog/2011/08/24/coroutine-part-1-defination-and-classification-of-coroutine/ http://ravenw.com/blog/2011/09/01/coroutine-part-2-the-use-of-coroutines/ http://ravenw.com/blog/2011/09/06/coroutine-part-3-coroutine-and-continu

(转)零基础入门深度学习(6) - 长短时记忆网络(LSTM)

无论即将到来的是大数据时代还是人工智能时代,亦或是传统行业使用人工智能在云上处理大数据的时代,作为一个有理想有追求的程序员,不懂深度学习(Deep Learning)这个超热的技术,会不会感觉马上就out了?现在救命稻草来了,<零基础入门深度学习>系列文章旨在讲帮助爱编程的你从零基础达到入门级水平.零基础意味着你不需要太多的数学知识,只要会写程序就行了,没错,这是专门为程序员写的文章.虽然文中会有很多公式你也许看不懂,但同时也会有更多的代码,程序员的你一定能看懂的(我周围是一群狂热的Clean

游戏服务器之多进程架构通信 协程切换只是简单地改变执行函数栈,不涉及内核态与用户态转化,也涉及上下文切换,

游戏服务器之多进程架构通信 https://gameinstitute.qq.com/community/detail/124098 https://www.zhihu.com/question/23508968 游戏服务器与普通服务器有什么区别? 游戏开发中的TCP.UDP.HTTP.WebSocket四种网络通讯协议对比 https://gameinstitute.qq.com/community/detail/127562 https://www.jianshu.com/p/4eb37c1

 PHP_Yield协程从入门到精通

本文和大家分享的主要是PHP中Yield协程相关内容,一起来看看吧,希望对大家学习php有所帮助. 协程 基本概念 "协程"(Coroutine)概念最早由 Melvin Conway 于1958年提出.协程可以理解为纯用户态的线程,其通过协作而不是抢占来进行切换.相对于进程或者线程,协程所有的操作都可以在用户态完成,创建和切换的消耗更低.总的来说,协程为协同任务提供了一种运行时抽象,这种抽象非常适合于协同多任务调度和数据流处理.在现代操作系统和编程语言中,因为用户态线程切换代价比内核

Android基础入门教程——1.6 .9(九妹)图片怎么玩

Android基础入门教程--1.6 .9(九妹)图片怎么玩 标签(空格分隔): Android基础入门教程 1.本节引言: 可能有的一些疑问: 1.什么是.9图片? 答:图片后缀名前有.9的图片,如pic1.9.png这样的图片 2. .9图片能干嘛? 答: 在图片拉伸的时候特定的区域不会发生图片失真,而不失真的区域可以由我们自己绘制 3. .9图片用什么做? 答:工欲善其事,必先利其器,做.9图片的工具有: ①Android SDK自带:draw9patch.bat,不过这玩意出了好久,谷歌