module05-1-基于RabbitMQ rpc实现的主机管理

需求



题目:rpc命令端

需求:

可以异步的执行多个命令    对多台机器

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>: 

实现需求




  1. 实现全部需求

  2.会缓存已建立过的连接,减少短时间内连接相同主机时再次建立连接的开销

  3.定时清理缓存的连接

目录结构


rabbitmq_server
    ├ bin   # 执行文件目录
    |   └ rabbitmq_server.py     # 执行程序接口
    ├ conf  # 配置文件目录
    |   └ setting.py           # 配置文件。目前主要保存用以连接RabbitMQ服务器的远程用户权限
    └ core  # 程序核心代码位置
        └ main.py           # 主交互逻辑

rabbitmq_client
    ├ bin   # 执行文件目录
    |   └ rabbitmq_client.py     # 执行程序
    ├ conf  # 配置文件目录
    |   └ setting.py           # 配置文件。目前主要保存用以连接RabbitMQ服务器的远程用户权限,以及缓存连接的保存时间
    └ core  # 程序核心代码位置
        └ main.py           # 主逻辑交互程序

代码



rabbitmq_server

1 import os,sys
2
3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4 sys.path.insert(0,BasePath)
5
6 from core import main
7 main.main()

rabbitmq_server.py

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 # Author:Jailly
 4
 5 import os,sys,pika,subprocess,locale,threading
 6
 7 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 8 sys.path.insert(0,BasePath)
 9
10 from conf import setting
11
12 sys_encode = locale.getdefaultlocale()[1]
13 username = setting.username
14 password = setting.password
15
16 credentials = pika.PlainCredentials(username,password)
17
18
19 def cb(ch,method,properties,body):
20     command = body.decode(‘utf-8‘)
21     print(‘Received:‘,command)
22
23     res = subprocess.Popen(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
24     out,err = res.communicate()
25     res_con = err.decode(sys_encode) if err else out.decode(sys_encode)
26
27     ch.basic_publish(
28         exchange = ‘‘,
29         routing_key = properties.reply_to,
30         properties = pika.BasicProperties(
31             correlation_id = properties.correlation_id
32         ),
33         body = res_con
34     )
35     print(‘send:‘,res_con)
36
37
38
39 def main():
40     try:
41         conn = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘, 5672, ‘/‘, credentials))
42     except Exception as e:
43         print(e)
44     else:
45
46         try:
47             ch = conn.channel()
48             ch.queue_declare(queue=‘rpc_queue‘)
49             ch.queue_purge(queue=‘rpc_queue‘)
50
51             ch.basic_consume(
52                 cb,
53                 queue=‘rpc_queue‘
54             )
55
56             ch.start_consuming()
57
58         except KeyboardInterrupt:
59             conn.close()
60             print(‘Server closed‘)
61         except Exception as e:
62             conn.close()
63             print(‘Server down because of‘,e)
64
65 if __name__ == ‘__main__‘:
66     main()

main.py

1 #! /usr/bin/env python3
2 # -*- coding:utf-8 -*-
3 # Author:Jailly
4
5 username = ‘jailly‘
6 password = ‘123456‘

setting.py

rabbitmq_client

1 import os,sys
2
3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4 sys.path.insert(0,BasePath)
5
6 from core import main
7 main.main()

