python3+ros api


# !/usr/bin/env python# -*- coding:utf-8 -*-# Author:lzd

import sys, time, binascii, socket, selectimport hashlib

class ApiRos:    "Routeros api"

def __init__(self, sk):        self.sk = sk        self.currenttag = 0

def login(self, username, pwd):        for repl, attrs in self.talk(["/login"]):            chal = binascii.unhexlify((attrs[‘=ret‘]).encode(‘UTF-8‘))        md = hashlib.md5()        md.update(b‘\x00‘)        md.update(pwd.encode(‘UTF-8‘))        md.update(chal)        self.talk(["/login", "=name=" + username,                   "=response=00" + binascii.hexlify(md.digest()).decode(‘UTF-8‘)])

def talk(self, words):        if self.writeSentence(words) == 0: return        r = []        while 1:            i = self.readSentence();            if len(i) == 0: continue            reply = i[0]            attrs = {}            for w in i[1:]:                j = w.find(‘=‘, 1)                if (j == -1):                    attrs[w] = ‘‘                else:                    attrs[w[:j]] = w[j + 1:]            r.append((reply, attrs))            if reply == ‘!done‘: return r

def writeSentence(self, words):        ret = 0        for w in words:            self.writeWord(w)            ret += 1        self.writeWord(‘‘)        return ret

def readSentence(self):        r = []        while 1:            w = self.readWord()            if w == ‘‘: return r            r.append(w)

def writeWord(self, w):        print(("<<< " + w))        self.writeLen(len(w))        self.writeStr(w)

def readWord(self):        ret = self.readStr(self.readLen())        print((">>> " + ret))        return ret

def writeLen(self, l):        if l < 0x80:            self.writeStr(chr(l))        elif l < 0x4000:            l |= 0x8000            self.writeStr(chr((l >> 8) & 0xFF))            self.writeStr(chr(l & 0xFF))        elif l < 0x200000:            l |= 0xC00000            self.writeStr(chr((l >> 16) & 0xFF))            self.writeStr(chr((l >> 8) & 0xFF))            self.writeStr(chr(l & 0xFF))        elif l < 0x10000000:            l |= 0xE0000000            self.writeStr(chr((l >> 24) & 0xFF))            self.writeStr(chr((l >> 16) & 0xFF))            self.writeStr(chr((l >> 8) & 0xFF))            self.writeStr(chr(l & 0xFF))        else:            self.writeStr(chr(0xF0))            self.writeStr(chr((l >> 24) & 0xFF))            self.writeStr(chr((l >> 16) & 0xFF))            self.writeStr(chr((l >> 8) & 0xFF))            self.writeStr(chr(l & 0xFF))

def readLen(self):        c = ord(self.readStr(1))        if (c & 0x80) == 0x00:            pass        elif (c & 0xC0) == 0x80:            c &= ~0xC0            c <<= 8            c += ord(self.readStr(1))        elif (c & 0xE0) == 0xC0:            c &= ~0xE0            c <<= 8            c += ord(self.readStr(1))            c <<= 8            c += ord(self.readStr(1))        elif (c & 0xF0) == 0xE0:            c &= ~0xF0            c <<= 8            c += ord(self.readStr(1))            c <<= 8            c += ord(self.readStr(1))            c <<= 8            c += ord(self.readStr(1))        elif (c & 0xF8) == 0xF0:            c = ord(self.readStr(1))            c <<= 8            c += ord(self.readStr(1))            c <<= 8            c += ord(self.readStr(1))            c <<= 8            c += ord(self.readStr(1))        return c

def writeStr(self, str):        n = 0;        while n < len(str):            r = self.sk.send(bytes(str[n:], ‘UTF-8‘))            if r == 0: raise RuntimeError("connection closed by remote end")            n += r

def readStr(self, length):        ret = ‘‘        while len(ret) < length:            s = self.sk.recv(length - len(ret))            if s == ‘‘: raise RuntimeError("connection closed by remote end")            ret += s.decode(‘UTF-8‘, ‘replace‘)        return ret

def main():    s = None    for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):        af, socktype, proto, canonname, sa = res        try:            s = socket.socket(af, socktype, proto)        except (socket.error, msg):            s = None            continue        try:            s.connect(sa)        except (socket.error, msg):            s.close()            s = None            continue        break    if s is None:        print(‘could not open socket‘)        sys.exit(1)

apiros = ApiRos(s);    apiros.login(sys.argv[2], sys.argv[3]);

