那些年被我坑过的Python——第十章Broker(rabbitMQ/redis)

基于RabbitMQ的direct任务驱动异步RPC程序实现:

RPC_dispatcher指令分发器:

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 __Author__ = "Zhang Xuyao"
  4
  5 import pika
  6 import uuid
  7 import time
  8 import threading
  9
 10
 11 class RpcDispatcher(object):
 12     def __init__(self, rMQ_addr):
 13         self.cmd_list = [
 14             "ls", "df", "free",
 15             "ip", "ifconfig", "tail",
 16             "head", "grep", "uptime",
 17             "date"
 18         ]
 19         self.task_dict = {}
 20         self.task_fin_dict = {}
 21         self.routing_key_list = []
 22
 23         self.connection = pika.BlockingConnection(
 24             pika.ConnectionParameters(host=rMQ_addr)
 25         )
 26         self.channel = self.connection.channel()
 27
 28         # 收数据用
 29         recv_queue = self.channel.queue_declare(exclusive=True)
 30         self.recv_queue_name = recv_queue.method.queue
 31         self.channel.basic_consume(self._on_response, queue=self.recv_queue_name)
 32
 33         # 发数据用
 34         self.channel.exchange_declare(exchange=‘send‘, type=‘direct‘)
 35         self.send_queue = self.channel.queue_declare()
 36         self.send_queue_name = self.send_queue.method.queue
 37
 38     # 获取指定通道的响应(4)###
 39     def _on_response(self, ch, method, parameters, msg):
 40         if parameters.correlation_id in self.task_dict:
 41             self.task_dict[parameters.correlation_id][parameters.app_id]["response"] = msg.decode("utf-8")
 42             ch.basic_ack(delivery_tag=method.delivery_tag)
 43             self.task_dict[parameters.correlation_id][parameters.app_id]["recv_time"] = time.time()
 44             fin_flag = False
 45             for host in self.task_dict[parameters.correlation_id]:
 46                 if self.task_dict[parameters.correlation_id][host]["response"] != None:
 47                     continue
 48                 else:
 49
 50                     break
 51             # 执行结果全部收到就把记录转移到已经完成容器中,根据检查已经完成容器中的内容用于确定任务的状态
 52             else:
 53                 self.task_fin_dict[parameters.correlation_id] = self.task_dict[parameters.correlation_id]
 54                 del self.task_dict[parameters.correlation_id]
 55
 56     # 发送请求(2)######
 57     def _on_request(self, input_cmd, host_list):
 58         print(RpcDispatcher.colorStr("[x]Requesting>>: ‘%s‘ on %s"
 59                                      % (cmd, tuple(host_list)), 33))
 60         self.response = None
 61         # 生成全局校验码
 62         corr_id = str(uuid.uuid4())
 63         print(RpcDispatcher.colorStr("[x]Task_id>>: %s"
 64                                      % corr_id, 34))
 65         self.task_dict[corr_id] = {}
 66         if host_list:
 67             for host in host_list:
 68                 self.task_dict[corr_id][host] = {
 69                     "cmd": input_cmd,
 70                     "response": None,
 71                     "req_time": time.time(),
 72                     "recv_time": None,
 73                 }
 74                 # 绑定routing_key准备并发布消息
 75                 self.channel.queue_bind(exchange=‘send‘,
 76                                         queue=self.send_queue_name,
 77                                         routing_key=host)
 78                 # 向执行器并发布消息指令
 79                 self.channel.basic_publish(exchange=‘send‘,
 80                                            routing_key=host,
 81                                            properties=pika.BasicProperties(
 82                                                reply_to=self.recv_queue_name,
 83                                                correlation_id=corr_id,
 84                                            ),
 85                                            body=str(input_cmd))
 86                 # 消息发布后解除routing_key绑定
 87                 self.channel.queue_unbind(exchange=‘send‘,
 88                                           queue=self.send_queue_name,
 89                                           routing_key=host)
 90
 91             # 守护线程负责不断检测响应结果是否全部收到
 92             on_recv_thread = threading.Thread(target=self._on_recv, args=[corr_id, ])
 93             on_recv_thread.setDaemon(True)
 94             on_recv_thread.start()
 95
 96     # 等待数据消息(3)
 97     def _on_recv(self, task_id):
 98         # 根据检查已经完成容器中的指定task_id是否存在来确定任务的状态,为空则继续收取消息
 99         while task_id not in self.task_fin_dict:
100             self.connection.process_data_events()
101
102     # 显示已经完成的任务编号(5)
103     def show_task_fin(self):
104         print("尚未查看的任务:")
105         for task_id in self.task_fin_dict:
106             value_list = tuple(self.task_fin_dict[task_id].values())
107             host_list = tuple(self.task_fin_dict[task_id].keys())
108             cmd = str(value_list[0]["cmd"])
109             print(RpcDispatcher.colorStr("[task_id]: %s | [cmd_info]: ‘%s‘ | [host_list]: %s"
110                                          % (task_id, cmd, host_list), 32))
111
112     # 获取指定任务的执行结果(6)
113     def get_response(self, task_id):
114         if task_id in self.task_fin_dict:
115             for host in self.task_fin_dict[task_id]:
116                 response = self.task_fin_dict[task_id][host]["response"]
117                 cmd_req = self.task_fin_dict[task_id][host]["cmd"]
118                 time_cost = self.task_fin_dict[task_id][host]["recv_time"] - 119                             self.task_fin_dict[task_id][host]["req_time"]
120                 time_cost = round(time_cost, 3)
121                 print(RpcDispatcher.colorStr("Host: %s           | Cmd: ‘%s‘ \nTime Cost: %ss | Response: "
122                                              % (host, cmd_req, time_cost), 33))
123                 print(RpcDispatcher.colorStr(response, 36))
124             del self.task_fin_dict[task_id]
125         else:
126             print("任务结果尚未全部返回")
127
128     # 接收外部输入,调用请求(1)
129     def call(self, cmd, host_list):
130         return self._on_request(cmd, host_list)
131
132     @staticmethod
133     def colorStr(aStr, color_code):
134         return "\033[0;" + str(color_code) + ";0m" + aStr + "\033[0m"
135
136     def __del__(self):
137         self.connection.close()
138
139
140 if __name__ == ‘__main__‘:
141     cmd_rpc = RpcDispatcher(rMQ_addr="localhost")
142     while True:
143         cmd = input("[$]Cmd>>:")
144         if cmd.lower() != ‘eof‘ and cmd.lower() != ‘exit‘:
145             if cmd.split()[0].lower() in ["$s"]:
146                 cmd_rpc.show_task_fin()
147             elif cmd.split()[0].lower() in ["$c"] and len(cmd.split()) == 2:
148                 cmd_rpc.get_response(cmd.split()[1])
149             else:
150                 cmd_split = cmd.split()
151                 has_host = cmd_split.count("--hosts")
152                 if has_host != 1:
153                     print(cmd_rpc.colorStr("Usage <cmd> --hosts <ip1>[,<ip2>[,<ip3>...]]", 35))
154                     continue
155                 else:
156                     if len(cmd_split) <= cmd_split.index("--hosts") + 1:
157                         print("请至少指定一个主机IP")
158                         continue
159                     host_list = cmd_split[cmd_split.index("--hosts") + 1].split(‘,‘)
160                     cmd = " ".join(cmd_split[0:cmd_split.index("--hosts")]).strip()
161
162                     if cmd.split()[0] in cmd_rpc.cmd_list:
163                         cmd_rpc.call(cmd, host_list)
164                     else:
165                         print("您输入的命令暂不支持...")
166                         continue
167         else:
168             break

RPC 指令分发器代码

RPC_executor指令执行器:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __Author__ = "Zhang Xuyao"
 4
 5 import pika
 6 import os, time
 7
 8
 9 class RpcExecutor(object):
10     def __init__(self, rMQ_addr, ip):
11         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
12             host=rMQ_addr))
13         self.channel = self.connection.channel()
14
15         # 接收命令用
16         self.ip = ip
17         self.channel.exchange_declare(exchange=‘send‘, type=‘direct‘)
18         self.send_queue = self.channel.queue_declare()
19         self.send_queue_name = self.send_queue.method.queue
20         self.channel.queue_bind(exchange=‘send‘, routing_key=self.ip, queue=self.send_queue_name)
21         self.channel.basic_consume(self._on_request, queue=self.send_queue_name)
22
23     # 开始订阅消息
24     def run(self):
25         print(" [x] Awaiting RPC requests")
26         self.channel.start_consuming()
27
28     # 执行消息命令
29     def exec_cmd(self, cmd_str):
30         result = os.popen(cmd_str).read()
31         if not result:
32             return "命令没有输出结果"
33         else:
34             return result
35
36     # 请求事件的回调函数
37     def _on_request(self, ch, method, props, body):
38         cmd_str = body.decode(‘utf-8‘)
39         print(" [.] exec_cmd(%s)" % cmd_str)
40         response = self.exec_cmd(cmd_str)
41
42         # 发送命令结果,通过传来的queue进行发布,这里的app_id是额外增加的IP信息,区分于其他执行器的响应结果
43         ch.basic_publish(exchange=‘‘,
44                          routing_key=props.reply_to,
45                          properties=pika.BasicProperties(
46                              correlation_id=props.correlation_id,
47                              app_id=self.ip
48                          ),
49                          body=str(response)
50                          )
51         # 消息反馈确认,确保对方确实收到了响应结果
52         ch.basic_ack(delivery_tag=method.delivery_tag)
53
54     def __del__(self):
55         self.connection.close()
56
57
58 if __name__ == ‘__main__‘:
59     ip = input(‘请输入IP地址>>:‘)
60     cmd_executor = RpcExecutor(‘localhost‘, ip)
61     cmd_executor.run()

