python+rabbitmq实现分布式

#master

# -*- coding: utf-8 -*-
import sys
# Python 2 default-encoding hack; it is inactive in the listing as published
# (the whole header line is a comment), so it is kept disabled here.
# reload(sys)
# sys.setdefaultencoding("utf-8")

import random
import time
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module; keep the old alias
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager

import pymongo

# MongoDB connection shared by the master; the records to distribute are read
# from the DeviceData collection of this database.
connetion = pymongo.MongoClient('192.168.30.79', 27017)
# connection = pymongo.MongoClient('mongodb://10.10.5.216:27017')
try:
    dbs = connetion.list_database_names()
except TypeError:
    # Older PyMongo has no list_database_names(); the attribute lookup then
    # yields a (non-callable) Database object, hence the TypeError.
    dbs = connetion.database_names()
for i in dbs:
    print("mongdb connect successful")
    print(i + "\n")
db = connetion.get_database('5wdomian_0519')
table11 = db.get_collection('DeviceData')
table12 = db.get_collection('DeviceDataSource')
table13 = db.get_collection('Device')
# ####################################################################
# Queue the master fills with task batches (bounded, so put() blocks when
# 100 batches are pending).
task_queue = Queue.Queue(maxsize=100)
# Queue the workers use to report results back.
result_queue = Queue.Queue()

# QueueManager derived from BaseManager; the queue accessors are registered
# on this subclass rather than on BaseManager itself.
class QueueManager(BaseManager):
    """Manager type that exposes the task/result queues over the network."""
    pass

def return_task_queue():
    """Return the module-level ``task_queue``.

    Registered with ``QueueManager`` as the callable behind
    ``get_task_queue`` (a named function is used instead of a lambda).
    """
    return task_queue

def return_result_queue():
    """Return the module-level ``result_queue``.

    Registered with ``QueueManager`` as the callable behind
    ``get_result_queue`` (a named function is used instead of a lambda).
    """
    return result_queue

def test():
    """Run the master: publish DeviceData records in batches, then wait.

    Steps:
      1. Register both queues on ``QueueManager`` and start the manager on
         192.168.30.72:5001 with auth key ``b'abc'``.
      2. Read every ``DeviceData`` document of one data source and put the
         documents on the task queue in lists of five (a shorter final list
         carries the remainder).
      3. Block on the result queue until a worker reports ``'done'``, then
         shut the manager down exactly once.

    Errors raised while reading or queueing are printed and do not abort the
    run (best-effort, as in the original); the cursor and the manager are
    released in ``finally`` blocks either way.
    """
    # Expose both queues on the network; ``callable`` ties each name to the
    # function returning the queue object.
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)

    # Bind to port 5001 and use 'abc' as the authentication key.
    manager = QueueManager(address=('192.168.30.72', 5001), authkey=b'abc')
    manager.start()
    try:
        # Proxies through which the queues are accessed over the network.
        task = manager.get_task_queue()
        result = manager.get_result_queue()

        dis_bench_id = '0d537ee8-ecec-4f45-9d52-e5660300d92d'
        batch_size = 5
        wanted_fields = {
            'type': 1, 'sourceId': 1, 'devId': 1, 'content': 1,
            'fileId': 1, 'subName': 1, 'name': 1,
        }
        cursor = table11.find({'sourceId': dis_bench_id}, wanted_fields,
                              no_cursor_timeout=True)
        sent = 0
        batch = []
        try:
            for doc in cursor:
                batch.append(doc)
                if len(batch) == batch_size:
                    print(len(batch))
                    task.put(batch)
                    batch = []
                    sent += batch_size
                    print(sent)
                    print('queue size is' + str(task.qsize()))
            if batch:
                # Leftover records that did not fill a whole batch.
                task.put(batch)
        except Exception as e:
            print(e)
        finally:
            # no_cursor_timeout cursors are never reaped by the server, so
            # close explicitly even when iteration failed.
            cursor.close()

        # Wait for the completion marker on the result queue.
        print('Try get results...')
        while result.get() != 'done':
            pass
    finally:
        manager.shutdown()
        print('master exit.')

if __name__ == '__main__':
    # freeze_support() must be the first call in the main guard so that the
    # script also works when frozen into a Windows executable.
    freeze_support()
    test()

#worker
# -*- coding: utf-8 -*-
import datetime
import os
import sys
import time
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module; keep the old alias
from multiprocessing.managers import BaseManager

import pymongo
from gridfs import GridFS

if sys.version_info[0] == 2:
    # Python 2 only: make UTF-8 the implicit str/unicode conversion codec;
    # the file-name building code below calls str() on unicode values.
    reload(sys)
    sys.setdefaultencoding("utf-8")

# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
connetion = pymongo.MongoClient('192.168.30.79', 27017)
# Root directory under which the exported files are written.
path = 'C:\\soft'
# connection = pymongo.MongoClient('mongodb://10.10.5.216:27017')
try:
    dbs = connetion.list_database_names()
except TypeError:
    # Older PyMongo has no list_database_names(); the attribute lookup then
    # yields a (non-callable) Database object, hence the TypeError.
    dbs = connetion.database_names()
for i in dbs:
    print("mongdb connect successful")
    print(i + "\n")
db = connetion.get_database('5wdomian_0519')
# table11 = db.get_collection('DeviceData')
table12 = db.get_collection('DeviceDataSource')
table13 = db.get_collection('Device')
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

class QueueManager(BaseManager):
    """Client-side manager type; queue accessor names are registered on it."""

# The worker only fetches the queues over the network, so just the accessor
# names are registered here (no callable).
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# Document field names.
type = 'type'  # NOTE: shadows the builtin ``type`` for the rest of the module
sourceId = 'sourceId'
devId = 'devId'
content = 'content'
fileId = 'fileId'
subName = 'subName'
name = 'name'
aa = []
bb = []
cc = []
# aa=list(type1[:])
# Values of the ``type`` field that the worker distinguishes.
config = 'config'
cdpTable = 'cdpTable'
routeTable = 'routeTable'
showCommand = 'showCommand'
arpTable = 'arpTable'
stpTable = 'stpTable'
macTable = 'macTable'
commonTable = 'commonTable'
nctTable = 'nctTable'
bgpNbrTable = 'bgpNbrTable'
dis_benchID = '79aa8379-5bf4-42a7-9682-c71cf95f9873'

#type1=table11.find({},{type:1,sourceId:1,devId:1,content:1,fileId:1,subName:1,name:1},no_cursor_timeout=True)#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

# Address of the machine running the master script.
server_addr = '192.168.30.72'
print('Connect to server %s...' % server_addr)

# Port and auth key must match the values the master was started with.
m = QueueManager(address=(server_addr, 5001), authkey=b'abc')

try:
    m.connect()
except Exception:
    # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
    # SystemExit are not reported as "master not running".
    print('please start task_master.py!')
    sys.exit("sorry, goodbye!")

# Proxies for the queues hosted by the master.
task = m.get_task_queue()
result = m.get_result_queue()

# Characters that are not allowed in Windows file names, each mapped to an
# "X"-prefixed code.  "X" itself is doubled FIRST so the codes produced by the
# later replacements are not escaped a second time.  (Several branches of the
# original replaced "*" before "X", turning "*" into "XXD" instead of "XD".)
_FILENAME_ESCAPES = (
    ("X", "XX"), ("*", "XD"), ("/", "XA"), (":", "XC"), ("<", "XG"),
    (">", "XH"), ("|", "XI"), ("\\", "XB"), ("?", "XE"), ('"', "XF"),
)

# Record type (without the "Orig" suffix) -> name of the output directory.
_TYPE_DIRS = {
    'arpTable': 'ArpTable',
    'cdpTable': 'CdpTable',
    'bgpNbrTable': 'BGPAdRoutingTable',
    'routeTable': 'RoutingTable',
    'config': 'ConfigFile',
    'showCommand': 'ShowCommand',
    'stpTable': 'StpTable',
    'macTable': 'MacTable',
    'nctTable': 'CommonTable',
}

# Non-"Orig" table types that share the "<host>$<vrf>$default-live<ext>"
# naming scheme, with their extensions.
_LIVE_TABLE_EXT = {
    'arpTable': '.oat',
    'cdpTable': '.cdp',
    'routeTable': '.ort',
    'macTable': '.omt',
    'stpTable': '.stp',
}


def _escape_filename(text):
    """Return ``str(text)`` with every entry of _FILENAME_ESCAPES applied."""
    escaped = str(text)
    for raw, code in _FILENAME_ESCAPES:
        escaped = escaped.replace(raw, code)
    return escaped


def _ensure_dir(dir_path):
    """Create ``dir_path`` unless it already exists."""
    if not os.path.exists(dir_path):
        os.mkdir(dir_path)


def _write_content(file_path, data, binary=False):
    """Write ``data`` to ``file_path`` in text mode, or as bytes if ``binary``."""
    if binary:
        # A file opened with 'wb' needs bytes; text is encoded as UTF-8.
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        with open(file_path, 'wb') as handle:
            handle.write(data)
    else:
        with open(file_path, 'w') as handle:
            handle.write(data)


