具体需求:
1. 模拟Device首先发送注册包注册到TPS服务器,然后Client发送私有数据包到TPS,测试Device可以接收到私有数据包则返回成功或标志位(0, ‘‘)或(1, ‘errormsg‘)
2. 由于此插件是用于自己写的插件式监控系统,所以入口函数名必须和文件名保持一致,这里暂定为server_tps_status.py
实现思路:
具体代码:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.163.com/ # Purpose: # Install: # """ # 说明: 导入公共模块 import json import socket def device_send_recv(device_socket, tps_host, tps_port, private_data, extra_data): send_data = { "TransProxy": { "Body": { ‘AuthCode‘: ‘0000000000000000‘, ‘SerialNumber‘: ‘0000000000000000‘ }, ‘Header‘: { ‘CSeq‘: ‘1‘, ‘MessageType‘: ‘MSG_TRANSPROXY_REGISTER_REQ‘, ‘TerminalType‘: ‘Camera‘, ‘Version‘: ‘1.0‘ } } } send_data = json.dumps(send_data) send_data = ("POST /TransProxy HTTP/1.1\r\n" "Host:%s\r\n" "Port:%s\r\n" "Connection:keep-alive\r\n" "Content-Length:%s\r\n\r\n" "%s\r\n" ) % (tps_host, tps_port, len(send_data), send_data) device_socket.send(send_data) while True: cur_buffer = device_socket.recv(1024) if ‘TransProxy‘ in cur_buffer: yield (0, ‘‘) elif private_data in cur_buffer: yield (0, ‘‘) break else: yield (1, cur_buffer) break def client_send_recv(device_socket, tps_host, tps_port, private_data, extra_data): send_data = ("POST /PrivateData HTTP/1.1\r\n" "Host:%s\r\n" "Port:%s\r\n" "AuthCode:%s\r\n" "SrcUuid:%s\r\n" "DestUuid:%s\r\n" "Connection:keep-alive\r\n" "Content-Length:%s\r\n\r\n" "%s\r\n" ) % (tps_host, tps_port, extra_data[0], extra_data[1], extra_data[2], len(private_data), private_data) device_socket.send(send_data) def service_tps_status(): private_data = ‘xmdevops‘ exec_ret_dicts = {‘status‘: 0, ‘target‘: 0, ‘errors‘: ‘‘} tps_host, tps_port = ‘127.0.0.1‘, 6604 extra_data = ( ‘02899574bd6e899060‘, ‘1111111111111111‘, ‘0000000000000000‘, ) client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) device_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: client_socket.connect((tps_host, tps_port)) device_socket.connect((tps_host, tps_port)) except socket.error, e: exec_ret_dicts.update({ ‘status‘: 1, ‘target‘: 1, ‘errors‘: e.strerror, }) return exec_ret_dicts device = device_send_recv(device_socket, tps_host, tps_port, private_data, extra_data) device.next() client_send_recv(client_socket, tps_host, tps_port, private_data, extra_data) while True: try: result = device.next() except StopIteration, e: break exec_ret_dicts.update({ ‘status‘: result[0], ‘target‘: result[0], ‘errors‘: result[1], }) return exec_ret_dicts if __name__ == ‘__main__‘: print(service_tps_status())
时间: 2024-10-08 10:44:17