#模拟用户自己输入代码进行api控制    tmpcommand=input(‘请输入你想查询的命令‘)    inputsentence = []    inputsentence.append(tmpcommand)    apiros.writeSentence(inputsentence)    while 1:        x = apiros.readSentence()        #print(x)        if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:            break    #导出ros配置到ROS本地ghg1.rsc文件    # inputsentence = []    #    # inputsentence.append(‘/export‘)    # inputsentence.append(‘=file=ghg1.rsc‘)    # apiros.writeSentence(inputsentence)    # while 1:    #     x = apiros.readSentence()    #     #print(x)    #     if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:    #         break    #从ros上传文件到ftp的代码    # inputsentence = []    # inputsentence.append(‘/tool/fetch‘)    # inputsentence.append(‘=address=192.168.0.108‘)    # inputsentence.append(‘=src-path=ghg1.rsc‘)    # inputsentence.append(‘=user=xxxxx‘)    # inputsentence.append(‘=mode=ftp‘)    # inputsentence.append(‘=password=xxxxx‘)    # inputsentence.append(‘=dst-path=123.rsc‘)    # inputsentence.append(‘=upload=yes‘)    # apiros.writeSentence(inputsentence)    # inputsentence = []    # while 1:    #     x = apiros.readSentence()    #     #print(x)    #     if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:    #         break    #删除文件代码    # inputsentence = []    # inputsentence.append(‘/file/remove‘)    # inputsentence.append(‘=numbers=ghg1.rsc‘)    # apiros.writeSentence(inputsentence)    # while 1:    #     x = apiros.readSentence()    #     print(x)    #     if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:    #         break            #官方循环代码,等待你输入命令行,可以用来测试代码命令行            # while 1:            #     r = select.select([s, sys.stdin], [], [], None)            #     if s in r[0]:            #         # something to read in socket, read sentence            #         x = apiros.readSentence()            #            #     if sys.stdin in r[0]:            #         # read line from input and strip off newline            #         l = sys.stdin.readline()            #         print(l)            #         l = l[:-1]            #         print(l)            #            #            #         # if empty line, send sentence and start with new            #         # otherwise append to input sentence            #         if l == ‘‘:            #             apiros.writeSentence(inputsentence)            #             inputsentence = []            #         else:            #             inputsentence.append(l)

if __name__ == ‘__main__‘:    main()
# !/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:lzd

import sys, time, binascii, socket, select
import hashlib

class ApiRos:
    "Routeros api"

    def __init__(self, sk):
        self.sk = sk
        self.currenttag = 0

    def login(self, username, pwd):
        for repl, attrs in self.talk(["/login"]):
            chal = binascii.unhexlify((attrs[‘=ret‘]).encode(‘UTF-8‘))
        md = hashlib.md5()
        md.update(b‘\x00‘)
        md.update(pwd.encode(‘UTF-8‘))
        md.update(chal)
        self.talk(["/login", "=name=" + username,
                   "=response=00" + binascii.hexlify(md.digest()).decode(‘UTF-8‘)])

    def talk(self, words):
        if self.writeSentence(words) == 0: return
        r = []
        while 1:
            i = self.readSentence();
            if len(i) == 0: continue
            reply = i[0]
            attrs = {}
            for w in i[1:]:
                j = w.find(‘=‘, 1)
                if (j == -1):
                    attrs[w] = ‘‘
                else:
                    attrs[w[:j]] = w[j + 1:]
            r.append((reply, attrs))
            if reply == ‘!done‘: return r

    def writeSentence(self, words):
        ret = 0
        for w in words:
            self.writeWord(w)
            ret += 1
        self.writeWord(‘‘)
        return ret

    def readSentence(self):
        r = []
        while 1:
            w = self.readWord()
            if w == ‘‘: return r
            r.append(w)

    def writeWord(self, w):
        print(("<<< " + w))
        self.writeLen(len(w))
        self.writeStr(w)

    def readWord(self):
        ret = self.readStr(self.readLen())
        print((">>> " + ret))
        return ret

    def writeLen(self, l):
        if l < 0x80:
            self.writeStr(chr(l))
        elif l < 0x4000:
            l |= 0x8000
            self.writeStr(chr((l >> 8) & 0xFF))
            self.writeStr(chr(l & 0xFF))
        elif l < 0x200000:
            l |= 0xC00000
            self.writeStr(chr((l >> 16) & 0xFF))
            self.writeStr(chr((l >> 8) & 0xFF))
            self.writeStr(chr(l & 0xFF))
        elif l < 0x10000000:
            l |= 0xE0000000
            self.writeStr(chr((l >> 24) & 0xFF))
            self.writeStr(chr((l >> 16) & 0xFF))
            self.writeStr(chr((l >> 8) & 0xFF))
            self.writeStr(chr(l & 0xFF))
        else:
            self.writeStr(chr(0xF0))
            self.writeStr(chr((l >> 24) & 0xFF))
            self.writeStr(chr((l >> 16) & 0xFF))
            self.writeStr(chr((l >> 8) & 0xFF))
            self.writeStr(chr(l & 0xFF))

    def readLen(self):
        c = ord(self.readStr(1))
        if (c & 0x80) == 0x00:
            pass
        elif (c & 0xC0) == 0x80:
            c &= ~0xC0
            c <<= 8
            c += ord(self.readStr(1))
        elif (c & 0xE0) == 0xC0:
            c &= ~0xE0
            c <<= 8
            c += ord(self.readStr(1))
            c <<= 8
            c += ord(self.readStr(1))
        elif (c & 0xF0) == 0xE0:
            c &= ~0xF0
            c <<= 8
            c += ord(self.readStr(1))
            c <<= 8
            c += ord(self.readStr(1))
            c <<= 8
            c += ord(self.readStr(1))
        elif (c & 0xF8) == 0xF0:
            c = ord(self.readStr(1))
            c <<= 8
            c += ord(self.readStr(1))
            c <<= 8
            c += ord(self.readStr(1))
            c <<= 8
            c += ord(self.readStr(1))
        return c

    def writeStr(self, str):
        n = 0;
        while n < len(str):
            r = self.sk.send(bytes(str[n:], ‘UTF-8‘))
            if r == 0: raise RuntimeError("connection closed by remote end")
            n += r

    def readStr(self, length):
        ret = ‘‘
        while len(ret) < length:
            s = self.sk.recv(length - len(ret))
            if s == ‘‘: raise RuntimeError("connection closed by remote end")
            ret += s.decode(‘UTF-8‘, ‘replace‘)
        return ret

