python--基于RabbitMQ rpc实现的主机管理

要求:

可以异步的执行多个命令对多台机器>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>:

思考:1、分解其中需要实现的功能(1)命令是发到远程主机上执行的,命令放在队列里,再发到主机处理,主机执行完结果放在队列里,提交命令的人自取。就需要2个进程,一个client,提交命令,取结果,一个server,处理命令,放结果(2)发送命令的时候,exchange决定往哪个队列放消息,每个server取自己的命令,用ip作为筛选的binding_key(3)取结果的时候,就用默认的exchange,直接往reply_to的队列里放

server端
 1 #rabbitMQserver=‘10.21.147.189‘
 2 rabbitMQserver=‘localhost‘
 3
 4 import pika
 5 import os
 6 import socket
 7
 8 class server(object):
 9     def __init__(self):
10         self.connection=pika.BlockingConnection(pika.ConnectionParameters(rabbitMQserver))
11         self.channel=self.connection.channel()
12         self.channel.exchange_declare(exchange=‘cmd‘,exchange_type=‘topic‘)
13         self.queue_default = self.channel.queue_declare(exclusive=True)
14         self.quname = self.queue_default.method.queue
15         # 获取本机ip作为binding_key,binding_key的格式是以点分隔的一系列字符串,client发过来的消息,routing_key满足server的binding_key匹配原则,就会加入server连的那个queue
16         self.hostip=socket.gethostbyname(socket.gethostname())
17         self.binding_key=‘#.‘+self.hostip+‘.#‘
18         print("binding_key: %s" % self.binding_key)
19         self.channel.queue_bind(queue=self.quname, exchange=‘cmd‘, routing_key=self.binding_key)
20         self.channel.basic_qos(prefetch_count=1)
21
22         self.channel.basic_consume(self.execcmd,queue=self.quname,no_ack=False)
23         self.channel.start_consuming()
24         return
25
26     def execcmd(self,ch,method,props,body):
27         cmd=bytes.decode(body)
28         print("[*] Received %s" % cmd)
29         result = os.popen(cmd).read()
30         print(result)
31         ch.basic_publish(
32             exchange=‘‘,
33             routing_key=props.reply_to,
34             properties=pika.BasicProperties(
35                 correlation_id=props.correlation_id
36             ),
37             body=result
38         )
39         ch.basic_ack(delivery_tag=method.delivery_tag)
40         return
41
42 se=server()

client端
 1 import pika
 2 import uuid
 3
 4 #rabbitMQserver=‘10.21.147.189‘
 5 rabbitMQserver=‘localhost‘
 6
 7 class client(object):
 8     def __init__(self):
 9         self.cmdid={}
10         self.connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitMQserver))
11         self.channel = self.connection.channel()
12         self.channel.exchange_declare(exchange=‘cmd‘, exchange_type=‘topic‘)
13
14         #回写result的队列用默认生成的,需要取得名字,exclusive定为True表示只能有本client消费这个queue
15         self.result=self.channel.queue_declare(exclusive=True)
16         self.resultqueue=self.result.method.queue
17         return
18
19     def showcmdid(self):
20         print("-----命令标识如下:--------")
21         for id,hosts in self.cmdid.items():
22             print("cmdid:%s run on hosts: %s"%(id,hosts))
23         return
24
25     def callcmd(self,cmdinput):
26         #如果是call命令,格式是 call+"命令"+ --host + 一串ip地址
27         #首先判断格式对不对,至少4个参数
28         msglist=cmdinput.split()
29         argnum=len(msglist)
30         if argnum<4:
31             print("Wrong cmd. Input again.")
32             return
33         #其次,命令是第二个参数,命令用双引号标记,所以要strip
34         msg = msglist[1].strip("\"")
35         #然后,第三个是--host,第四个开始是ip,ip当作routing_key
36         routing_key=msglist[3]
37         i=4
38         while i < argnum:
39             routing_key=routing_key+‘.‘+msglist[i]
40             i+=1
41         print("routing_key: %s"%routing_key)
42         #再然后,生成一个随机数,把他作为消息的属性参数
43         self.corr_id=str(uuid.uuid4())
44         self.cmdid[self.corr_id]=msglist[3:]
45         print("命令标识:%s"%self.corr_id)
46         #然后,把消息发到exchange,routing_key,corr_id当作参数发布
47         self.channel.basic_publish(
48             exchange=‘cmd‘,
49             routing_key=routing_key,
50             body=msg,
51             properties=pika.BasicProperties(
52                 reply_to=self.resultqueue,
53                 correlation_id=self.corr_id
54             )
55         )
56         print("[*] Send message %s" % msg)
57         return
58
59     def on_response(self,ch,method,props,body):
60         if self.targetcmdid==props.correlation_id:
61             self.response=bytes.decode(body)
62             print(self.response)
63         ch.basic_ack(delivery_tag=method.delivery_tag)
64         return
65
66     def getres(self,cmdinput):
67         msglist = cmdinput.split()
68         self.targetcmdid=msglist[1]
69         self.response=None
70         self.channel.basic_consume(self.on_response,queue=self.resultqueue)
71         while self.response is None:
72             self.connection.process_data_events()
73         return
74
75 cl=client()
76 while True:
77     msginput=input(">>: ").strip()
78     if msginput.startswith(‘call‘):
79         cl.callcmd(msginput)
80     elif msginput.startswith(‘get‘):
81         cl.getres(msginput)
82     elif msginput.startswith(‘show‘):
83         cl.showcmdid()
84     elif msginput.startswith(‘exit‘):
85         cl.connection.close()
86         exit(0)
87     else:
88         print("Wrong cmd. Input again.")
89         continue