rabbitmq_client

  1 #! /usr/bin/env python3
  2 # -*- coding:utf-8 -*-
  3 # Author:Jailly
  4
  5 import re
  6 import time
  7 import random
  8 import threading
  9 import sys
 10 import os
 11 import pika
 12
 13 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 14 sys.path.insert(0,BasePath)
 15
 16 from conf import setting
 17
 18 username = setting.username
 19 password = setting.password
 20 connection_timeout = setting.connection_timeout
 21
 22 run_p = re.compile(‘‘‘
 23     ^\s*run\s+  # run
 24     (?P<quote>\‘)?(?P<d_quote>\‘\‘)?  # 单引号 或 双引号 的前一半
 25     (?P<command>.+)  # command
 26     (?(quote)\‘|)(?(d_quote)\‘\‘|)  # 单引号 或 双引号 的后一半
 27     \s+--host\s+
 28     (?P<ips>
 29         (((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s+)*  # ip
 30         ((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s*  # ip
 31     )$
 32     ‘‘‘,re.X)  # 匹配run指令的re模式对象
 33 check_p = re.compile(r‘^\s*check_task\s+(?P<task_id>\d+)\s*$‘)  # 匹配check_task指令的re模式对象
 34 exit_p = re.compile(r‘^\s*exit\s*$‘)  # 匹配exit指令的re模式对象
 35
 36 task_ids = list(range(65535))  #task_id 的取值范围
 37
 38 # tasks是一个保存任务的字典。key为task_id,value是该id对应的RPCClient()对象们组成的列表。创建该变量的目的是为映射 task_id 与其对应的
 39 # RPCClient()对象,方便获取该 task_id 所对应的结果(该结果即RPCClient()对象的response属性)
 40 tasks = {}
 41
 42 # conns是一个保存连接的字典,用以缓存已建立过的连接。key 为ip,value 是一个列表[rc,last_start_time,task_id_list],rc是该ip对应的
 43 # RPCClient()对象,last_start_time是最近一次连接的开始时间,task_id_list 是使用该连接(或该RPCClient()对象)的task_id 所构成的列表。
 44 # 创建该变量的目的是为了缓存连接,当在超时时间(默认5分钟)内再次请求同一个主机时,不用再次创建连接,节省建立连接的开销,同时也为定期清理连接保留了
 45 # 连接记录
 46 conns = {}
 47
 48 lock = threading.RLock()
 49
 50 class RPCClient(object):
 51     def __init__(self,host,port,vhost,credentials,conn=None):
 52
 53         self.host = host
 54         self.conn = conn if conn else pika.BlockingConnection(pika.ConnectionParameters(host,port,vhost,credentials))
 55         self.ch = self.conn.channel()
 56         self.ch.queue_declare(queue=‘rpc_queue‘)
 57
 58         result = self.ch.queue_declare(exclusive=True)
 59         self.callback_queue = result.method.queue
 60
 61         self.ch.basic_consume(
 62             self.cb,
 63             queue = self.callback_queue
 64         )
 65
 66
 67     def cb(self,ch,method,properties,body):
 68         if self.corr_id == properties.correlation_id:
 69             self.response = body
 70
 71
 72     def call(self,command,task_id):
 73         self.corr_id = str(task_id)
 74         self.response = None
 75
 76         self.ch.basic_publish(
 77             exchange = ‘‘,
 78             routing_key = ‘rpc_queue‘,
 79             properties = pika.BasicProperties(
 80                 reply_to = self.callback_queue,
 81                 correlation_id = self.corr_id
 82             ),
 83             body = command
 84         )
 85
 86         while self.response is None:
 87             self.conn.process_data_events()
 88
 89         self.response = ‘\033[1;32m[--- %s ---]\033[0m\n%s\n‘% (self.host,self.response.decode(‘utf-8‘))
 90
 91
 92 def create_connection(conns,ips,rcs,task_id):
 93     ‘‘‘
 94     创建连接,并将生成的 RPCClient()对象 加入 rcs 列表
 95     :param conns: 对应 main.py 中的全局变量 conns
 96     :param ips: 本次任务对应的远程主机的 ip 列表
 97     :param rcs: 对应 main.py 中的 main_interactive() 中的 rcs 变量:该task_id 对应的 RPCClient()对象们 所组成的列表
 98     :param task_id: 本次任务的task_id
 99     :return:
100     ‘‘‘
101
102     credentials = pika.PlainCredentials(username,password)
103     for ip in ips:
104         if ip in conns:
105             rc = RPCClient(ip,5672,‘/‘,credentials,conn=conns[ip][0])
106             rcs.append(rc)
107             conns[ip][1] = time.time()      # 重置“最新连接的开始时间”
108             conns[ip][2].append(task_id)    # 添加跟该连接相关的task_id
109         else:
110             rc = RPCClient(ip,5672,‘/‘,credentials)
111             rcs.append(rc)
112             conns[ip] = [rc.conn,time.time(),[task_id,]]
113
114
115 def get_result(tasks,task_id):
116     ‘‘‘
117     根据task_id 获取 RabbitMQ 服务器的返回结果
118     :param tasks: 任务列表,对应同名全局变量
119     :param task_id: 任务id
120     :return:
121     ‘‘‘
122
123     rcs = tasks[task_id]
124     outcome = ‘‘
125     for rc in rcs:
126         if rc.response is not None:
127             outcome += rc.response
128         else:
129             print(‘Task %s is handling\nIf the time for handling was too long,plz check the server health or your network conditions‘%task_id)
130             return False
131     else:
132         print(outcome)
133         print(‘\033[1;34mTask done,task_id %s has been cleaned up\033[0m‘%task_id )
134         return True
135
136
137 def handle(command,ips,task_id,conns):
138     ‘‘‘
139     建立连接,并处理指令。相当于整合了creat_connection() 和 get_result()
140     :param command: 待处理的指令
141     :param ips: 待连接的主机ip列表
142     :param task_id: 任务id
143     :param conns: 保存连接的列表,对应同名全局变量
144     :return:
145     ‘‘‘
146
147     # 第一步:建立连接
148     rcs = []  # 与本次 task_id 对应的 RPCClient() 对象们组成的列表
149     try:
150         create_connection(conns, ips, rcs, task_id)
151     except Exception as e:
152         print(‘\033[1;31mCan not connect with specified host,plz check server health or make sure your network patency\033[m‘)
153     else:
154         tasks[task_id] = rcs
155
156         # 第二步:处理指令
157         for rc in rcs:
158             tc = threading.Thread(target=rc.call, args=(command, task_id))
159             tc.setDaemon(True)
160             tc.start()
161
162
163 def main_interactive():
164     ‘‘‘
165     主交互逻辑
166     :return:
167     ‘‘‘
168
169     while 1:
170         cmd = input(‘>> ‘).strip()
171
172         m = run_p.search(cmd)
173         if m:
174             command = m.group(‘command‘)
175             ips = m.group(‘ips‘).split()
176
177             task_id = random.choice(task_ids)
178             task_ids.remove(task_id)
179             print(‘task_id:‘, task_id)
180
181             t = threading.Thread(target=handle,args=(command,ips,task_id,conns))
182             t.setDaemon(True)
183             t.start()
184
185         else:
186             m = check_p.search(cmd)
187             if m:
188                 task_id = m.group(‘task_id‘)
189                 if task_id.isdigit():
190                     task_id = int(task_id)
191                     if task_id in tasks:
192                         if get_result(tasks,task_id):
193                             # 取得结果后,释放task_id,删除对应的RPCClient() 对象
194                             task_ids.append(task_id)
195                             del tasks[task_id]
196                     else:
197                         print(‘‘‘\033[1;31mTask_id not found.It may because:
198 1. task_id does not exist
199 2. connection with specified host has not been created.For this,please try again later
200 3. connection with specified host has been cleaned up because of timeout\033[0m‘‘‘)
201                 else:
202                     print(‘\033[1;31mTask_id must be integer\033[0m‘)
203
204             else:
205                 m = exit_p.search(cmd)
206                 if m:
207                     for rc in tasks.values():
208                         rc.conn.close()
209                     exit()
210                 else:
211                     if cmd:
212                         print(‘\033[1;31mCommand \‘%s\‘ not found\033[0m‘%cmd.split()[0])
213                     else:
214                         print(‘Command can not be None‘)
215
216
217 def clean_conns():
218     ‘‘‘
219     超时连接清理。
220     每5分钟(默认值,可在setting中设置)检查一次,某连接最近一次建立/声明距现在5分钟以上,则从conns列表中删除,同时删除其对应的task_id和RPCClient()对象
221     注意 : 如果有正在执行的任务,会因为清除对应的RPCClient()对象,而无法取得结果!!!
222     :return:
223     ‘‘‘
224
225     while 1:
226         time.sleep(connection_timeout)
227         lock.acquire()
228         for ip in conns.copy():  # 不能在本字典迭代时删除字典元素,可以通过其软拷贝间接删除之
229             if time.time() - conns[ip][1] > connection_timeout:  # 判断是否超时
230                 for id in conns[ip][2]:
231
232                     # 清除与该连接相关的所有任务记录
233                     if id in tasks:
234                         del tasks[id]
235
236                 del conns[ip]  # 清除连接记录
237         lock.release()
238
239
240 def main():
241     t = threading.Thread(target=clean_conns)
242     t.setDaemon(True)
243     t.start()
244
245     main_interactive()
246
247
248 if __name__ == ‘__main__‘:
249     main()

main.py

1 #! /usr/bin/env python3
2 # -*- coding:utf-8 -*-
3 # Author:Jailly
4
5 username = ‘jailly‘
6 password = ‘123456‘
7 connection_timeout = 300

setting.py

时间: 2024-10-09 09:54:35

module05-1-基于RabbitMQ rpc实现的主机管理的相关文章

python--基于RabbitMQ rpc实现的主机管理

要求: 可以异步的执行多个命令对多台机器>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>: 思考:1.分解其中需要实现的功能(1)命令是发到远程主机上执行的,命令放在队列里,再发到主机处理,主机执行完结果放在队列里,提交命令的人自取.就需要2个进程,一个client,提交命令,取结果,一个server,处理命令,放结果(2)发送命令的时候,

基于RabbitMQ的跨平台RPC框架

RabbitMQRpc protocobuf RabbitMQ 实现RPC https://www.cnblogs.com/LiangSW/p/6216537.html 基于RabbitMQ的RPC https://blog.csdn.net/lmw1239225096/article/details/79453317 RabbitMQ之RPC实现 https://blog.csdn.net/u013256816/article/details/55218595 rabbitMQ 和 proto

RabbitMQ - RPC in Java

这次试着用RabbitMQ进行RPC. 其实用RabbitMQ搞RPC也没什么特别的. 只是我们需要在请求中再加入一个callback queue. 比如这样: callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties                             .Builder()                             .re

基于域名的虚拟web主机

Web网站服务(二) 用户授权限制 1 创建用户认证数据文件(新建数据文件/usr/local/httpd/conf/.awspwd 其中包括一个名为webadmin的用户) 查看创建的用户数据文件 2 添加用户授权配置 AuthName 定义受保护的领域名称 Authtype 设置认证的类型 basic表示基本认证 Authuserfile 设置用于保存用户账户密码的认证文件路径 Require  valid-user 要求只有认证文件中的合法用户才能访问 重启服务使新配置生效 3 验证用户访

Openstack中RabbitMQ RPC代码分析

在Openstack中,RPC调用是通过RabbitMQ进行的. 任何一个RPC调用,都有Client/Server两部分,分别在rpcapi.py和manager.py中实现. 这里以nova-scheduler调用nova-compute为例子. nova/compute/rpcapi.py中有ComputeAPI nova/compute/manager.py中有ComputeManager 两个类有名字相同的方法,nova-scheduler调用ComputeAPI中的方法,通过底层的R

tomcat基于多端口的虚拟主机配置

我又有两个项目需要部署在一台机器上,两个项目对应两个不同的服务,一个服务启动侦听8001 另一个侦听8002端口,就是基于多端口的虚拟机主机配置,只需要更改tomcat目录下的conf/server.xml配置文件,内容如下: <?xml version='1.0' encoding='utf-8'?> <Server port="8005" shutdown="SHUTDOWN">   <Listener className=&quo

Apache配置基于端口号的虚拟主机 Apache virtual host configuration is based on the port

有可能只有一个ip出口,但却有多个项目,那么就需要基于端口号架设虚拟主机. Step 1: 检查是否开启 httpd-vhosts.conf apache/conf/httpd.conf文件 # Virtual hosts Include conf/extra/httpd-vhosts.conf 如果没有开启,必须在httpd.conf文件中设置:如果开启,则可以在apache/conf/extra/httpd-vhosts.conf文件中设置,当然也还是可以再httpd.conf文件中进行设置

如何开发基于Dubbo RPC的分布式服务?

什么是Dubbo? Dubbo能做什么? 在Crystal框架下,如何开发基于Dubbo RPC的服务? 在Crystal框架下,如何调用Dubbo RPC服务? 相关的文章 什么是Dubbo? Dubbo[]是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案. 其核心部分包含: 远程通讯: 提供对多种基于长连接的NIO框架抽象封装,包括多种线程模型,序列化,以及“请求-响应”模式的信息交换方式. 集群容错: 提供基于接口方法的透明远程过程调用,包括多

基于httpd-2.4配置虚拟主机web站点,并提供https服务(二)

使用httpd-2.2和httpd-2.4实现 > 1.建立httpd服务,要求: > 1) 提供两个基于名称的虚拟主机www1, www2:要求每个虚拟主机都有单独的错误日志和访问日志: > 2) 通过www1的/server-status提供状态信息,且仅允许172.16.0.1主机访问: > 3) www2不允许192.168.1.0/24网络中任意主机访问: > 2.为上面的第2)个虚拟主机提供https服务. > 基于httpd-2.4配置虚拟主机web站点,