多进程 (二) — 信号传递与进程控

内容目录:

  1. multiprocessing.Queue()
  2. JoinableQueue
  3. 进程间的信号传递 Event
  4. 控制对资源的访问 Lock
  5. 同步操作 Condition
  6. 控制对资源的并发访问 Semaphore
  7. 管理共享状态 Manager
  8. 共享命名空间 mgr.Namespace()
  9. 进程池 multiprocessing.Pool

1. multiprocessing.Queue()

和线程一样,多进程的一个常见的使用模式是将一个任务划分为几个worker,以便并行运行。有效地使用多进程通常需要它们之间的一些通信,这样工作就可以被分割,结果可以被聚合。一种简单方法是使用队列multiprocessing.Queue()来回传递消息。任何可以用pickle序列化的对象都可以通过队列。

import multiprocessing

class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print(‘Doing something fancy in {} for {}!‘.format(
            proc_name, self.name))

def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == ‘__main__‘:
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass(‘Fancy Dan‘))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

结果:当q是空的时候,q.get()会等。

Doing something fancy in Process-1 for Fancy Da

2. JoinableQueue

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

使用None这个特殊值来判断是否结束Worker

import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print(‘{}: Exiting‘.format(proc_name))
                self.task_queue.task_done()
                break
            # next_task是Task()的一个实例,打印next_task会输出__str__
            print(‘{}: {}‘.format(proc_name, next_task))
            # 执行next_task()会执行__call__
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return ‘{self.a} * {self.b} = {product}‘.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return ‘{self.a} * {self.b}‘.format(self=self)

if __name__ == ‘__main__‘:
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print(‘Creating {} consumers‘.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print(‘Result:‘, result)
        num_jobs -= 1

执行结果:

Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81

3. 进程间的信号传递 Event

Event类提供了一种简单的方法来在进程之间传递状态信息。

当wait()超时时,它返回时不会出现错误。调用者负责使用is_set()检查事件的状态

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print(‘wait_for_event: starting‘)
    e.wait()
    print(‘wait_for_event: e.is_set()->‘, e.is_set())

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print(‘wait_for_event_timeout: starting‘)
    e.wait(t)
    print(‘wait_for_event_timeout: e.is_set()->‘, e.is_set())

if __name__ == ‘__main__‘:
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name=‘block‘,
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name=‘nonblock‘,
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()
    print(‘main: waiting before calling Event.set()‘)
    time.sleep(3)
    e.set()
    print(‘main: event is set‘)

执行结果:

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True

4. 控制对资源的访问 Lock

在需要在多个进程之间共享单个资源的情况下,可以使用锁来避免冲突的访问。

import multiprocessing
import sys

def worker_with(lock):
    with lock:
        sys.stdout.write(‘Lock acquired via with\n‘)

def worker_no_with(lock):
    lock.acquire()
    try:
        sys.stdout.write(‘Lock acquired directly\n‘)
    finally:
        lock.release()

if __name__ == ‘__main__‘:
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock,),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock,),
    )

    w.start()
    nw.start()

    w.join()
    nw.join()

运行结果:

Lock acquired via with
Lock acquired directly

5. 同步操作 Condition

cond.wait()等着,cond.notify_all()通知可以往下运行了

import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print(‘Starting‘, name)
    with cond:
        print(‘{} done and ready for stage 2‘.format(name))
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print(‘Starting‘, name)
    with cond:
        cond.wait()
        print(‘{} running‘.format(name))

if __name__ == ‘__main__‘:
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name=‘s1‘,
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name=‘stage_2[{}]‘.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

运行结果:在这个例子中,两个进程并行地运行第二阶段的工作,但是只有在第一个阶段完成之后。

Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running

6. 控制对资源的并发访问 Semaphore

有时,允许多个worker一次访问一个资源是很有用的,但要限制了数量。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

运行结果:

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-1acquire
Process-1release

Process-5acquire
Process-4release

Process-5release

7. 管理共享状态 Manager

通过Manager共享信息,所有进程都能看得到。

import multiprocessing
import pprint

