Python中的多进程

由于cPython的gill,多线程未必是CPU密集型程序的好的选择。

多线程可以完全独立的进程环境中运行程序,可以充分利用多处理器。

但是进程本身的隔离带来的数据不共享也是一种问题,线程比进程轻量级。

1、Multiprocessing

import multiprocessing
import datetime

def calc(i):
    sum = 0
    for _ in range(10000000):
        sum += 1
    print(i,sum)

if __name__ == ‘__main__‘:
    start = datetime.datetime.now()

    ps =[]
    for i in range(5):
        p = multiprocessing.Process(target=calc,args=(i,),name=‘calc-{}‘.format(i))

        ps.append(p)
        p.start()

    for p in ps:
        p.join()

    delta = (datetime.datetime.now()-start).total_seconds()
    print(delta)
    print(‘end==‘)
    

0 10000000

1 10000000

2 10000000

3 10000000

4 10000000

2.738819

end==

多进程,是真的并行。


名称


说明


Pid


进程id


Exitcode


进程的退出状态码


Terminate()


终止指定的进程

进程间同步:同步提供了和线程同步一样的类,使用的方法一样,使用效果也类似。

不过,进程间代价要高于线程间,而且底层实现是不同的,只不过Python屏蔽了这些不同的地方,让用户简单使用多进程。

Multiprocessing还提供了共享内存,服务器进程来共享数据,还提供了queue队列,pipe管道用于进程间通信。

通信方式不同:

多进程就是启动多个解释器进程,进程间通信必须序列化和反序列化。

数据的线程安全性问题。由于每个进程之间没有实现度贤臣,gil可以说没有什么用了。

2、进程池

Multiprocessing。Pool是进程池类;


名称


说明


Apply(self,func,args=(),kwds={})


阻塞执行,导致主进程执行其他子进程一个个执行


Apply_async(self,func,args=(),kwargs={},callback=None,error_callback=None)


与apply方法用法一直,非阻塞执行,得到结果后会执行回调


Close()


关闭池,池不能再接受新的任务


Terminate()


结束工作进程,不在处理为处理的任务


Join()


主进程阻塞等待子进程的退出,join方法要在close或terminate之后使用

3、多进程、多线程的选择

CPU密集型:cPython中使用到了gill,多线程的时候锁相互竞争,且多核优势不能发挥,Python多进程效率更高。

IO密集型:适合多线程,可以减少多进程之间的IO序列化开销,且在IO等待的时候,切换到其他线程继续执行,效率不错。

4、应用场景

请求/应答模型:web应用中常见的处理模型。

Master启动多个worker工作进程,一般和CPU数目相同。发挥多核优势。

Worker工作进程中,往往需要操作网络IO和磁盘IO,启动多线程,提高高并发处理能力,worker处理用户的请求,往往需要等待数据,处理完请求还需要通过网络IO响应返回,这个就是nginx工作模式。

、concurrent包

1、concurrent.futures

异步并行任务编程模块,提供一个高级的异步可执行的便利接口。

提供了两个池执行器

Threadpoolexecutor异步调用的线程池的executor

Processpoolexecutor异步调用的进程池的executor。

2、threadpoolexecutor对象

首先需要定义一个池的执行器对象,executor类子类对象。


方法


含义


Threadpoolexecutor(man_worker=1)


池中至多创建max_wokers个线程的池来同时异步执行,返回exector实例


Submit(fn,*args,**kwargs)


提交执行的函数及其参数,返回future实例


Shutdown(wait=True)


清理池

Future类


方法


含义


Done()


如果调用被成功的取消或者执行完成,返回true


Cancelled()


如果调用被成功的取消,返回true


Running()


如果正在运行且不能被取消,返回true


Cancel()


尝试取消调用,如果已经执行且不能取消返回False,否则返回true


Result(timeout=none)


取返回的结果,timeout为none,一直等待返回,timeout设置到期,抛出concurrent.futures.timeouterror异常


Exception(timeout=none)


取返回的结果,timeout为none,一直等待返回,timeout设置到期,抛出concurrent.futures.timeouterror异常

import threading
from concurrent import futures
import logging
import time

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT,level=logging.INFO)

def worker(n):
    logging.info(‘start to work{]‘.format(n))
    time.sleep(4)
    logging.info(‘stop{}‘.format(n))

exector = futures.ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(3):
    futures = exector.submit(worker,i)
    fs.append(futures)

