[python 并行3]进程

进程篇

基本使用 1

#coding=utf-8

import multiprocessing
import os       # 获取pid用
import time     # 延时用

# 子进程要执行的函数
def child_proc(name):
    print(f'child process {name} pid: {os.getpid()}')
    time.sleep(3)
    print(f'{name} finish')

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    print(f'parent process {os.getpid()} is running')

    # 生成子进程
    p1 = multiprocessing.Process(target = child_proc, args = ('child-1',))
    p2 = multiprocessing.Process(target = child_proc, args = ('child-2',))
    p1.start()
    p2.start()

    print(f'parent process {os.getpid()} is end')

输出

parent process 20114 is running
parent process 20114 is end
child process child-1 pid: 20115
child process child-2 pid: 20116
child-1 finish
child-2 finish

注意! Python官方文档提到为何必须要使用if __name__ = ‘__main__‘,由于该包的所有功能都需要将主模块导入到子模块中,但是IDLE无法将__main__模块导入子模块,所以只能在文件中编辑好程序执行
更多multiprocessing — Process-based parallelism


multiprocessing模块

:根据查看multiprocessing模块声明,第一行注释显示# Package analogous to ‘threading.py‘ but using processes,可以发现进程的操作与线程相似 (甚至多进程的模块直接就是线程模块改过来的)

函数声明: class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
(group: 官方预留的参数)
target: 子线程要执行的函数
name: 给子线程命名
args: 传递参数到要执行的函数中
(类型为元组)*
daemon: 将线程设置为后台线程 2

Thread类包含的方法:

  • start(): 开始进程,它会安排在单独的控制进程中使该对象的run()方法被调用 (invoked) (如果多次调用,会发生错误AssertionError
  • run(): 你可以在子类中重写这个方法,标准的run()方法会在构造器传递了target参数后调用它
  • join(timeout=None): 阻塞当前进程,直到等待调用了join()的进程结束,或到达设置的超时timeout的参数为止
  • name: 进程名
  • is_alive(): 判断进程是否在运行
  • daemon: 是否为后台进程的属性值
  • pid: 返回进程的id
  • terminate(): 结束进程
    (在Unix上,使用SIGTERM信号量完成;在windows上使用TerminateProcess())
    (请注意,不会执行退出处理程序和最后的子句等。请注意,进程的后代进程不会被终止 - 它们将简单地变成孤立的。)
    (注:如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能被其他进程无法使用。 类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。)
  • kill(): 杀掉进程,与terminate()相同,但在Unix中使用SIGKILL信号量
  • close(): 关闭进程对象,释放所有与之相关的资源。如果是还在运行,会raised错误ValueError,第一次调用会返回成功,其它调用会raise错误ValueError
  • exitcode: 子进程的退出码。如果不是被terminate终止的,将会是None;如果被信号量N终止的,将会返回-N
  • authkey: 进程的身份钥匙(1字节字符串)。当multipriocessing在主进程中被初始化时,会使用os.urandom()标记一个随机字符串。当一个进程对象被创建时,它将会从父进程继承这个身份钥匙,虽然它可能会被改为其它字节字符串
  • sentinel: (哨兵)当进程结束时,一个数值的系统对象处理将变为ready。如果你想立即要等待几个事件,你能用这个值使用multiprocessing.connection.wait(),否则调用join()更简单。

Pool进程池

如果进程太多,超过了CPU核数,会导致进程之间的来回切换,影响性能。可以通过创建进程池,把进程加入到里面,如果池中进程没满,就会创建一个进程来执行请求;如果池中进程达到规定的最大值,那么请求会等待
函数声明: class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes: 指定池中进程数量,如果不指定,则为None,默认就会使用os.cpu_count()
initializer: 如果不是None,每个进程在开始时都会调用initializer(*initargs)
maxtasksperchild: 工作进程在退出和替换新的工作进程之前,可以完成的任务数量,以释放资源。默认为None,表示工作进程和这个池的生存时间一样长
context: 用来指定工作进程的上下文

下面有2种方法添加进程到进程池中,分别是apply_asyncapply

apply_async

apply_async()用来同步执行进程,允许多个进程同时进入池;是异步非阻塞的
函数声明: apply_async(func[, args[, kwds[, callback[, error_callback]]]])
callback: 如果指定了回调,那么它应该是一个可调用的,它接受一个参数

#coding=utf-8

import multiprocessing
import time     # 延时用

# 子进程要执行的函数
def child_proc(index):
    print(f'{index} process is running')
    time.sleep(3)
    print(f'{index} process is end')

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    print(f'all process start')

    # 生成进程池
    p = multiprocessing.Pool()
    for i in range(5):
        p.apply_async(func = child_proc, args = (i,))
    p.close()
    p.join()    # 注! 如果不执行此句,将会直接退出主进程

    print(f'all process done!')

输出

all process start
0 process is running
1 process is running
2 process is running
3 process is running
4 process is running
0 process is end
3 process is end
2 process is end
4 process is end
1 process is end
all process done!

apply

apply(): 只允许一个进程进入池,在一个进程结束后,另一个才能进入;是阻塞的
函数声明: apply(func[, args[, kwds]])

p.apply(func = child_proc, args = (i,)) # 将上面代码中的apply_async换成apply即可

输出

all process start
0 process is running
0 process is end
1 process is running
1 process is end
2 process is running
2 process is end
3 process is running
3 process is end
4 process is running
4 process is end
all process done!

分析:对比apply_async和apply的输出,可以发现,apply_async是同步执行的,而apply是一个一个进入池中执行的


数据共享

参考:Python多进程编程-进程间共享数据
PipeQueue都有一定的数据共享功能,但他们会阻塞进程)
Queue:采用共享队列的内存的方式共享数据
注意:存在queue.Queuemultiprocessing.Queue两种队列
queue.Queue:是进程内非阻塞队列,各进程私有
multiprocessing.Queue:是跨进程通信队列,各个子进程共有