目前没有验证远程登陆rabbitMQ server的情况,应该是需要配置用户名密码,不能用默认的guest/guest。不过以上功能是实现了。

原文地址:https://www.cnblogs.com/susenyan/p/8489345.html

时间: 2024-11-05 20:43:33

python--基于RabbitMQ rpc实现的主机管理的相关文章

module05-1-基于RabbitMQ rpc实现的主机管理

需求 题目:rpc命令端 需求: 可以异步的执行多个命令 对多台机器 >>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>: 实现需求 1. 实现全部需求 2.会缓存已建立过的连接,减少短时间内连接相同主机时再次建立连接的开销 3.定时清理缓存的连接 目录结构 rabbitmq_server ├ bin # 执行文件目录 | └ rabbitm

python 之路,Day27 - 主机管理+堡垒机系统开发

python 之路,Day27 - 主机管理+堡垒机系统开发 本节内容 需求讨论 构架设计 表结构设计 程序开发 1.需求讨论 实现对用户的权限管理,能访问哪些机器,在被访问的机器上有哪些权限 实现可以通过web页面对指定主机列表 进行 批量发布命令.文件 实现对用户操作进行纪录 2.架构设计 3. 表结构设计 参考 http://www.cnblogs.com/alex3714/articles/5286889.html 分类: Python自动化开发之路 好文要顶 关注我 收藏该文   金角

基于RabbitMQ的跨平台RPC框架

RabbitMQRpc protocobuf RabbitMQ 实现RPC https://www.cnblogs.com/LiangSW/p/6216537.html 基于RabbitMQ的RPC https://blog.csdn.net/lmw1239225096/article/details/79453317 RabbitMQ之RPC实现 https://blog.csdn.net/u013256816/article/details/55218595 rabbitMQ 和 proto

Python学习笔记——进阶篇【第八周】———FTP断点续传作业&amp;批量主机管理工具

主机管理之paramiko模块学习 http://www.cnblogs.com/wupeiqi/articles/5095821.html 作业1:用socketserver继续完善FTP作业 作业2:开发一个批量主机管理工具 需求: 可以对机器进行分组 可以对指定的一组或多组机器执行批量命令,分发文件(发送\接收) 纪录操作日志 作业参考链接http://www.cnblogs.com/alex3714/articles/5227251.html

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

十一天 python操作rabbitmq、redis

1.启动rabbimq.mysql 在""运行""里输入services.msc,找到rabbimq.mysql启动即可 2.启动redis 管理员进入cmd,进入redis所在目录,执行redis-server.exe redis.windows.conf --maxmemory 200M  启动redis  server 执行redis-cli.exe启动客户端 一.python系列之 RabbitMQ - work queues 本节我们创建一个工作队列( w

Django + Ansible 主机管理

本文分享内容如下: 内容目录 Django 基础 MVC ORM COMMAND AuthenticationAnsible 基础 配置 ad-hoc 命令集 python api代码解读 演示 创建虚拟化环境并进入python3/python -m venv venv(linux)source venv\bin\active(win) venv\Scripts\active 安装第三方库pip install -r requirements.txt 初始化python manage.py ma

python之rabbitMQ

一.简单的rabbitMQ队列通信 由上图可知,数据是先发给exchange交换器,exchage再发给相应队列.pika模块是python对rabbitMQ的API接口.接收端有一个回调函数,一接收到数据就调用该函数.一条消息被一个消费者接收后,该消息就从队列删除.OK,了解上面的知识后,先来看看一个简单的rabbitMQ列队通信. send端:  1 import pika 2 #连上rabbitMQ 3 connection=pika.BlockingConnection(pika.Con

存储与虚拟主机管理,克隆虚拟机的详细讲解

存储与虚拟主机管理 本章的重点了解及部署vmotion迁移,迁移前需要准备外部存储网络,本章将围绕这几个点进行讲解以及部署openfiler和vmotion进行迁移 esxi存储是虚拟化平台的基础,分为本地存储和外部存储. 1)本地存储: DSA 直接附加存储 不需要通过网络进行访问 主机可以直接在本地访问,其他主机不能进行访问,不能使用IDE/ATA或USB驱动器来存储虚拟机,上面的数据只能被一台esxi主机访问 2)外部共享存储: ① 光纤 (FC) SAN 又称hdb卡 网络存储 ② IP