RPC 执行器代码

时间: 2024-10-29 04:26:18

那些年被我坑过的Python——第十章Broker(rabbitMQ/redis)的相关文章

python命令行下安装redis客户端

1. 安装文件: https://pypi.python.org/pypi/setuptools 直接下载然后拷贝到python目录下同下面步骤 下载 ez_setup.py>>> from urllib.request import urlopen >>>data = urlopen('https://bootstrap.pypa.io/ez_setup.py') >>>open('ez_setup.py','wb').write(data.read

关于python+django的又一坑,请python班的同学进来看

事情是这样的. 在使用django1.6+的时候,默认会吧session存放在数据库django_session表里. 如果要把session放在内存中,就应该在settings.py 中配置 SESSION_ENGINE = "django.contrib.sessions.backends.cache" 使用的时候: 新增:request.session["userName"] = "Tom" 查询:name = request.sessio

廖雪峰js教程笔记6 generator一个坑 看完python在回来填坑

generator(生成器)是ES6标准引入的新的数据类型.一个generator看上去像一个函数,但可以返回多次. ES6定义generator标准的哥们借鉴了Python的generator的概念和语法,如果你对Python的generator很熟悉,那么ES6的generator就是小菜一碟了.如果你对Python还不熟,赶快恶补Python教程!. 我们先复习函数的概念.一个函数是一段完整的代码,调用一个函数就是传入参数,然后返回结果: function foo(x) { return

那些年被我坑过的Python——道阻且长(第五章实用模块讲解)

random模块 我的随机验证吗程序: 首先保证了字母和数字出现的概率是50% VS 50%,其次是可以订制输出多少位 1 def Captcha(size): 2 Captcha_list = [] 3 for i in range(size): 4 rand_num = random.randint(1, 2) 5 if rand_num == 1: 6 Captcha_list.append(chr(random.randint(65, 90))) 7 elif rand_num == 2

那些年被我坑过的Python——山外有山(第四章)

装饰器: 定义: 本质是函数,(装饰其他函数)就是为其他函数添加附加功能原则: 1.不能修改被装饰的函数的源代码 2.不能修改被装饰的函数的调用方式 优点: 装饰器带来的最直观的好处:减少对函数的细化修改,批量增加新功能. 实现装饰器必备知识:1.函数即“变量” 函数的原理与变量引用相似,都是将一个命名于内存对象进行映射2.高阶函数 满足下列两个条件之一的则为高阶函数: a.把一个函数名当作实参传递给另一个函数(在不修改被装饰函数代码的情况下为它增加功能) b.一个函数返回值中包含另一个函数名(

那些年被我坑过的Python——第十三章:一夫当关(设计堡垒机)

  堡垒机架构 堡垒机的主要作用权限控制和用户行为审计,堡垒机就像一个城堡的大门,城堡里的所有建筑就是你不同的业务系统 , 每个想进入城堡的人都必须经过城堡大门并经过大门守卫的授权,每个进入城堡的人必须且只能严格按守卫的分配进入指定的建筑,且每个建筑物还有自己的权限访 问控制,不同级别的人可以到建筑物里不同楼层的访问级别也是不一样的.还有就是,每个进入城堡的人的所有行为和足迹都会被严格的监控和纪录下来,一旦发生 犯罪事件,城堡管理人员就可以通过这些监控纪录来追踪责任人. 堡垒要想成功完全记到他的

[踩坑] Django &quot;OverflowError: Python int too large to convert to C long&quot; 错误

转自:https://blog.csdn.net/June7_/article/details/99991680 问题描述 使用Django框架,在使用model操作数据库的时候,出现 Django "OverflowError: Python int too large to convert to C long" 错误. 以下参照https://blog.csdn.net/June7_/article/details/99991680 解决该问题. 注意:该错误出现原因不仅是mode

那些年被我坑过的Python【转】

原文:http://www.cnblogs.com/tntxyz/p/5762371.html 集合类型: 集合类型中的元素是唯一的! 集合的定义与赋值: 1 set_1 = set([1, 3, 5, 7, 2]) 2 set_2 = set([2, 4, 6, 8, 3]) 集合的运算操作 1 # 交集 2 print(set_1.intersection(set_2)) 3 # 并集 4 print(set_1.union(set_2)) 5 # 差集 6 print(set_1.diff

Python操作数据库(mysql redis)

一.python操作mysql数据库: 数据库信息:(例如211.149.218.16   szz  123456) 操作mysql用pymysql模块 #操作其他数据库,就安装相应的模块 import  pymysql ip='211.149.218.16' port=3306 passwd='123456' user='root' db='szz' conn=pymysql.connect(host=ip,user=user,port=port,passwd=passwd,db=db,cha