for i in range(3,6):
    futures = exector.submit(worker,i)
    fs.append(futures)

while True:
    time.sleep(5)
    logging.info(threading.enumerate())

    flag = True
    for f in fs:
        logging.info(f.done())
        flag = flag and f.done()
    print(‘-‘*30)

    if flag:
        exector.shutdown()
        logging.info(threading.enumerate())
        break

------------------------------

2018-06-13 09:57:35,049 MainThread 8376 [<_MainThread(MainThread, started 8376)>, <Thread(Thread-1, started daemon 7480)>, <Thread(Thread-3, started daemon 7368)>, <Thread(Thread-2, started daemon 7464)>]

2018-06-13 09:57:35,049 MainThread 8376 True

2018-06-13 09:57:35,050 MainThread 8376 True

2018-06-13 09:57:35,050 MainThread 8376 True

2018-06-13 09:57:35,050 MainThread 8376 True

2018-06-13 09:57:35,050 MainThread 8376 True

2018-06-13 09:57:35,051 MainThread 8376 True

2018-06-13 09:57:35,051 MainThread 8376 [<_MainThread(MainThread, started 8376)>]

3、processpoolexector对象

import threading
from concurrent import futures
import logging
import time

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT,level=logging.INFO)

def worker(n):
    logging.info(‘start to work{]‘.format(n))
    time.sleep(4)
    logging.info(‘stop{}‘.format(n))

if __name__ == ‘__main__‘:
    exector = futures.ThreadPoolExecutor(max_workers=3)
    fs = []
    for i in range(3):
        futures = exector.submit(worker,i)
        fs.append(futures)

    for i in range(3,6):
        futures = exector.submit(worker,i)
        fs.append(futures)

    while True:
        time.sleep(5)
        logging.info(threading.enumerate())

        flag = True
        for f in fs:
            logging.info(f.done())
            flag = flag and f.done()
        print(‘-‘*30)

        if flag:
            exector.shutdown()
            logging.info(threading.enumerate())
            break

------------------------------

2018-06-13 10:01:18,076 MainThread 6436 [<Thread(Thread-3, started daemon 4188)>, <Thread(Thread-1, started daemon 7284)>, <_MainThread(MainThread, started 6436)>, <Thread(Thread-2, started daemon 6164)>]

2018-06-13 10:01:18,076 MainThread 6436 True

2018-06-13 10:01:18,076 MainThread 6436 True

2018-06-13 10:01:18,077 MainThread 6436 True

2018-06-13 10:01:18,077 MainThread 6436 True

2018-06-13 10:01:18,077 MainThread 6436 True

2018-06-13 10:01:18,077 MainThread 6436 True

2018-06-13 10:01:18,078 MainThread 6436 [<_MainThread(MainThread, started 6436)>]

进程代码的执行过程中,必须要加上if __name__ == ‘__main__’

4、支持上下文管理的调用

Concurrent.futures.processpoolexecutor继承自concurrent.futures.base.executor,而父类有__enter__、__exit__、方法,支持上下文管理,可以使用with语句。

__exit__方法本质上还是调用shutdown(wait=True),就会一直阻塞到所有运行的任务完成。

import threading
from concurrent import futures
import logging
import time

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s‘
logging.basicConfig(format=FORMAT,level=logging.INFO)

def worker(n):
    logging.info(‘start to work{}‘.format(n))
    time.sleep(5)
    logging.info(‘stop{}‘.format(n))
    return n + 100

if __name__ == ‘__main__‘:
    executor = futures.ProcessPoolExecutor(max_workers=3)

    with executor:
        fs = []
        for i in range(3):
            futures = executor.submit(worker,i)
            fs.append(futures)

        for i in range(3,6):
            futures = executor.submit(worker,i)
            fs.append(futures)

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())

            flag = True
            for f in fs:
                logging.info(f.done())
                flag = flag and f.done()
                if f.done():
                    logging.info(‘result={}‘.format(f.result()))

            print(‘-‘*30)
            if flag:break

    logging.info(‘-------end--------‘)
    logging.info(threading.enumerate())

2018-06-13 10:18:35,744 MainThread 5468 start to work0

2018-06-13 10:18:35,751 MainThread 7936 start to work1

2018-06-13 10:18:35,763 MainThread 7020 start to work2

2018-06-13 10:18:37,528 MainThread 7976 [<_MainThread(MainThread, started 7976)>, <Thread(QueueFeederThread, started daemon 8136)>, <Thread(Thread-1, started daemon 3932)>]