def _orig_filename(record, device, base_type):
    """Name the file for an "...Orig" record.

    Returns ``(file_name, binary)`` or ``None`` when ``base_type`` is not one
    of the known types (nothing is written for those).
    """
    host = str(device[u'name'])
    has_sub = u'subName' in record
    if base_type == 'arpTable':
        return host + ("$management.txt" if has_sub else ".txt"), False
    if base_type in ('cdpTable', 'macTable', 'stpTable'):
        return host + ".txt", False
    if base_type == 'routeTable':
        if has_sub:
            return host + "$" + record[u'subName']['vrf'] + ".txt", False
        return host + ".txt", False
    if base_type == 'showCommand':
        return host + "$0$" + record[u'name'] + ".txt", False
    if base_type == 'config':
        return host + ".config", True
    if base_type == 'bgpNbrTable':
        if not has_sub:
            return host + "$" + device[u'mip'] + ".txt", False
        sub = record[u'subName']
        stem = host + "$" + sub['peIp']
        if "vrf" in sub:
            stem += "$" + sub['vrf']
        return stem + ".txt", False
    if base_type == 'nctTable':
        return host + "$$" + record[u'name'] + ".txt", False
    return None


def _live_filename(record, device, base_type):
    """Name the file for a record whose type has no "Orig" suffix.

    Returns ``(file_name, binary)`` or ``None`` when ``base_type`` is not one
    of the known types (nothing is written for those).
    """
    host = str(device[u'name'])
    has_sub = u'subName' in record
    if base_type in _LIVE_TABLE_EXT:
        # Without a subName the vrf slot stays empty: "<host>$$default-live".
        vrf = record[u'subName']['vrf'] if has_sub else ""
        stem = host + "$" + vrf + "$" + "default-live"
        return stem + _LIVE_TABLE_EXT[base_type], False
    if base_type == 'showCommand':
        return host + "$0$" + str(record[u'name']) + ".txt", False
    if base_type == 'config':
        return host + ".config", True
    if base_type == 'bgpNbrTable':
        if not has_sub:
            return host + "$" + device[u'mip'] + "$" + "default-live.obgprt", False
        sub = record[u'subName']
        stem = host + "$" + sub['peIp'] + "$" + "default-live"
        if "vrf" in sub:
            stem += "$" + sub[u'vrf']
        return stem + ".obgprt", False
    if base_type == 'nctTable':
        return host + "$$" + record[u'name'] + ".csv", False
    return None


def _export_table_record(record):
    """Write the ``content`` of one table/config record to its file.

    Layout: <path>\\<startTime+srcType>\\<type directory>\\<file name>.
    The data source and the device are fetched by ``_id`` instead of
    scanning both collections for every record.
    """
    # Projection values are real booleans: newer MongoDB servers interpret a
    # string projection value as a literal instead of "include this field".
    source = table12.find_one({'_id': record[u'sourceId']},
                              {'startTime': True, 'srcType': True})
    if source is None:
        return
    run_dir = path + "\\" + _escape_filename(
        str(source[u'startTime']) + str(source[u'srcType']))
    _ensure_dir(run_dir)

    record_type = str(record[u'type'])
    is_orig = record_type.endswith('Orig')
    base_type = record_type[:-4] if is_orig else record_type
    # Unknown types still get a directory ("unkown" spelling kept so existing
    # output trees stay compatible), but no file is written into it.
    type_dir = run_dir + "\\" + _TYPE_DIRS.get(base_type, 'unkown' + record_type)
    _ensure_dir(type_dir)

    device = table13.find_one({'_id': record[u'devId']},
                              {'name': True, 'mip': True})
    if device is None:
        return
    namer = _orig_filename if is_orig else _live_filename
    planned = namer(record, device, base_type)
    if planned is None:
        return
    file_name, binary = planned
    _write_content(type_dir + "\\" + _escape_filename(file_name),
                   record[u'content'], binary)


def _export_gridfs_record(record, grid):
    """Dump the GridFS file referenced by ``record['fileId']`` to disk.

    Layout: <path>\\<startTime>\\<fileId>.big.  Records whose fileId is None
    are ignored.
    """
    file_id = record[u'fileId']
    if file_id is None:
        return
    source = table12.find_one({'_id': record[u'sourceId']}, {'startTime': True})
    if source is None:
        return
    run_dir = path + "\\" + _escape_filename(source[u'startTime'])
    _ensure_dir(run_dir)
    print(file_id)
    payload = grid.get(file_id).read()
    # GridFS returns raw bytes, so the file is opened in binary mode (text
    # mode would rewrite line endings on Windows and corrupt the payload).
    with open(run_dir + "\\" + str(file_id) + '.big', 'wb') as handle:
        handle.write(payload)