共享内存:使用multiprocessing的ValueArray类,实现共享内存的方式共享数据
共享进程:使用multiprocessing的Manager类,实现共享进程的方式共享数据

获取返回值(仅主进程获有数据)

针对进程池实现的方式,可以直接通过获取进程对象的返回值

#coding=utf-8

import multiprocessing

# 子进程要执行的函数
def child_proc(x, y):
    return x + y

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 生成进程池
    p = multiprocessing.Pool()
    z = p.apply(func = child_proc, args = (1, 2))
    print(z)

Queue

使用multiprocessing的Queue类,实现进程之间的数据共享

#coding=utf-8

from multiprocessing import Process, Queue

# 子进程要执行的函数
def child_proc(queue):
    num = queue.get()
    num += 10
    queue.put(num)

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 创建共享数据
    queue = Queue()
    queue.put(1000)

    # 创建进程
    p1 = Process(target = child_proc, args = (queue,))
    p2 = Process(target = child_proc, args = (queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 打印结果
    print(queue.get())

Value、Array

共享内存有2个结构 - ValueArray,它们内部都实现了锁机制,因此是多进程安全的

#coding=utf-8

from multiprocessing import Process, Value, Array

# 子进程要执行的函数
def child_proc(num, li):
    num.value += 100
    for i in range(len(li)):
        li[i] += 10

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 创建共享数据
    num = Value('d', 0.0)
    li = Array('i', range(10))

    # 创建进程
    p1 = Process(target = child_proc, args = (num, li))
    p2 = Process(target = child_proc, args = (num, li))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 打印结果
    print(num.value)
    for x in li:
        print(x)

ValueArray都需要设置其中存放值的类型。d:double;i:int;c:char等
详细:转到multiprocessing.sharedctypes可以查看到各种类型的字符串定义

Manager

(上面的共享内存通过Value和Array结构实现,这些值在主进程中管理,很分散)
Manager通过共享内存来实现共享数据,支持的数据类型很多
详细multiprocessing.managers

#coding=utf-8

from multiprocessing import Process, Manager

# 子进程要执行的函数
def child_proc(dict1, list1):
    dict1['yourname'] += ' snow'
    list1[3] += 10

# 主进程,必须在主模块中执行
if __name__ == '__main__':
    # 创建共享数据
    manager = Manager()
    dict1 = manager.dict()
    list1 = manager.list(range(4))

    dict1['yourname'] = 'youmux'
    list1[3] = 0

    # 创建进程
    p1 = Process(target = child_proc, args = (dict1, list1))
    p2 = Process(target = child_proc, args = (dict1, list1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 打印结果
    print(dict1['yourname'])
    for x in list1:
        print(x)


1.参考书籍: 参考书籍:《Python并行编程手册》

2.后台线程: 后台线程在主线程停止后就直接停止运行。他们资源(如打开的文件,数据库事务等)可能不会被正确的释放。如果你想要你的线程优美的停止,让他们不要变为后台和使用一个合适的信号机制如事件Event

原文地址:https://www.cnblogs.com/maplesnow/p/12044387.html

时间: 2024-11-02 18:53:00

[python 并行3]进程的相关文章

python中的进程、线程(threading、multiprocessing、Queue、subprocess)

Python中的进程与线程 学习知识,我们不但要知其然,还是知其所以然.你做到了你就比别人NB. 我们先了解一下什么是进程和线程. 进程与线程的历史 我们都知道计算机是由硬件和软件组成的.硬件中的CPU是计算机的核心,它承担计算机的所有任务. 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配.任务的调度. 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等. 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专

Python 线程(threading) 进程(multiprocessing)

*:first-child { margin-top: 0 !important; } body>*:last-child { margin-bottom: 0 !important; } /* BLOCKS =============================================================================*/ p, blockquote, ul, ol, dl, table, pre { margin: 15px 0; } /* HEAD

Python 并行分布式框架 Celery

Celery 简介 除了redis,还可以使用另外一个神器---Celery.Celery是一个异步任务的调度工具. Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农. 在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是"中间人"的意思,在这里 Broker 起到一个中间人的角色.在工头提

【转】Python 并行分布式框架 Celery

原文链接:https://blog.csdn.net/freeking101/article/details/74707619 Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/ celery配置:http://docs.jinkan.o

线程池Python 线程、进程和协程

Python   线程 Threading是用于提供线程相关的操作,线程是应用程序中工作的最小单元.线程与进程的关系下图所示: 子线程是由主线程产生的,但两者并没有关联. 利用threading创建线程: 1 '''利用threading包创建''' 2 import threading 3 import time 4 5 def run(n): 6 time.sleep(2) 7 print("task:",n) 8 9 '''串行:一个运行完后,再运行另外一个''' 10 run(

[python 并行1]简介

并行编程-介绍篇 设计并行编程 1 任务分解:将程序分解为任务,在不同处理器上执行以实现并行化.(可以使用以下两种方法) 领域分解:将问题数据分解 (当处理的数据量很大时,分开处理) 功能性分解:将问题分解为任务 (把大的任务分解为多个小任务处理) 任务分配:将任务分配到各个处理器上 (目的是负载均衡) 聚集:将小任务与大任务合并到一起从而改进性能的过程 如果任务数量远远超过可用的处理器数量,由于线程切换等其它因素会导致效率降低 如果计算机有针对大量小任务而进行特别的设计,如采用GPU计算,那将

Python 中的进程、线程、协程、同步、异步、回调

进程和线程究竟是什么东西?传统网络服务模型是如何工作的?协程和线程的关系和区别有哪些?IO过程在什么时间发生? 在刚刚结束的 PyCon2014 上海站,来自七牛云存储的 Python 高级工程师许智翔带来了关于 Python 的分享<Python中的进程.线程.协程.同步.异步.回调>. 一.上下文切换技术 简述 在进一步之前,让我们先回顾一下各种上下文切换技术. 不过首先说明一点术语.当我们说"上下文"的时候,指的是程序在执行中的一个状态.通常我们会用调用栈来表示这个状

python定时杀进程

python定时杀进程 之前写了个python脚本用selenium+phantomjs爬新帖子,在循环拉取页面的过程中,phantomjs总是block住,使用WebDriverWait设置最长等待时间无效.用firefox替换phantomjs无改善 因为这个脚本不会长期使用,因此采取临时办法,新开一个子线程固定周期杀死phantomjs进程,这样selenium就会在block最多不超过此周期后返回.当然在爬虫脚本中做一些微调防止部分url被跳过 定时执行任务采用sched模块,很多人将其

Python线程,进程,携程,I/O同步,异步

只有本人能看懂的-Python线程,进程,携程,I/O同步,异步 举个栗子: 我想get三个url,先用普通的for循环 import requests from multiprocessing import Process from threading import Thread import requests import time # -----正常遍历 串行 同步----- def get_page(url): page = requests.get(url) print(url) st