A服务器是master,B服务器为worker,
A服务器上执行taskManger.py
# coding:utf-8 import random,time,Queue from multiprocessing.managers import BaseManager #实现第一步:建立task_queue和result_queue,用来存放任务和结果 task_queue=Queue.Queue() result_queue=Queue.Queue() class Queuemanager(BaseManager): pass #实现第二步:把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象, # 将Queue对象在网络中暴露 Queuemanager.register(‘get_task_queue‘,callable=lambda:task_queue) Queuemanager.register(‘get_result_queue‘,callable=lambda:result_queue) #实现第三步:绑定端口8001,设置验证口令‘qiye’。这个相当于对象的初始化 manager=Queuemanager(address=(‘‘,8001),authkey=‘lsf‘) #实现第四步:启动管理,监听信息通道 manager.start() #实现第五步:通过管理实例的方法获得通过网络访问的Queue对象 task=manager.get_task_queue() result=manager.get_result_queue() #实现第六步:添加任务 for url in ["ImageUrl_"+str(i) for i in range(10)]: print ‘put task %s ...‘ %url task.put(url) #获取返回结果 print ‘try get result...‘ for i in range(10): print ‘result is %s‘ %result.get(timeout=10) #关闭管理 manager.shutdown()
B服务器上执行worker的脚本taskWorker.py
#coding:utf-8 import time from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 实现第一步:使用QueueManager注册获取Queue的方法名称 QueueManager.register(‘get_task_queue‘) QueueManager.register(‘get_result_queue‘) # 实现第二步:连接到服务器: server_addr = ‘127.0.0.1‘ print(‘Connect to server %s...‘ % server_addr) # 端口和验证口令注意保持与服务进程设置的完全一致: m = QueueManager(address=(server_addr, 8001), authkey=‘lsf‘) # 从网络连接: m.connect() # 实现第三步:获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 实现第四步:从task队列取任务,并把结果写入result队列: while(not task.empty()): image_url = task.get(True,timeout=5) print(‘run task download %s...‘ % image_url) time.sleep(1) result.put(‘%s--->success‘%image_url) # 处理结束: print(‘worker exit.‘)
时间: 2024-10-16 07:14:20