def _log_bad_record(record):
    """Append a record lacking both fileId and sourceId to errdata.txt."""
    err_dir = path + "\\errdata"
    # Create the directory AND log the record; the original skipped the write
    # whenever it had to create the directory, losing the first bad record.
    _ensure_dir(err_dir)
    with open(err_dir + "\\errdata.txt", 'a') as handle:
        handle.write(str(record) + "\n")


def _export_record(record, grid):
    """Route one task record to the matching exporter."""
    if u'fileId' in record:
        _export_gridfs_record(record, grid)
    elif u'sourceId' in record:
        _export_table_record(record)
    else:
        _log_bad_record(record)


def _run_worker(task_queue, idle_timeout=10):
    """Consume batches from ``task_queue`` until it stays empty.

    ``get(timeout=...)`` replaces the original ``empty()``-then-``get()``
    pair: with several workers another process can take the last batch
    between the two calls, leaving this one blocked in ``get()`` forever.

    Args:
        task_queue: queue (proxy) yielding lists of record dicts.
        idle_timeout: seconds to wait for a new batch before giving up.
    """
    grid = GridFS(db)
    while True:
        try:
            batch = task_queue.get(timeout=idle_timeout)
        except Queue.Empty:
            print('task queue is empty.')
            # NOTE(review): the master waits for 'done' on the result queue,
            # but reporting it is disabled here - confirm the intended
            # shutdown protocol before enabling.
            # result.put('done')
            break
        for record in batch:
            print(record)
            _export_record(record, grid)


_run_worker(task)

# Pause before announcing the exit, as the original script did.
time.sleep(10)
print('worker exit...')

if __name__ == '__main__':
    # The worker runs entirely at module level; nothing more to do here.
    pass
时间: 2024-10-09 00:35:58

python+rabbitmq实现分布式的相关文章

python RabbitMQ队列/redis

RabbitMQ队列 安装 http://www.rabbitmq.com/install-standalone-mac.html 安装python rabbitMQ module 1 2 3 4 5 6 7 pip install pika or easy_install pika or 源码   https://pypi.python.org/pypi/pika 实现最简单的队列通信 produce 1 import pika 2 connection = pika.BlockingConn

python RabbitMQ队列使用(入门篇)

---恢复内容开始--- python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换,这时候我们就需要一个中间件,来实现程序之间的通讯. RabbitMQ MQ并不是python内置的模块,而是一个需要你额外安装(Ubuntu可直接apt-get,其余请自行百度.)的程序,安装完毕

RabbitMQ解决分布式事务

案例:经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,然后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯. RabbitMQ解决分布式事务原理: 采用最终一致性原理.需要保证以下三要素1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)2.MQ消费者消息能够正确消费消息,采用手动ACK模式,使用不补偿机制(注意重试幂等性问题)3.如何保证第一个事务先执行,采用补偿机制(补单机制),在创建一个补单消费者进行监听,如果订单没有创建成功,进

RabbitMq解决分布式事物

一.RabbitMQ解决分布式事务思路: 案例: 经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,然后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯. 二.RabbitMQ解决分布式事务原理:采用最终一致性原理. 需要保证以下三要素 1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制) 2.MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题) 3.如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听

python+rabbitMQ抓取某婚恋网站用户数据

"总是向你索取却不曾说谢谢你----",在博客园和知乎上面吸收了很多知识,以后也会在这里成长,这里挺好,谢谢博客园和知乎,所以今天也把自己在项目期间做的东西分享一下,希望对朋友们有所帮助.... 废话少说,let's go----! 需求: 项目需要做一个婚恋网站,主要技术有nginx,服务器集群,redis缓存,mysql主从复制,amoeba读写分离等等,我主要用rabbitMQ+python完成并实现了数据爬取工作(数据库写入及图片下载保存),速度的话公司的电脑爬的(i5+16g

Python—RabbitMQ

RabbitMQ RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统 安装 因为RabbitMQ由erlang实现,先安装erlang #安装配置epel源 rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm #安装erlang yum -y install erlang #安装RabbitMQ yum -y install rabbitmq-server #启动/关闭

python—Celery异步分布式

Celery异步分布式 Celery是一个python开发的异步分布式任务调度模块 Celery本身并不提供消息服务,使用第三方服务,也就是broker来传递任务,目前支持rabbitmq,redis, 数据库等 使用redis连接url的格式为: redis://:password@hostname:port/db_number 例如: BROKER_URL = 'redis://localhost:6379/0' 1)huang.py from celery import Celery bro

基于RabbitMQ实现分布式延时任务调度

一.分布式延时任务 传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成.这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行. 然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行).基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性.发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一

Python rabbitmq

一.RabbitMQ安装 安装配置epel源    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang    $ yum -y install erlang 安装RabbitMQ    $ yum -y install rabbitmq-server 二.启动rabbitmq /etc/init.d/rabbitmq-server start [[email pro