今天用python实现分布式,基于python2.7,注意:在linux下执行测试通过,在windows测试失败。# -*- coding: utf-8 -*-__author__ = ‘dell‘import random, time, Queuefrom multiprocessing.managers import BaseManager # 发送任务的队列:task_queue = Queue.Queue()# 接收结果的队列:result_queue = Queue.Queue() # 从BaseManager继承的QueueManager:class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象:QueueManager.register(‘get_task_queue‘, callable=lambda: task_queue)QueueManager.register(‘get_result_queue‘, callable=lambda: result_queue)# 绑定端口5000, 设置验证码‘abc‘:manager = QueueManager(address=(‘‘, 5000), authkey=‘abc‘)# 启动Queue:manager.start()# 获得通过网络访问的Queue对象:task = manager.get_task_queue()result = manager.get_result_queue()# 放几个任务进去:for i in range(10): n = random.randint(0, 10000) print(‘Put task %d...‘ % n) task.put(n)# 从result队列读取结果:print(‘Try get results...‘)for i in range(10): r = result.get(timeout=10) print(‘Result: %s‘ % r)# 关闭:manager.shutdown() =======================================
# -*- coding: utf-8 -*-__author__ = ‘dell‘import time, sys, Queuefrom multiprocessing.managers import BaseManager # 创建类似的QueueManager:class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:QueueManager.register(‘get_task_queue‘)QueueManager.register(‘get_result_queue‘) # 连接到服务器,也就是运行taskmanager.py的机器:server_addr = ‘127.0.0.1‘ #这里修改,如果是同一机器运行,不需要改;如果是别的机器运行,改为相应ip即可。print(‘Connect to server %s...‘ % server_addr)# 端口和验证码注意保持与taskmanager.py设置的完全一致:m = QueueManager(address=(server_addr, 5000), authkey=‘abc‘)# 从网络连接:m.connect()# 获取Queue的对象:task = m.get_task_queue()result = m.get_result_queue()# 从task队列取任务,并把结果写入result队列:for i in range(10): try: n = task.get(timeout=1) print(‘run task %d * %d...‘ % (n, n)) r = ‘%d * %d = %d‘ % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print(‘task queue is empty.‘)# 处理结束:print(‘worker exit.‘)
时间: 2024-10-12 09:08:00