2018-06-13 10:18:37,528 MainThread 7976 False

2018-06-13 10:18:37,529 MainThread 7976 False

2018-06-13 10:18:37,529 MainThread 7976 False

2018-06-13 10:18:37,529 MainThread 7976 False

2018-06-13 10:18:37,529 MainThread 7976 False

2018-06-13 10:18:37,529 MainThread 7976 False

------------------------------

------------------------------

2018-06-13 10:18:39,530 MainThread 7976 [<_MainThread(MainThread, started 7976)>, <Thread(QueueFeederThread, started daemon 8136)>, <Thread(Thread-1, started daemon 3932)>]

2018-06-13 10:18:39,530 MainThread 7976 False

2018-06-13 10:18:39,531 MainThread 7976 False

2018-06-13 10:18:39,531 MainThread 7976 False

2018-06-13 10:18:39,531 MainThread 7976 False

2018-06-13 10:18:39,532 MainThread 7976 False

2018-06-13 10:18:39,532 MainThread 7976 False

2018-06-13 10:18:40,744 MainThread 5468 stop0

2018-06-13 10:18:40,745 MainThread 5468 start to work3

2018-06-13 10:18:40,751 MainThread 7936 stop1

2018-06-13 10:18:40,753 MainThread 7936 start to work4

2018-06-13 10:18:40,764 MainThread 7020 stop2

2018-06-13 10:18:40,764 MainThread 7020 start to work5

2018-06-13 10:18:41,533 MainThread 7976 [<_MainThread(MainThread, started 7976)>, <Thread(QueueFeederThread, started daemon 8136)>, <Thread(Thread-1, started daemon 3932)>]

2018-06-13 10:18:41,533 MainThread 7976 True

2018-06-13 10:18:41,534 MainThread 7976 result=100

2018-06-13 10:18:41,534 MainThread 7976 True

2018-06-13 10:18:41,535 MainThread 7976 result=101

2018-06-13 10:18:41,535 MainThread 7976 True

2018-06-13 10:18:41,536 MainThread 7976 result=102

2018-06-13 10:18:41,536 MainThread 7976 False

2018-06-13 10:18:41,536 MainThread 7976 False

2018-06-13 10:18:41,537 MainThread 7976 False

------------------------------

------------------------------

2018-06-13 10:18:43,537 MainThread 7976 [<_MainThread(MainThread, started 7976)>, <Thread(QueueFeederThread, started daemon 8136)>, <Thread(Thread-1, started daemon 3932)>]

2018-06-13 10:18:43,537 MainThread 7976 True

2018-06-13 10:18:43,537 MainThread 7976 result=100

2018-06-13 10:18:43,538 MainThread 7976 True

2018-06-13 10:18:43,538 MainThread 7976 result=101

2018-06-13 10:18:43,538 MainThread 7976 True

2018-06-13 10:18:43,538 MainThread 7976 result=102

2018-06-13 10:18:43,538 MainThread 7976 False

2018-06-13 10:18:43,539 MainThread 7976 False

2018-06-13 10:18:43,539 MainThread 7976 False

------------------------------

2018-06-13 10:18:45,540 MainThread 7976 [<_MainThread(MainThread, started 7976)>, <Thread(QueueFeederThread, started daemon 8136)>, <Thread(Thread-1, started daemon 3932)>]

2018-06-13 10:18:45,540 MainThread 7976 True

2018-06-13 10:18:45,540 MainThread 7976 result=100

2018-06-13 10:18:45,541 MainThread 7976 True

2018-06-13 10:18:45,541 MainThread 7976 result=101

2018-06-13 10:18:45,541 MainThread 7976 True

2018-06-13 10:18:45,541 MainThread 7976 result=102

2018-06-13 10:18:45,542 MainThread 7976 False

2018-06-13 10:18:45,542 MainThread 7976 False

2018-06-13 10:18:45,542 MainThread 7976 False

2018-06-13 10:18:45,746 MainThread 5468 stop3

2018-06-13 10:18:45,754 MainThread 7936 stop4

2018-06-13 10:18:45,765 MainThread 7020 stop5

------------------------------

2018-06-13 10:18:47,542 MainThread 7976 [<_MainThread(MainThread, started 7976)>, <Thread(QueueFeederThread, started daemon 8136)>, <Thread(Thread-1, started daemon 3932)>]

2018-06-13 10:18:47,542 MainThread 7976 True

