multiprocessing synchronization types such as locks, conditions, and queues: official examples

Official documentation: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing

1. Official examples of synchronization types such as locks, conditions, and queues:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten

#### TEST_VALUE
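# Each child decrements the shared ctypes int `running` ('i' = C int)
# under the Lock; the parent polls the value until it reaches zero.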

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'

#### TEST_QUEUE
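# One child feeds squares into a Queue and ends with a 'STOP' sentinel;
# the parent drains it with a timeout, printing TIMEOUT on Queue.Empty.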

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print

#### TEST_CONDITION
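# The parent acquires the Condition twice (its default lock is an RLock,
# so recursive acquisition is allowed); wait() then releases it so the
# child can acquire the lock, notify() and wake the parent.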

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond

#### TEST_SEMAPHORE
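# Semaphore(3) admits at most three tasks at a time; an RLock-protected
# shared counter reports how many are currently running.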

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

#### TEST_JOIN_TIMEOUT
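# join(timeout=1) returns after at most one second whether or not the
# child has exited, so the parent checks is_alive() on each pass.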

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()

#### TEST_EVENT
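# Five children block in event.wait() until the parent calls event.set(),
# which wakes all of them at once.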

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()

#### TEST_SHAREDVALUES
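# Plain Python values/arrays and their Value/Array counterparts are
# passed to a child, which asserts the shared copies match the originals.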

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0

####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')

if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)
        raise SystemExit(2)

    test(namespace)
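
The `__main__` block above picks the working namespace from the command line. Assuming the listing is saved as, say, mp_sync_test.py (a hypothetical filename), it can be run in three modes:

    python mp_sync_test.py processes   # plain multiprocessing (the default)
    python mp_sync_test.py manager     # proxy objects from multiprocessing.Manager()
    python mp_sync_test.py threads     # multiprocessing.dummy, i.e. threads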

The following example shows how to use a queue to feed tasks to a collection of worker processes and collect the results:

# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

if __name__ == '__main__':
    freeze_support()
    test()
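
The comment at the top of this example recommends Pool.map() as the shorter, order-preserving alternative. Below is a minimal sketch of the mul tasks rewritten with a pool (the helper name mul_seven and the pool size are illustrative, not part of the original example):

# Order-preserving variant of the worker example above, using a pool.
# Functions passed to Pool.map() must be picklable, hence the
# module-level helper.

import time
import random
from multiprocessing import Pool, freeze_support

def mul_seven(i):
    time.sleep(0.5 * random.random())
    return i * 7

def pool_test():
    pool = Pool(processes=4)                  # same worker count as above
    results = pool.map(mul_seven, range(20))  # results come back in task order
    pool.close()
    pool.join()
    print 'Ordered results:'
    for r in results:
        print '\t', r

if __name__ == '__main__':
    freeze_support()
    pool_test()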

  

Original source: https://www.cnblogs.com/SunshineKimi/p/12054433.html
