注意:multiprocessing.Barrier 是 Python 3(3.3 起)才有的功能,在 Python 2 中无法运行下面的代码。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Demonstrate process synchronisation with multiprocessing primitives.

Two processes rendezvous on a ``Barrier`` before taking a timestamp, two
more take a timestamp without any synchronisation, and ten workers each
store one entry in a ``Manager``-backed dictionary.
"""

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime


def test_with_barrier(synchronizer, serializer):
    """Wait on a barrier, then print this process's name and a timestamp.

    Args:
        synchronizer: Barrier-like object; ``wait()`` is called on it before
            the timestamp is taken.
        serializer: Lock-like context manager held while printing, so that
            only one holder prints at a time.
    """
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("process %s -----> %s" % (name, datetime.fromtimestamp(now)))


def test_without_barrier():
    """Print this process's name and a timestamp with no synchronisation."""
    name = multiprocessing.current_process().name
    now = time()
    print("process %s -----> %s" % (name, datetime.fromtimestamp(now)))


# These two routines are process targets for the demo, not unit tests.  The
# ``test_`` prefix would otherwise make pytest collect them whenever they are
# imported into a test module, so opt them out of collection explicitly.
test_with_barrier.__test__ = False
test_without_barrier.__test__ = False


def worker(dictionary, key, item):
    """Store ``item`` under ``key`` in ``dictionary`` and print the pair.

    Args:
        dictionary: Mutable mapping to update (a ``Manager().dict()`` proxy
            when run from ``main``).
        key: Key to assign.
        item: Value to store under ``key``.
    """
    dictionary[key] = item
    print(key, item)


def main():
    """Run the barrier demo and the shared-dictionary demo."""
    synchronizer = Barrier(2)  # exactly two processes wait on this barrier
    serializer = Lock()

    demo_processes = [
        Process(name="p1 - test_with_barrier",
                target=test_with_barrier,
                args=(synchronizer, serializer)),
        Process(name="p2 - test_with_barrier",
                target=test_with_barrier,
                args=(synchronizer, serializer)),
        Process(name="p3 - test_without_barrier",
                target=test_without_barrier),
        Process(name="p4 - test_without_barrier",
                target=test_without_barrier),
    ]
    for process in demo_processes:
        process.start()

    # The context manager shuts the manager's server process down once the
    # workers are done with the shared dictionary.
    with multiprocessing.Manager() as mgr:
        dictionary = mgr.dict()
        jobs = [
            Process(target=worker, args=(dictionary, i, i * 2))
            for i in range(10)
        ]
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

    # Joined last so the barrier demo keeps running concurrently with the
    # dictionary workers, as in the original start order.
    for process in demo_processes:
        process.join()


if __name__ == "__main__":
    main()
时间: 2024-10-28 19:41:35