def worker(d, key, value):
    d[key] = value

if __name__ == ‘__main__‘:
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print(‘Results:‘, d

运行结果:通过Manager创建列表,它是共享的,并且在所有进程中都可以看到更新。字典也支持。

Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}

8. 共享命名空间 Manager

除了字典和列表之外,管理者还可以创建一个共享的名称空间。

import multiprocessing

def producer(ns, event):
    ns.value = ‘This is the value‘
    event.set()

def consumer(ns, event):
    try:
        print(‘Before event: {}‘.format(ns.value))
    except Exception as err:
        print(‘Before event, error:‘, str(err))
    event.wait()
    print(‘After event:‘, ns.value)

if __name__ == ‘__main__‘:
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

运行结果:可以看到在另一个进程中可以对mgr.Namespace()进行复制,其他进程可以访问。

Before event, error: ‘Namespace‘ object has no attribute ‘value‘
After event: This is the value

重要的是要知道mgr.Namespace()中可变值的内容的更新不会自动传播。

import multiprocessing

def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append(‘This is the value‘)
    event.set()

def consumer(ns, event):
    print(‘Before event:‘, ns.my_list)
    event.wait()
    print(‘After event :‘, ns.my_list)

if __name__ == ‘__main__‘:
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

运行结果:

Before event: []
After event : []

9. 进程池 multiprocessing.Pool

池类可用于管理固定数量的worker,用于简单的工作,在这些情况下,可以将工作分解并独立地分配给worker。

import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print(‘Starting‘, multiprocessing.current_process().name)