2018-06-13 10:18:47,542 MainThread 7976 result=100

2018-06-13 10:18:47,543 MainThread 7976 True

2018-06-13 10:18:47,543 MainThread 7976 result=101

2018-06-13 10:18:47,544 MainThread 7976 True

2018-06-13 10:18:47,544 MainThread 7976 result=102

2018-06-13 10:18:47,544 MainThread 7976 True

2018-06-13 10:18:47,544 MainThread 7976 result=103

2018-06-13 10:18:47,544 MainThread 7976 True

2018-06-13 10:18:47,545 MainThread 7976 result=104

2018-06-13 10:18:47,545 MainThread 7976 True

2018-06-13 10:18:47,545 MainThread 7976 result=105

2018-06-13 10:18:47,587 MainThread 7976 -------end--------

2018-06-13 10:18:47,587 MainThread 7976 [<_MainThread(MainThread, started 7976)>]

总结,统一了线程池、进程池调用,简化了编程。

无法设置线程名称。

原文地址:https://www.cnblogs.com/wangchunli-blogs/p/9949898.html

时间: 2024-10-08 00:16:45

Python中的多进程的相关文章

python中的多进程处理

转载于:http://blog.csdn.net/jj_liuxin/article/details/3564365 帮助文档见https://docs.python.org/2.7/library/multiprocessing.html 众所周知,python本身是单线程的,python中的线程处理是由python解释器分配时间片的:但在python 3.0中吸收了开源模块,开始支持系统原生的进程处理——multiprocessing. 注意:这个模块的某些函数需要操作系统的支持,例如,mu

Python中的多进程与多线程(二)

在上一章中,学习了Python多进程编程的一些基本方法:使用跨平台多进程模块multiprocessing提供的Process.Pool.Queue.Lock.Pipe等类,实现子进程创建.进程池(批量创建子进程并管理子进程数量上限)以及进程间通信.这一章学习下Python下的多线程编程方法. 一.threading 线程是操作系统执行任务的最小单元.Python标准库中提供了threading模块,对多线程编程提供了很便捷的支持. 下面是使用threading实现多线程的代码: 1 #!/us

Python中的多进程与多线程/分布式该如何使用

在批评Python的讨论中,常常说起Python多线程是多么的难用.还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行.因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行.必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情.如果你还没看过的话,我建议你看看Eqbal Quran的文章<Ruby

在Python中使用多进程快速处理数据

转自:https://blog.csdn.net/bryan__/article/details/78786648 数据分片:可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并.应用场景:多进程爬虫,类mapreduce任务.缺点是子进程会拷贝父进程所有状态,内存浪费严重. import math from multiprocessing import Pool def run(data, index, size): # d

python中的Queue与多进程(multiprocessing)

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程 一.先说说Queue(队列对象) Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的"先吃先拉"与"后吃先吐",其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多 import Queue q = Queue.Queue(10

python 多线程和多进程

多线程与多进程 知识预览 一 进程与线程的概念 二 threading模块 三 multiprocessing模块 四 协程 五 IO模型 回到顶部 一 进程与线程的概念 1.1 进程 考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?另外,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源.你是不是已经想到在程序A读取数据的过程中,

python多线程,多进程编程。

程,是目前计算机中为应用程序分配资源的最小单位: 进程,是目前计算机中运行应用程序的最小单位: 在实际系统中,其实进程都是被分为进程来实现的,所以参与时间片轮转的是线程: 但是管理应用程序的资源的单位和任务调度的单位都是进程.更像是一个逻辑概念. 线程是进程分出来的更精细的单位,线程间的上下文切换比进程间的上下文切换,要快很多. 多进程与多核,这个概念很奇怪,因为进程是不会直接在核心上运行的. 多线程与多核,涉及一个内核线程与用户线程的对应关系. 内核线程(Kernel Thread),一般与核

搞定python多线程和多进程

1.1 线程 1.1.1 什么是线程 线程是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务.一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令. 1.1.2 线程的工作方式 假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时恢复到当时读的具体进度.有一个方法就是记下页数.行数与字数这三个数值,这些数值就是exe

Python — 多线程与多进程

1.多线程 线程是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位,一个进程可以包含多个线程.一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务.本质上CPU同一时刻只干了一件事,只能执行一个线程.一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令. 并行是指两个或者多个事件在同一时刻发生:而并发是指两个或多个事件在同一时间间隔内发生. 2.多进程 一个程序的执行实例就是一