需求
题目:rpc命令端 需求: 可以异步的执行多个命令 对多台机器 >>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>:
实现需求
1. 实现全部需求
2.会缓存已建立过的连接,减少短时间内连接相同主机时再次建立连接的开销
3.定时清理缓存的连接
目录结构
rabbitmq_server ├ bin # 执行文件目录 | └ rabbitmq_server.py # 执行程序接口 ├ conf # 配置文件目录 | └ setting.py # 配置文件。目前主要保存用以连接RabbitMQ服务器的远程用户权限 └ core # 程序核心代码位置 └ main.py # 主交互逻辑 rabbitmq_client ├ bin # 执行文件目录 | └ rabbitmq_client.py # 执行程序 ├ conf # 配置文件目录 | └ setting.py # 配置文件。目前主要保存用以连接RabbitMQ服务器的远程用户权限,以及缓存连接的保存时间 └ core # 程序核心代码位置 └ main.py # 主逻辑交互程序
代码
rabbitmq_server
1 import os,sys 2 3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 4 sys.path.insert(0,BasePath) 5 6 from core import main 7 main.main()
rabbitmq_server.py
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 import os,sys,pika,subprocess,locale,threading 6 7 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 8 sys.path.insert(0,BasePath) 9 10 from conf import setting 11 12 sys_encode = locale.getdefaultlocale()[1] 13 username = setting.username 14 password = setting.password 15 16 credentials = pika.PlainCredentials(username,password) 17 18 19 def cb(ch,method,properties,body): 20 command = body.decode(‘utf-8‘) 21 print(‘Received:‘,command) 22 23 res = subprocess.Popen(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 24 out,err = res.communicate() 25 res_con = err.decode(sys_encode) if err else out.decode(sys_encode) 26 27 ch.basic_publish( 28 exchange = ‘‘, 29 routing_key = properties.reply_to, 30 properties = pika.BasicProperties( 31 correlation_id = properties.correlation_id 32 ), 33 body = res_con 34 ) 35 print(‘send:‘,res_con) 36 37 38 39 def main(): 40 try: 41 conn = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘, 5672, ‘/‘, credentials)) 42 except Exception as e: 43 print(e) 44 else: 45 46 try: 47 ch = conn.channel() 48 ch.queue_declare(queue=‘rpc_queue‘) 49 ch.queue_purge(queue=‘rpc_queue‘) 50 51 ch.basic_consume( 52 cb, 53 queue=‘rpc_queue‘ 54 ) 55 56 ch.start_consuming() 57 58 except KeyboardInterrupt: 59 conn.close() 60 print(‘Server closed‘) 61 except Exception as e: 62 conn.close() 63 print(‘Server down because of‘,e) 64 65 if __name__ == ‘__main__‘: 66 main()
main.py
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 username = ‘jailly‘ 6 password = ‘123456‘
setting.py
rabbitmq_client
1 import os,sys 2 3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 4 sys.path.insert(0,BasePath) 5 6 from core import main 7 main.main()
rabbitmq_client
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 import re 6 import time 7 import random 8 import threading 9 import sys 10 import os 11 import pika 12 13 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 14 sys.path.insert(0,BasePath) 15 16 from conf import setting 17 18 username = setting.username 19 password = setting.password 20 connection_timeout = setting.connection_timeout 21 22 run_p = re.compile(‘‘‘ 23 ^\s*run\s+ # run 24 (?P<quote>\‘)?(?P<d_quote>\‘\‘)? # 单引号 或 双引号 的前一半 25 (?P<command>.+) # command 26 (?(quote)\‘|)(?(d_quote)\‘\‘|) # 单引号 或 双引号 的后一半 27 \s+--host\s+ 28 (?P<ips> 29 (((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s+)* # ip 30 ((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s* # ip 31 )$ 32 ‘‘‘,re.X) # 匹配run指令的re模式对象 33 check_p = re.compile(r‘^\s*check_task\s+(?P<task_id>\d+)\s*$‘) # 匹配check_task指令的re模式对象 34 exit_p = re.compile(r‘^\s*exit\s*$‘) # 匹配exit指令的re模式对象 35 36 task_ids = list(range(65535)) #task_id 的取值范围 37 38 # tasks是一个保存任务的字典。key为task_id,value是该id对应的RPCClient()对象们组成的列表。创建该变量的目的是为映射 task_id 与其对应的 39 # RPCClient()对象,方便获取该 task_id 所对应的结果(该结果即RPCClient()对象的response属性) 40 tasks = {} 41 42 # conns是一个保存连接的字典,用以缓存已建立过的连接。key 为ip,value 是一个列表[rc,last_start_time,task_id_list],rc是该ip对应的 43 # RPCClient()对象,last_start_time是最近一次连接的开始时间,task_id_list 是使用该连接(或该RPCClient()对象)的task_id 所构成的列表。 44 # 创建该变量的目的是为了缓存连接,当在超时时间(默认5分钟)内再次请求同一个主机时,不用再次创建连接,节省建立连接的开销,同时也为定期清理连接保留了 45 # 连接记录 46 conns = {} 47 48 lock = threading.RLock() 49 50 class RPCClient(object): 51 def __init__(self,host,port,vhost,credentials,conn=None): 52 53 self.host = host 54 self.conn = conn if conn else pika.BlockingConnection(pika.ConnectionParameters(host,port,vhost,credentials)) 55 self.ch = self.conn.channel() 56 self.ch.queue_declare(queue=‘rpc_queue‘) 57 58 result = self.ch.queue_declare(exclusive=True) 59 self.callback_queue = result.method.queue 60 61 self.ch.basic_consume( 62 self.cb, 63 queue = self.callback_queue 64 ) 65 66 67 def cb(self,ch,method,properties,body): 68 if self.corr_id == properties.correlation_id: 69 self.response = body 70 71 72 def call(self,command,task_id): 73 self.corr_id = str(task_id) 74 self.response = None 75 76 self.ch.basic_publish( 77 exchange = ‘‘, 78 routing_key = ‘rpc_queue‘, 79 properties = pika.BasicProperties( 80 reply_to = self.callback_queue, 81 correlation_id = self.corr_id 82 ), 83 body = command 84 ) 85 86 while self.response is None: 87 self.conn.process_data_events() 88 89 self.response = ‘\033[1;32m[--- %s ---]\033[0m\n%s\n‘% (self.host,self.response.decode(‘utf-8‘)) 90 91 92 def create_connection(conns,ips,rcs,task_id): 93 ‘‘‘ 94 创建连接,并将生成的 RPCClient()对象 加入 rcs 列表 95 :param conns: 对应 main.py 中的全局变量 conns 96 :param ips: 本次任务对应的远程主机的 ip 列表 97 :param rcs: 对应 main.py 中的 main_interactive() 中的 rcs 变量:该task_id 对应的 RPCClient()对象们 所组成的列表 98 :param task_id: 本次任务的task_id 99 :return: 100 ‘‘‘ 101 102 credentials = pika.PlainCredentials(username,password) 103 for ip in ips: 104 if ip in conns: 105 rc = RPCClient(ip,5672,‘/‘,credentials,conn=conns[ip][0]) 106 rcs.append(rc) 107 conns[ip][1] = time.time() # 重置“最新连接的开始时间” 108 conns[ip][2].append(task_id) # 添加跟该连接相关的task_id 109 else: 110 rc = RPCClient(ip,5672,‘/‘,credentials) 111 rcs.append(rc) 112 conns[ip] = [rc.conn,time.time(),[task_id,]] 113 114 115 def get_result(tasks,task_id): 116 ‘‘‘ 117 根据task_id 获取 RabbitMQ 服务器的返回结果 118 :param tasks: 任务列表,对应同名全局变量 119 :param task_id: 任务id 120 :return: 121 ‘‘‘ 122 123 rcs = tasks[task_id] 124 outcome = ‘‘ 125 for rc in rcs: 126 if rc.response is not None: 127 outcome += rc.response 128 else: 129 print(‘Task %s is handling\nIf the time for handling was too long,plz check the server health or your network conditions‘%task_id) 130 return False 131 else: 132 print(outcome) 133 print(‘\033[1;34mTask done,task_id %s has been cleaned up\033[0m‘%task_id ) 134 return True 135 136 137 def handle(command,ips,task_id,conns): 138 ‘‘‘ 139 建立连接,并处理指令。相当于整合了creat_connection() 和 get_result() 140 :param command: 待处理的指令 141 :param ips: 待连接的主机ip列表 142 :param task_id: 任务id 143 :param conns: 保存连接的列表,对应同名全局变量 144 :return: 145 ‘‘‘ 146 147 # 第一步:建立连接 148 rcs = [] # 与本次 task_id 对应的 RPCClient() 对象们组成的列表 149 try: 150 create_connection(conns, ips, rcs, task_id) 151 except Exception as e: 152 print(‘\033[1;31mCan not connect with specified host,plz check server health or make sure your network patency\033[m‘) 153 else: 154 tasks[task_id] = rcs 155 156 # 第二步:处理指令 157 for rc in rcs: 158 tc = threading.Thread(target=rc.call, args=(command, task_id)) 159 tc.setDaemon(True) 160 tc.start() 161 162 163 def main_interactive(): 164 ‘‘‘ 165 主交互逻辑 166 :return: 167 ‘‘‘ 168 169 while 1: 170 cmd = input(‘>> ‘).strip() 171 172 m = run_p.search(cmd) 173 if m: 174 command = m.group(‘command‘) 175 ips = m.group(‘ips‘).split() 176 177 task_id = random.choice(task_ids) 178 task_ids.remove(task_id) 179 print(‘task_id:‘, task_id) 180 181 t = threading.Thread(target=handle,args=(command,ips,task_id,conns)) 182 t.setDaemon(True) 183 t.start() 184 185 else: 186 m = check_p.search(cmd) 187 if m: 188 task_id = m.group(‘task_id‘) 189 if task_id.isdigit(): 190 task_id = int(task_id) 191 if task_id in tasks: 192 if get_result(tasks,task_id): 193 # 取得结果后,释放task_id,删除对应的RPCClient() 对象 194 task_ids.append(task_id) 195 del tasks[task_id] 196 else: 197 print(‘‘‘\033[1;31mTask_id not found.It may because: 198 1. task_id does not exist 199 2. connection with specified host has not been created.For this,please try again later 200 3. connection with specified host has been cleaned up because of timeout\033[0m‘‘‘) 201 else: 202 print(‘\033[1;31mTask_id must be integer\033[0m‘) 203 204 else: 205 m = exit_p.search(cmd) 206 if m: 207 for rc in tasks.values(): 208 rc.conn.close() 209 exit() 210 else: 211 if cmd: 212 print(‘\033[1;31mCommand \‘%s\‘ not found\033[0m‘%cmd.split()[0]) 213 else: 214 print(‘Command can not be None‘) 215 216 217 def clean_conns(): 218 ‘‘‘ 219 超时连接清理。 220 每5分钟(默认值,可在setting中设置)检查一次,某连接最近一次建立/声明距现在5分钟以上,则从conns列表中删除,同时删除其对应的task_id和RPCClient()对象 221 注意 : 如果有正在执行的任务,会因为清除对应的RPCClient()对象,而无法取得结果!!! 222 :return: 223 ‘‘‘ 224 225 while 1: 226 time.sleep(connection_timeout) 227 lock.acquire() 228 for ip in conns.copy(): # 不能在本字典迭代时删除字典元素,可以通过其软拷贝间接删除之 229 if time.time() - conns[ip][1] > connection_timeout: # 判断是否超时 230 for id in conns[ip][2]: 231 232 # 清除与该连接相关的所有任务记录 233 if id in tasks: 234 del tasks[id] 235 236 del conns[ip] # 清除连接记录 237 lock.release() 238 239 240 def main(): 241 t = threading.Thread(target=clean_conns) 242 t.setDaemon(True) 243 t.start() 244 245 main_interactive() 246 247 248 if __name__ == ‘__main__‘: 249 main()
main.py
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 username = ‘jailly‘ 6 password = ‘123456‘ 7 connection_timeout = 300
setting.py
时间: 2024-10-09 09:54:35