Python开发【项目】:RPC异步执行命令(RabbitMQ双向通信)

RPC异步执行命令
需求:
  • 利用RibbitMQ进行数据交互
  • 可以对多台服务器进行操作
  • 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
  • 实现异步操作

本节涉及最多的还是rabbitmq通信原理知识,要求安装rabbitmq服务

程序用广播topic模式做更好

程序目录结构:

程序简介:

# 异步rpc程序

## 1、需求
- [ ] 利用RibbitMQ进行数据交互
- [ ] 可以对多台服务器进行操作
- [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
- [ ] 实现异步操作

## 备注

- [ ] RabbitMQ队列名:
                    ①执行命令时,队列名为服务器端的IP
                    ②查询数据时,用的是回调时随机生成的callback_queue名
- [ ] threading多线程:
                    实现命令执行后不等待执行结果,依然可以输入新的指令

- [ ] 执行命令格式:
                 -->>run "dir" host 192.168.5.107 127.0.0.1
                        dir     server端要执行的命令
                        host    host后可跟一个或多个可以通过rabbitMQ的服务器地址

- [ ] 查看后台所有的TASK_ID信息:
                 -->>check_all
     显示结果样式:TASK_ID【76786】    HOST【192.168.5.107】    COMMAND【dir】
                  TASK_ID【10307】    HOST【127.0.0.1】    COMMAND【dir】

- [ ] 查看TASK_ID对应的执行结果:
                 -->>check_task 10307
                         10307 为check_all查到的TASK_ID

程序流程图:

服务器端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# !/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import os

class Server(object):
    def __init__(self,rabbitmq,queue_name):
        self.queue_name = queue_name
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=rabbitmq))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

    def handle(self,command):
        command = command.decode()
        print(command,type(command))
        message = os.popen(command).read()
        if not message:
            message = "Wrong Command"
        return message

    def on_request(self,ch, method, props, body):
        response = self.handle(body)
        ch.basic_publish(exchange=‘‘,
                         routing_key=props.reply_to,  # 回信息队列名
                         properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start(self):
        self.channel.basic_consume(self.on_request,
                                   queue=self.queue_name)

        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()

if __name__ == "__main__":
    rabbitmq = "localhost"      #rabbitmq服务器地址
    queue_name = "192.168.20.22"    #queue_name为本地ip地址
    server = Server(rabbitmq,queue_name)
    server.start()

客户端:

bin目录:

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import os
import platform

#添加BASE_DIR,添加顶级目录到路径中,方便调用其他目录模块
if platform.system() == ‘Windows‘:
    print(os.path.abspath(os.path.dirname(__file__)).split(‘\\‘)[:-1])
    BASE_DIR = ‘\\‘.join(os.path.abspath(os.path.dirname(__file__)).split(‘\\‘)[:-1])
else:
    BASE_DIR = ‘/‘.join(os.path.abspath(os.path.dirname(__file__)).split(‘/‘)[:-1])

#加载环境变量
sys.path.append(BASE_DIR)
from conf import settings
from core import main

if __name__ == ‘__main__‘:
    obj = main.Handler()
    obj.start()

conf目录:

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import os
import sys
import platform

if platform.system() == ‘Windows‘:
    BASE_DIR = ‘\\‘.join(os.path.abspath(os.path.dirname(__file__)).split(‘\\‘)[:-1])
    school_dbpaths = os.path.join(BASE_DIR,‘school_db‘)

else:
    BASE_DIR = ‘/‘.join(os.path.abspath(os.path.dirname(__file__)).split(‘/‘)[:-1])
    school_dbpaths =os.path.join(BASE_DIR, ‘school_db‘)

#rabbitmq服务地址ip
RabbitMQ_IP = ‘localhost‘

core目录

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

from conf import settings
from modules.client import Client
import random,time
import threading

class Handler(object):
    def __init__(self):
        self.information = {}   # 后台进程信息

    def check_all(self,*args):
        ‘‘‘查看所有task_id信息‘‘‘
        time.sleep(2)
        for key in self.information:
            print("TASK_ID【%s】\tHOST【%s】\tCOMMAND【%s】"%(key,self.information[key][0],
                                                                    self.information[key][1]))

    def check_task(self,user_cmd):
        ‘‘‘查看task_id执行结果‘‘‘
        time.sleep(2)
        try:
            task_id = user_cmd.split()[1]
            task_id = int(task_id)
            callback_queue=self.information[task_id][2]
            callback_id=self.information[task_id][3]
            client = Client()
            response = client.get_response(callback_queue, callback_id)
            print(response.decode())
            del self.information[task_id]

        except KeyError  as e :
            print("\33[31;0mWrong id[%s]\33[0m"%e)
        except IndexError as e:
            print("\33[31;0mWrong id[%s]\33[0m"%e)

    def run(self,user_cmd):
        ‘‘‘执行命令‘‘‘
        try:
            time.sleep(2)
            #print("--->>",user_cmd)
            command = user_cmd.split("\"")[1]
            hosts = user_cmd.split()[3:]
            for host in hosts:
                task_id = random.randint(10000, 99999)
                client = Client()
                response = client.call(host, command)
                # print(response)
                self.information[task_id] = [host, command, response[0],response[1]]
        except IndexError as e:
            print("\33[31;0mError:%s\33[0m"%e)

    def reflect(self,str,user_cmd):
        ‘‘‘反射‘‘‘
        if hasattr(self, str):
            getattr(self, str)(user_cmd)
        # else:
        #     setattr(self, str, self.foo)
        #     getattr(self, str)()

    def start(self):
        while True:
            user_cmd = input("->>").strip()
            if not user_cmd:continue
            str = user_cmd.split()[0]
            t1 = threading.Thread(target=self.reflect,args=(str,user_cmd))  #多线程
            t1.start()

modules目录

client.py

运行示例图

原文地址:https://www.cnblogs.com/fuyuteng/p/9283343.html

时间: 2024-10-13 02:10:23

Python开发【项目】:RPC异步执行命令(RabbitMQ双向通信)的相关文章

m5-多主机异步执行命令程序

这是一个多主机异步执行命令程序 作者介绍: author:Howard My Blog: http://987774031.blog.51cto.com/ GitHub: https://github.com/wuwuming/python_practice/tree/master/machine_manage 需求: 例子: >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 task id: 45334 >>: chec

Saltstack异步执行命令(十三)

Saltstack异步执行命令 salt执行命令有时候会有超时的问题,就是命令下发下去了,部分主机没有返回信息,这时候就很难判断命令或任务是否执行成功.因此,salt提供异步执行的功能,发出命令后立即返回一个jid.然后我们就可以根据这个jid来查询任务是否执行成功. 命令行实现异步 参数--async,返回job ID,根据job ID我们可以查询执行结果. salt --async '*' test.ping salt-run jobs.lookup_jid 2016111716315335

python之实现批量远程执行命令(堡垒机)

python远程批量执行 我并不是一个专业的开发,我一直在学习linux运维,对于python也是接触不久,所以代码写的并不是很规范简洁. 前段时间一个同学找我一起做一个自动化运维平台,我对python的django还没有了解,并且对于HTML和JS这类开发学习还没有涉及,所以我说我做些后台的实现,前端就交给我的同学做.不扯淡了,下面说下我做批量执行的思路. 用到的模块:paramiko 功能:很简单就是批量执行命令,类似于ansible,本来想用Fabric,但是想一想还是用paramiko,

python脚本 对批量机器执行命令和发送文件

背景:对linux服务器批量执行命令和批量发送文件是运维自动化过程中的最基础的,本脚本就是实现这个功能,shell通过expect也可以实现类似功能. 本脚本用到了pexpect模块,没有该模块的需要手动安装. 该脚本有4个功能: 对被管理的服务器批量执行命令 批量发送本地文件到被管理机器上 支持服务器分组 支持IP地址序列 脚本配置文件如下: [WEB] 192.168.56.102 root liu123 192.168.56.103 root liu123 [DB] 192.168.56.

Python开发【十一章】:RabbitMQ队列

RabbitMQ队列 rabbitMQ是消息队列:想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互).进程queue(父进程与子进程进行交互或者同属于同一父进程下的多个子进程进行交互):如果两个独立的程序,那么之间是不能通过queue进行交互的,这时候我们就需要一个中间代理即rabbitMQ 消息队列: RabbitMQ ZeroMQ ActiveMQ ........... 原理: 1.安装和基本使用 安装RabbitMQ服务  htt