if __name__ == ‘__main__‘:
    inputs = list(range(10))
    print(‘Input   :‘, inputs)

    builtin_outputs = map(do_calculation, inputs)
    print(‘Built-in:‘, builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print(‘Pool    :‘, pool_outputs)

运行结果:进程的返回值被收集并作为一个列表返回。

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x000000000256A080>
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-8
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

在默认情况下,池会创建固定数量的worker进程,并将作业传递给他们,直到没有更多的工作。设置maxtasksperchild参数告诉池在完成了几个任务后重新启动worker进程,防止长时间运行的worker消耗更多的系统资源。

import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print(‘Starting‘, multiprocessing.current_process().name)

if __name__ == ‘__main__‘:
    inputs = list(range(10))
    print(‘Input   :‘, inputs)

    builtin_outputs = map(do_calculation, inputs)
    print(‘Built-in:‘, builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print(‘Pool    :‘, pool_outputs)

运行结果:当工人完成分配的任务时,即使没有更多的工作,他们也会重新开始工作。在这个输出中,有9个worker被创建,尽管只有10个任务,有的worker一次可以完成其中的两个任务。

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x00000000025CA080>
Starting SpawnPoolWorker-4
Starting SpawnPoolWorker-2
Starting SpawnPoolWorker-1
Starting SpawnPoolWorker-5
Starting SpawnPoolWorker-3
Starting SpawnPoolWorker-8
Starting SpawnPoolWorker-6
Starting SpawnPoolWorker-7
Starting SpawnPoolWorker-9
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

原文地址:https://www.cnblogs.com/zhang-anan/p/8996529.html

时间: 2024-10-10 11:46:45

多进程 (二) — 信号传递与进程控的相关文章

二十四进制编码串转换为32位无符号整数(C语言实现)

typedef int BOOL; #define TRUE 1; #define FALSE 0; #define UINT_MAX 0xffffffff /* maximum unsigned int value */ enum Scale24AsciiVal { sav_aADis = 32, // 小写字母与大写字母ASCII码差值 sav_chIntDis = 48, // 字符'0'ASCII码值 }; static const char scale24[24] = {'0', '1

MFC非模态添加进程控件方法一(线程方法)

由于非模态对话框的自己没有消息循环,创建后无法进行消息处理.需要和父窗口共用消息循环.如果单独在子窗口进行控件由于自己没有单独的消息循环,更新是无法进行的. 如果在父窗口更新控件会造成程序假死.如以下代码在主窗口更新子窗口消息,界面进入假死状态.因为主界面对主进程进行了sleep(100),如下代码所示 void CModelessDlg::OnBnClickedOk() { DLGModeLess *pDlg = new DLGModeLess(); pDlg->Create(IDD_DG_M

python并发编程之多进程(二):互斥锁(同步锁)&amp;进程其他属性&amp;进程间通信(queue)&amp;生产者消费者模型

一,互斥锁,同步锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 竞争带来的结果就是错乱,如何控制,就是加锁处理 part1:多个进程共享同一打印终端 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

JavaSE学习(二):进制转换-数据类型转换-Java运算符

一.进制转换 1.1 其他进制转十进制(以十六进制为例): 十六进制范围:0-9, A-F对应数字10-15 2A7E(16)  =  14*16(0) +7*16(1) + 10*16(2)  + 2*16(3) =  10878(10) 1.2 十进制转二进制:将十进制数除以2,取余数,将最后一位余数排在第一位,倒数第二位的数排在第二位,依次排列,构成的01数字串即为表示该十进制数的二进制数. 总结:十进制%二进制 余数 0-1(Java中符号 "%"表示取余),逆序排列. (图片

百万年薪python之路 -- 并发编程之 多进程二

1. 僵尸进程和孤儿进程 基于unix的环境(linux,macOS) 主进程需要等待子进程结束之后,主进程才结束 主进程时刻检测子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收. 为什么主进程不在子进程结束后马上对其回收呢? 主进程与子进程是异步关系,主进程无法马上捕获子进程什么时候结束 如果子进程结束之后马上在内存中释放资源,主进程就没有办法检测子进程的状态. Unix针对于上面的问题提供了一个机制 所有的子进程结束之后,立马会释放掉文件的操作链接和内存的大部分数据,但是会

进程控

进程的控制大概包括进程创建.进程执行和进程终止,附加有进程的属性. 进程标识符 每个进程都有一个唯一的ID,它是一个非负整数.进程ID可以重用,当进程终止后,UNIX一般通过延迟重用算法,使得赋予新进程的ID不同于最近终止进程的ID. 系统中有些专用ID.ID为0的通常是调度进程,也被称为交换进程(swapper),它是内核的一部分,不执行磁盘上的任何程序,也被称为系统进程.ID为1的通常是init进程,在自举过程结束时由内核调用,它负责读写系统有关的初始化文件,并将系统引导到一个状态,但它不会

python学习笔记——多进程二 进程的退出

1.进程的退出函数 进程的退出含有有os._exit([status])和sys.exit([])两种,从数据包来看,该退出模块仅在linux或者unix系统下可用,windows系统下没有该模块 原文地址:https://www.cnblogs.com/gengyi/p/8586357.html

一个女大学生的代码学习之路(二)

首先说一下,写这种文章是由于我在四月四日晚上,在手动搭建自己的第一个ssh项目的时候,遇到了一个配置的问题,怎么解决也弄不好,当时是四号晚上九点,我看了一眼表,我就想两个小时之内,我要是能搞定就算行了,但是其实,我搞到三点才OK(凌晨),那时候已经是五号了,转天是一家子去扫墓的时候,结果我居然以这种一个理由没有去,理由是我太累了么?我只是就是搭了一个架子,就是由于我的包太混乱了,导致不兼容,所以tomcat总也不启动,你可能认为好笑,这么简单一个问题怎么就费这多多时间呢,但是作为一个刚接触三框架

计算机进制转换

一.计算机只认识0和1,二进制. 二.2进制转换成 8进制 和 16进制,如下图: 二进制 > 八进制 :  研究上图发现,3位最高二进制可以用来表示一位八进制.所以,将二进制分解每3位,不够前面补0,然后每3位转换为10进制,顺序排列即可. 二进制 > 十六进制  :4位最高二进制可以用来表示一位十六进制.所以,将二进制分解每4位,不够前面补0,然后每4位转换为10进制,超过9用字母表示即可.顺序排列即可. 如下: 二进制 > 十进制:   11001001 = 2^7+2^6+2^3