def main():
    s = None
    for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        try:
            s = socket.socket(af, socktype, proto)
        except (socket.error, msg):
            s = None
            continue
        try:
            s.connect(sa)
        except (socket.error, msg):
            s.close()
            s = None
            continue
        break
    if s is None:
        print(‘could not open socket‘)
        sys.exit(1)

    apiros = ApiRos(s);
    apiros.login(sys.argv[2], sys.argv[3]);

    #模拟用户自己输入代码进行api控制
    tmpcommand=input(‘请输入你想查询的命令‘)
    inputsentence = []
    inputsentence.append(tmpcommand)
    apiros.writeSentence(inputsentence)
    while 1:
        x = apiros.readSentence()
        #print(x)
        if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:
            break
    #导出ros配置到ROS本地ghg1.rsc文件
    # inputsentence = []
    #
    # inputsentence.append(‘/export‘)
    # inputsentence.append(‘=file=ghg1.rsc‘)
    # apiros.writeSentence(inputsentence)
    # while 1:
    #     x = apiros.readSentence()
    #     #print(x)
    #     if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:
    #         break
    #从ros上传文件到ftp的代码
    # inputsentence = []
    # inputsentence.append(‘/tool/fetch‘)
    # inputsentence.append(‘=address=192.168.0.108‘)
    # inputsentence.append(‘=src-path=ghg1.rsc‘)
    # inputsentence.append(‘=user=xxxxx‘)
    # inputsentence.append(‘=mode=ftp‘)
    # inputsentence.append(‘=password=xxxxx‘)
    # inputsentence.append(‘=dst-path=123.rsc‘)
    # inputsentence.append(‘=upload=yes‘)
    # apiros.writeSentence(inputsentence)
    # inputsentence = []
    # while 1:
    #     x = apiros.readSentence()
    #     #print(x)
    #     if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:
    #         break
    #删除文件代码
    # inputsentence = []
    # inputsentence.append(‘/file/remove‘)
    # inputsentence.append(‘=numbers=ghg1.rsc‘)
    # apiros.writeSentence(inputsentence)
    # while 1:
    #     x = apiros.readSentence()
    #     print(x)
    #     if x == [‘!done‘] or x==[‘!re‘, ‘=status=finished‘]:
    #         break
            #官方循环代码,等待你输入命令行,可以用来测试代码命令行
            # while 1:
            #     r = select.select([s, sys.stdin], [], [], None)
            #     if s in r[0]:
            #         # something to read in socket, read sentence
            #         x = apiros.readSentence()
            #
            #     if sys.stdin in r[0]:
            #         # read line from input and strip off newline
            #         l = sys.stdin.readline()
            #         print(l)
            #         l = l[:-1]
            #         print(l)
            #
            #
            #         # if empty line, send sentence and start with new
            #         # otherwise append to input sentence
            #         if l == ‘‘:
            #             apiros.writeSentence(inputsentence)
            #             inputsentence = []
            #         else:
            #             inputsentence.append(l)

if __name__ == ‘__main__‘:
    main()