Python 开发 项目《外星人入侵》

2019-02-05 本篇心路历程: 本篇是打算记录自己的第一个python项目,也是众人皆知的<外星人入侵项目>,本项目大概500多行.趁着寒假,大概耗时3天吧,把完整代码敲了出来,当然是照着书敲的啦,本人也是刚刚入门python. 打算在python这条路上走得更远一些吧,也以此来记录自己.其实前些天是在看python基础的视频,在选择看视频学习和看书籍学习也是迷茫了一段时间.感觉看视频学习python的话呢,可以是可以,但是要有书籍作为辅助,不然你拿什么敲代码呢?自己感觉还是,以书籍为主

python之socket运用之执行命令

服务端的代码 import socket import subprocess HOST = "127.0.0.1" PORT = 5001 ip_bind = (HOST,PORT) server = socket.socket() server.bind(ip_bind) server.listen(1) print("server is waiting for connected........") conn, add = server.accept() pri

Python开发【模块】:Celery 分布式异步消息任务队列

Celery 前言: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们所有客户的

ADO.NET开发技巧(3)-执行命令初步

非常好,新兵,我们已经成功连接到了数据库,接下来,我们就可以让数据库执行我们的命令了,也就是,可以开始大干一场了.哦,当然,你还得知道如何让数据库执行命令.如果没掌握好技巧而弄巧成拙的话,异常大军会马上赶过来并杀死你的. 一旦SqlConnection类使用了open方法,那么,就代表我们已经连接到了数据库,哦,等等,这是-.异常大军!笨蛋,你连接字符串拼错了!怎么办?别慌,下士,microsoft送给我们的工具箱还没用呢,让我打开看看,哦,这是try-catch-finally.另一个是usi