时间: 2024-10-29 10:47:45

python3+ros api的相关文章

easyradius隆重发布ROS API计费接口,支持ROS 3.3以上版本,实现简单快捷的ROS宽带计费系统云端版

easyradius对接ROS也是使用的是ROS的api接口,即您要开启 /ip /service api服务,不了解api接口的,可以自己百度噢 开启API,就到easyradius配置通讯接口,具体参数,如下: 注意:管理地址和备用地址处不要加:端口,easyradius和ros通讯使用的是默认的8728端口 测试成功还不算完,您还得下载这个脚本:下载地址 解压,把dispppoe.rsc然后上传到ros的Files文件夹,在控制台下运行im dispppoe导入脚本,然后计划中添加24小时

Python3之api

刚到公司领导安排了一个任务,用Python写一个api接口 主要用到django,request,json,orm,HttpResponse 闲话少说上代码 以下是表和需求: IP资源表:(IpSource)        id(主键) area(测试环境,仿真环境,线上环境,其他) address prefix netmask network gateway 状态 备注服务器表:        id(主键) 资产编号 area(测试环境,仿真环境,线上环境,其他) SN 主机名 内网IP 外网

python3 tornado api + angular8 + nginx 跨域问题

问题: 上一个博客部署好了api之后,前端开始吊发现了跨域的问题. 接口地址: http://111.231.201.164/api/houses  服务器上使用的是nginx转发 数据: 前端angular请求 this.http.get('http://111.231.201.164/api/houses').subscribe((res: any) => { console.log(res); }); 目前测试用的Google 和 Firefox两个. Google  浏览器 需要注意的事

ROS(indigo) turtlebot2 + android一些有趣应用

ROS和Android配合使用非常有趣,这里推荐,ROSClinet,使用rosbridge让android和ROS通信: 具体参考奥斯卡的个人剧场:http://xxhong.net/ turtlebot示例非常全面,也给出了Android的Apk,今天尝试一下.实际机器人已测. 考虑到通用性,这里选用仿真环境: ~$ roslaunch turtlebot_gazebo turtlebot_world.launch ~$ roslaunch turtlebot_gazebo gmapping

ROS 进阶学习笔记(16):ROS导航1:关于Costmap_2d Package (代价地图包)

=== 关于Costmap_2d Package === wiki page: http://wiki.ros.org/costmap_2d === 我遇到的问题是 obstacle layer的刷新频率太低 === costmap_2d包下的所有类文档:http://docs.ros.org/hydro/api/costmap_2d/html/annotated.html 其中,值得注意滴是 costmap_2d::ObservationBuffer 这个类,这个类会被 costmap_2d:

使用metaweblog API实现通用博客发布 之 API测试

使用metaweblog API实现通用博客发布 之 API测试 使用博客比较少,一则是文笔有限,怕写出的东西狗屁不通,有碍观瞻, 二则是懒,很讨厌要登录到网站上写东西,也没有那么多时间(借口).个人最喜欢用于记录的工具是Zim https://zim-wiki.org/ ,记录东西超级方便,可惜只支持PC版本, 记录的东西可以到处为MarkDown 格式,非常方便(你现在看到的这篇就是用Zim写的). 无意间看到Vs Code上有博客园的插件,作为程序员,顺手google/百度了一下,原来通用

第十八课 Gazebo仿真器

1.Gazebo概述 在Gazebo中的模拟效果是非常好的. 它的特性 Dynamics Simulation 直接控制物理引擎参数 Building Editor 无需代码即可在Gazebo中创建机器人模型(只用insert就可以插入各种模型,可以在基于Gazebo的云端中创建机器人模型) Advanced 3DGraphics 提高真实性 Sensors 提供激光雷达,摄像头,RGBD摄像头,IMU等传感器 Robot Models 提供多种机器人模型,包括PR2,iRobot,Create

nodelet的理解

1.介绍 nodelet包可以为在相同进程中的多个算法之间实现零拷贝的传输方式. 这个包也提供了实现一个nodelet所需的nodelet基类以及用于实例化nodelet的NodeletLoader类. Source: git https://github.com/ros/nodelet_core.git (branch: indigo-devel) Nodelets旨在提供一种在单机器单进程运行多个算法而不会在进程中传递消息时产生复制成本的方法. roscpp具有在同一节点内的发布和订阅调用之

movit 相关函数(二)

//The kinematics.yaml file 运动学参数文件//该文件是由MoveIt! Setup Assistant生成的记录初始组态的.right_arm:  kinematics_solver: pr2_arm_kinematics/PR2ArmKinematicsPlugin  kinematics_solver_search_resolution: 0.001  kinematics_solver_timeout: 0.05  kinematics_solver_attemp