课程大纲
一、ORM
连表
一对多
1、创建表,主动指定外键约束
2、操作
类:repr
单表
连表
session.query(表1).join(表2).all()
多对多
1、创建表,额外的关系表
2、filter()
==
int_( 都可以是另外一个查询)
3、relationship
A
AB ==> fk, 关系
B
4、简单
A 关系(B,ABTable对象)
AB ==>fk,
B
操作时,简单
Table对象:
A 关系(B,Table对象方法)
Table对象方法 AB ==>fk,
B
操作时,简单
A 关系(B,AB.__table__)
AB ==>fk,
B
操作时,简单
1、创建表
2、操作表
#单表
#连表
.join
关系:
一对多
fk,关系
多对多,
多一张表 fk,fk
1、 关系表:关系
2、A:关系,(B,AB)
二、paramiko模块
SSHClient
用户名、密码:
SSHClient,Tranport
SFTPClient:
用户名、密码
Tranport
需求:
命令、上传文件、命令
import paramiko
Tranport
Tranport关闭
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, password=‘123‘)
# 执行命令
stdin, stdout, stderr = ssh.exec_command(‘ls‘)
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()
import paramiko
transport = paramiko.Transport((‘hostname‘,22))
transport.connect(username=‘wupeiqi‘,password=‘123‘)
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘)
# 将remove_path 下载到本地 local_path
sftp.get(‘remove_path‘, ‘local_path‘)
transport.close()
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, password=‘123‘)
# 执行命令
stdin, stdout, stderr = ssh.exec_command(‘ls‘)
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()
连接:
堡垒机
三、前端:HTMl
sqlalchemy 使用
1. 单表操作
2. join 连接操作,多表
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:Minghu Wang
"""
目的: 创建用于存储主机用户和组,和其对应的关系,其用户属于那个组
实现: sqlalchemy orm框架
1.创建三张表,user,group,test(用户存储组和用户的对应关系,用外键实现)
2.pymysql实现连接,class类实现创建表,session实现增删改查
3.__repl__实现格式化字符串打印,默认以类原始数据输出,用此方法以列表形式的字符串输出
"""
from sqlalchemy import create_engine #创建表模块
from sqlalchemy.ext.declarative import declarative_base #对象的基类
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index #支持数据类型的模块
from sqlalchemy.orm import sessionmaker, relationship #格式化查询模块
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s13", max_overflow=5) # 初始化数据库连接
Base = declarative_base() # 创建对象的基类
# 单表
class Test(Base): #创建表必须生成类
__tablename__ = ‘test‘ #表名
nid = Column(Integer, primary_key=True,autoincrement=True) #Integer整型,primary_key主键,autoincrement自增ID
name = Column(String(32)) #定义列
# 一对多
class Group(Base):
__tablename__ = ‘group‘
nid = Column(Integer, primary_key=True,autoincrement=True)
caption = Column(String(32))
def __repr__(self): # 只用于格式化打印,针对调取
#temp1 = "%s - %s" %(self.nid, self.caption) #打印组id和名对应的职位关系,调取那个值都可以
temp1 = "%s" %(self.caption) #打印组id和名对应的职位关系,调取那个值都可以
return temp1
class User(Base):
__tablename__ = ‘user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
group_id = Column(Integer, ForeignKey(‘group.nid‘)) #ForeignKey外键,是group的外键
# 以列表方式输出,便于以后分隔,否则将以原始对像出
def __repr__(self): # 只用于格式化打印,针对调取
temp = "%s - %s: %s" %(self.nid, self.username, self.group_id) #打印用户的id和名,组id,调取那个值都可以
return temp
def init_db():
Base.metadata.create_all(engine) #创建表
def drop_db():
Base.metadata.drop_all(engine) #删除表
# 第一步,创建表结构
# init_db()
# 第二步 , 1.创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 第二步, 2.在group表中添加数据
# session.add(Group(caption=‘dba‘)) #给列caption添加数据
# session.add(Group(caption=‘ops‘))
# session.commit() #提交数据
# 第二步,3.在user表表添加数据,加外键码
# session.add_all([
# User(username=‘alex1‘,group_id=1), #给user表的列添加数据,并指定外键,指定了类型,就有了限制,如果外键表group中没有对应的ID是无法添加的
# User(username=‘alex2‘,group_id=2)
# ])
# session.commit()
# 第三步,1.获取单表数据
# 只是获取用户
# 单个针对查询
# ret = session.query(User).filter(User.username == ‘1‘).all() #query为事务操作,filter为指定条件语句,使用的是__repl__方法
# print(ret)
# 查询表所有数据及单独抓取
# ret = session.query(User).all() #查看user表的所有数据
# obj = ret[0] #获取第一行数据,ret[1]为第二行数据,依此类推
# print(ret)
# print(obj)
# print(obj.nid) #获取用户id
# print(obj.username) #获取用户
# print(obj.group_id) #获取组id
# 第三步,2.联表查询,同时查user和gruop数据
ret = session.query(User.username).all() #只查用户
print(ret)
sql = session.query(User,Group).join(Group, isouter=True) #没有加.all()为显示sql数句
print(sql)
ret = session.query(User,Group).join(Group, isouter=True).all() #打印用户所属组id及对应的组
print(ret)
在group表中创始虚拟字段,用于查询把外键相同的数据
"""
功能:查询一条user表信息
1.使用relationship(‘Group‘,backref=‘uuu‘) 定义正反向查询,正向从user到Group的group表,反向是在Group中的虚拟表uuu查user中数据
2.反向调用为 print(obj.uuu)
"""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s13", max_overflow=5)
Base = declarative_base()
# 单表
class Test(Base):
__tablename__ = ‘test‘
nid = Column(Integer, primary_key=True,autoincrement=True)
name = Column(String(32))
# 一对多
class Group(Base):
__tablename__ = ‘group‘
nid = Column(Integer, primary_key=True,autoincrement=True)
caption = Column(String(32))
def __repr__(self):
temp1 = "%s" %(self.caption) #打印组id和名对应的职位关系,调取那个值都可以
return temp1
class User(Base):
__tablename__ = ‘user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
group_id = Column(Integer, ForeignKey(‘group.nid‘))
# 新方式查询,便利的调用关系模块,提供正向调用和反向调用(backref为反向)
group = relationship(‘Group‘,backref=‘uuu‘) #在group表中创始虚拟字段,用于查询外键相同的数据
def __repr__(self):
temp = "%s - %s: %s" %(self.nid, self.username, self.group_id)
return temp
def init_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.drop_all(engine)
# 第一步,创建表结构
# init_db()
# 第二步 , 1.表操作
Session = sessionmaker(bind=engine)
session = Session()
# 第二步, 2.在group中添加数据
# session.add(Group(caption=‘dba‘))
# session.add(Group(caption=‘ddd‘))
# session.commit()
# 第二步,3.在User表添加数据,加外键码
# session.add_all([
# User(username=‘alex1‘,group_id=1),
# User(username=‘alex2‘,group_id=2)
# ])
# session.commit()
# 第三步,1.获取单表数据
# 只是获取用户
# ret = session.query(User).filter(User.username == ‘alex1‘).all()
# print(ret)
# ret = session.query(User).all()
# obj = ret[0]
# print(ret)
# print(obj)
# print(obj.nid)
# print(obj.username)
# print(obj.group_id)
# 第三步,2.联表查询,同时查user和gruop数据
# ret = session.query(User.username).all()
# print(ret)
# sql = session.query(User,Group).join(Group, isouter=True)
# print(sql)
# ret = session.query(User,Group).join(Group, isouter=True).all()
# print(ret)
# 新方式的两种查询,由于一个用户对应的关系的所有信息,展示在一行
# 新方式(正向查询) , 从User 到group
# ret = session.query(User).all()
# for obj in ret:
# # obj 代指user表的一行数据
# # obj.group 代指group对象
# print(obj.nid,obj.username,obj.group_id,obj.group,obj.group.nid,obj.group.caption)
# 原始方式
# ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == ‘DBA‘).all()
# print(ret)
# 新方式(反向查询) 从Group的uuu 到User,查询是DBA都有谁
obj = session.query(Group).filter(Group.caption == ‘DBA‘).first()
print(obj.nid)
print(obj.caption)
print(obj.uuu) #使用uuu,查询出一条user表信息
---------结果
1
dba
[1 - alex1: 1] #使用uuu,查询出一条user表信息
查询一个主机对应的用户
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s13", max_overflow=5)
Base = declarative_base()
# 创建表结构, host ,host_user,host_to_host_user(主机,用户,主机用户的对应关系)
class Host(Base):
__tablename__ = ‘host‘
nid = Column(Integer, primary_key=True,autoincrement=True)
hostname = Column(String(32))
port = Column(String(32))
ip = Column(String(32))
class HostUser(Base):
__tablename__ = ‘host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
password = Column(String(32))
class HostToHostUser(Base):
__tablename__ = ‘host_to_host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
host_id = Column(Integer,ForeignKey(‘host.nid‘)) #外键1
host_user_id = Column(Integer,ForeignKey(‘host_user.nid‘)) #外键2
def init_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.drop_all(engine)
#第一步,执行创建表结构
# init_db()
# drop_db()
# 第二步 , 1.表操作
Session = sessionmaker(bind=engine)
session = Session()
# 第三步 1.插入数据 host表,主机信息
# session.add_all([
# Host(hostname=‘c1‘,port=‘22‘,ip=‘172.16.0.3‘),
# Host(hostname=‘c2‘,port=‘22‘,ip=‘172.16.0.4‘),
# Host(hostname=‘c3‘,port=‘22‘,ip=‘172.16.0.5‘),
# Host(hostname=‘c4‘,port=‘22‘,ip=‘172.16.0.6‘),
# Host(hostname=‘c5‘,port=‘22‘,ip=‘172.16.0.7‘),
# ])
# session.commit()
# 第三步 2.插入数据 User表,用户信息
# session.add_all([
# HostUser(username=‘root‘,password=‘[email protected]‘),
# HostUser(username=‘db‘,password=‘[email protected]‘),
# HostUser(username=‘nb‘,password=‘[email protected]‘),
# HostUser(username=‘sb‘,password=‘[email protected]‘),
# ])
# session.commit()
# 第三步 3.插入数据 HostToHostUser表,主机和用户对应关系
# session.add_all([
# HostToHostUser(host_id=1,host_user_id=1),
# HostToHostUser(host_id=1,host_user_id=2),
# HostToHostUser(host_id=1,host_user_id=3),
# HostToHostUser(host_id=2,host_user_id=2),
# HostToHostUser(host_id=2,host_user_id=4),
# HostToHostUser(host_id=2,host_user_id=3),
# ])
# session.commit()
# 第四步,获取主机1中所有用户,使用查出主机对应的ID和用户ID,相等则输出
# 取c1主机信息,在hosttohostuser对应关系表中找和c1中的nid相等的host_id即输出,不相等则不输出,"输出的是对应关系表中条件符合的nid号,从而判断有几个"
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
# # host_obj.nid
host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()
# print(host_2_host_user)
# 输出结果[(1,),(2,),(3,)] ,"输出的是对应关系表中条件符合的nid号,从而判断有几个“”
r = zip(*host_2_host_user) #zip为转化数据为元组输出
# print(list(r)[0])
#输出结果 (1, 2, 3)
users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all() #查询c1主机有多少个对应用户
print(users)
-------结果
[(‘root‘,), (‘db‘,), (‘nb‘,)]
多对多,简化查询代码
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:Minghu Wang
‘‘‘
简化查询,给出一个主机条件,就可以查到对应有多少用户,利用relationship实现
‘‘‘
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s13", max_overflow=5)
Base = declarative_base()
# 创建表结构, host ,host_user,host_to_host_user(主机,用户,主机用户的对应关系)
class Host(Base):
__tablename__ = ‘host‘
nid = Column(Integer, primary_key=True,autoincrement=True)
hostname = Column(String(32))
port = Column(String(32))
ip = Column(String(32))
class HostUser(Base):
__tablename__ = ‘host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
password = Column(String(32))
class HostToHostUser(Base):
__tablename__ = ‘host_to_host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
host_id = Column(Integer,ForeignKey(‘host.nid‘)) #外键1
host_user_id = Column(Integer,ForeignKey(‘host_user.nid‘)) #外键2
host = relationship("Host",backref=‘h‘) # 创建虚拟表 ,多对多,简化查询
host_user = relationship("HostUser",backref=‘u‘)
def init_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.drop_all(engine)
# 第一步,执行创建表结构
# init_db()
# drop_db()
# 第二步 , 1.表操作
Session = sessionmaker(bind=engine)
session = Session()
# 第三步 1.插入数据 host表,主机信息
# session.add_all([
# Host(hostname=‘c1‘,port=‘22‘,ip=‘172.16.0.3‘),
# Host(hostname=‘c2‘,port=‘22‘,ip=‘172.16.0.4‘),
# Host(hostname=‘c3‘,port=‘22‘,ip=‘172.16.0.5‘),
# Host(hostname=‘c4‘,port=‘22‘,ip=‘172.16.0.6‘),
# Host(hostname=‘c5‘,port=‘22‘,ip=‘172.16.0.7‘),
# ])
# session.commit()
# 第三步 2.插入数据 User表,用户信息
# session.add_all([
# HostUser(username=‘root‘,password=‘[email protected]‘),
# HostUser(username=‘db‘,password=‘[email protected]‘),
# HostUser(username=‘nb‘,password=‘[email protected]‘),
# HostUser(username=‘sb‘,password=‘[email protected]‘),
# ])
# session.commit()
# 第三步 3.插入数据 HostToHostUser表,主机和用户对应关系
# session.add_all([
# HostToHostUser(host_id=1,host_user_id=1),
# HostToHostUser(host_id=1,host_user_id=2),
# HostToHostUser(host_id=1,host_user_id=3),
# HostToHostUser(host_id=2,host_user_id=2),
# HostToHostUser(host_id=2,host_user_id=4),
# HostToHostUser(host_id=2,host_user_id=3),
# ])
# session.commit()
# 原始查询,一行语句
# session.query(HostUser.name).filter(HostUser.nid.in_(session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == session.query(Host.nid).filter(Host.hostname == ‘c1‘))))
# 多对多,简化查询,只要给出一个条件就可以查到结果(内部实现多条语句判断,使用relationship实现)
# 第三步,获取主机1中所有用户,使用查出主机对应的ID和用户ID,相等则输出
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
for item in host_obj.h:
print(item.host_user.username)
---------结果
root
db
nb
最简单的方法查询
‘‘‘
需要改变class的关系表顺序
‘‘‘
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s13", max_overflow=5)
Base = declarative_base()
class HostToHostUser(Base):
__tablename__ = ‘host_to_host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
host_id = Column(Integer,ForeignKey(‘host.nid‘)) #外键1
host_user_id = Column(Integer,ForeignKey(‘host_user.nid‘)) #外键2
# host = relationship("Host",backref=‘h‘) # 多对多,简化查询
# host_user = relationship("HostUser",backref=‘u‘)
# 创建表结构, host ,host_user,host_to_host_user(主机,用户,主机用户的对应关系)
class Host(Base):
__tablename__ = ‘host‘
nid = Column(Integer, primary_key=True,autoincrement=True)
hostname = Column(String(32))
port = Column(String(32))
ip = Column(String(32))
host_user = relationship(‘HostUser‘,secondary=HostToHostUser.__table__,backref=‘h‘)
class HostUser(Base):
__tablename__ = ‘host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
password = Column(String(32))
def init_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.drop_all(engine)
# 第一步,执行创建表结构
# init_db()
# 第二步 , 1.表操作
Session = sessionmaker(bind=engine)
session = Session()
# 多对多,简化查询
# 第三步,获取主机1中所有用户,使用查出主机对应的ID和用户ID,相等则输出
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
print(host_obj.host_user)
for item in host_obj.host_user:
print(item.username)
最最简单的查询,加lambda
‘‘‘
不需要改变class的关系表顺序
‘‘‘
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3306/s13", max_overflow=5)
Base = declarative_base()
# 创建表结构, host ,host_user,host_to_host_user(主机,用户,主机用户的对应关系)
class Host(Base):
__tablename__ = ‘host‘
nid = Column(Integer, primary_key=True,autoincrement=True)
hostname = Column(String(32))
port = Column(String(32))
ip = Column(String(32))
host_user = relationship(‘HostUser‘,secondary = lambda: HostToHostUser.__table__,backref=‘h‘) #最重要的一句
class HostUser(Base):
__tablename__ = ‘host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
username = Column(String(32))
password = Column(String(32))
class HostToHostUser(Base):
__tablename__ = ‘host_to_host_user‘
nid = Column(Integer, primary_key=True,autoincrement=True)
host_id = Column(Integer,ForeignKey(‘host.nid‘)) #外键1
host_user_id = Column(Integer,ForeignKey(‘host_user.nid‘)) #外键2
# host = relationship("Host",backref=‘h‘) # 多对多,简化查询
# host_user = relationship("HostUser",backref=‘u‘)
def init_db():
Base.metadata.create_all(engine)
def drop_db():
Base.metadata.drop_all(engine)
# 第一步,执行创建表结构
# init_db()
# 第二步 , 1.表操作
Session = sessionmaker(bind=engine)
session = Session()
# 多对多,简化查询
# 第三步,获取主机1中所有用户,使用查出主机对应的ID和用户ID,相等则输出
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
print(host_obj.host_user)
for item in host_obj.host_user:
print(item.username)
----------结果
[<__main__.HostUser object at 0x00000000036F4978>, <__main__.HostUser object at 0x00000000036F47B8>, <__main__.HostUser object at 0x00000000036F42E8>]
root
db
nb
一个连接transport执行多个操作
import paramiko
import uuid
class SSHConnection(object):
def __init__(self, host=‘172.16.0.2‘, port=22, username=‘root‘,pwd=‘[email protected]‘):
self.host = host
self.port = port
self.username = username
self.pwd = pwd
self.__k = None
def run(self):
self.connect()
pass
self.close()
#一个连接transport执行多个操作
def connect(self): #创建连接
transport = paramiko.Transport((self.host,self.port))
transport.connect(username=self.username,password=self.pwd)
self.__transport = transport
def close(self): #关闭连接
self.__transport.close()
def cmd(self, command):
ssh = paramiko.SSHClient()
ssh._transport = self.__transport
# 执行命令
stdin, stdout, stderr = ssh.exec_command(command)
# 获取命令结果
result = stdout.read()
return result
def upload(self,local_path, target_path):
# 连接,上传
sftp = paramiko.SFTPClient.from_transport(self.__transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put(local_path, target_path)
ssh = SSHConnection()
ssh.connect() #执行创建连接
#执行命令
r1 = ssh.cmd(‘df‘) #第一次操作 执行命令
print(r1)
#上传文件
ssh.upload(‘s2.py‘, "/opt/s7.py") #第二次操作 上传文件
ssh.close() #执行关闭连接
利用本地终端操作远端,回车输出,版本1,监听一行
import paramiko
import sys
import os
import socket
import select
import getpass
from paramiko.py3compat import u
tran = paramiko.Transport((‘10.211.55.4‘, 22,))
tran.start_client()
tran.auth_password(‘wupeiqi‘, ‘123‘)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
while True:
# 监视用户输入和服务器返回数据
# sys.stdin 处理用户输入
# chan 是之前创建的通道,用于接收服务器返回信息
readable, writeable, error = select.select([chan, sys.stdin, ],[],[],1)
if chan in readable:
try:
x = u(chan.recv(1024))
if len(x) == 0:
print(‘\r\n*** EOF\r\n‘)
break
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in readable:
inp = sys.stdin.readline()
chan.sendall(inp)
chan.close()
tran.close()
肆意妄为(二),利用本地终端操作远端,带tab版,监听一个字符
python3.0 代码
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
from paramiko.py3compat import u
tran = paramiko.Transport((‘localhost‘, 22,))
tran.start_client()
tran.auth_password(‘root‘, ‘[email protected]‘)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
# 获取原tty属性
oldtty = termios.tcgetattr(sys.stdin) #实现可TAB方法
try:
# 为tty设置新属性
# 默认当前tty设备属性:
# 输入一行回车,执行
# CTRL+C 进程退出,遇到特殊字符,特殊处理。
# 这是为原始模式,不认识所有特殊符号
# 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器
tty.setraw(sys.stdin.fileno())
chan.settimeout(0.0)
while True:
# 监视 用户输入 和 远程服务器返回数据(socket)
# 阻塞,直到句柄可读
r, w, e = select.select([chan, sys.stdin], [], [], 1)
if chan in r:
try:
x = u(chan.recv(1024))
if len(x) == 0:
print(‘\r\n*** EOF\r\n‘)
break
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
if len(x) == 0:
break
chan.send(x)
finally:
# 重新设置终端属性
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
chan.close()
tran.close()
肆意妄为(二),利用本地终端操作远端,带tab版,监听一个字符
python2.0 代码
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
# from paramiko.py3compat import u
tran = paramiko.Transport((‘192.168.11.61‘, 22,))
tran.start_client()
tran.auth_password(‘audit‘, ‘123‘)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
# 获取原tty属性
oldtty = termios.tcgetattr(sys.stdin)
try:
# 为tty设置新属性
# 默认当前tty设备属性:
# 输入一行回车,执行
# CTRL+C 进程退出,遇到特殊字符,特殊处理。
# 这是为原始模式,不认识所有特殊符号
# 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器
tty.setraw(sys.stdin.fileno())
chan.settimeout(0.0)
while True:
# 监视 用户输入 和 远程服务器返回数据(socket)
# 阻塞,直到句柄可读
r, w, e = select.select([chan, sys.stdin], [], [], 1)
if chan in r:
try:
x = chan.recv(1024)
if len(x) == 0:
print(‘\r\n*** EOF\r\n‘)
break
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
if len(x) == 0:
break
chan.send(x)
finally:
# 重新设置终端属性
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
chan.close()
tran.close()
-----堡垒机
1. 堡垒机操作
(1) useradd wmh #创建用户
password wmh
(2) vi .bashrc #加入终端脚本内容
/usr/bin/env python3 s7.py
logout
(3)登录堡垒机(wmh用户)
# 会自动执行.bashrc的/usr/bin/env python3 s7.py程序
提示输入远程的机器用户名,密码
终极,模拟终端,windows,linux都可以用-------python3.0版本
import paramiko
import sys
import os
import socket
import getpass
from paramiko.py3compat import u
# windows does not have termios...
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False
def interactive_shell(chan):
if has_termios:
posix_shell(chan)
else:
windows_shell(chan)
def posix_shell(chan):
import select
oldtty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
tty.setcbreak(sys.stdin.fileno())
chan.settimeout(0.0)
log = open(‘handle.log‘, ‘a+‘, encoding=‘utf-8‘)
flag = False
temp_list = []
while True:
r, w, e = select.select([chan, sys.stdin], [], [])
if chan in r:
try:
x = u(chan.recv(1024))
if len(x) == 0:
sys.stdout.write(‘\r\n*** EOF\r\n‘)
break
if flag:
if x.startswith(‘\r\n‘):
pass
else:
temp_list.append(x)
flag = False
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
import json
if len(x) == 0:
break
if x == ‘\t‘:
flag = True
else:
temp_list.append(x)
if x == ‘\r‘:
log.write(‘‘.join(temp_list))
log.flush()
temp_list.clear()
chan.send(x)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
def windows_shell(chan):
import threading
sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
def writeall(sock):
while True:
data = sock.recv(256)
if not data:
sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘)
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
writer = threading.Thread(target=writeall, args=(chan,))
writer.start()
try:
while True:
d = sys.stdin.read(1)
if not d:
break
chan.send(d)
except EOFError:
# user hit ^Z or F6
pass
def run():
default_username = getpass.getuser()
username = input(‘Username [%s]: ‘ % default_username)
if len(username) == 0:
username = default_username
hostname = input(‘Hostname: ‘)
if len(hostname) == 0:
print(‘*** Hostname required.‘)
sys.exit(1)
tran = paramiko.Transport((hostname, 22,))
tran.start_client()
default_auth = "p"
auth = input(‘Auth by (p)assword or (r)sa key[%s] ‘ % default_auth)
if len(auth) == 0:
auth = default_auth
if auth == ‘r‘:
default_path = os.path.join(os.environ[‘HOME‘], ‘.ssh‘, ‘id_rsa‘)
path = input(‘RSA key [%s]: ‘ % default_path)
if len(path) == 0:
path = default_path
try:
key = paramiko.RSAKey.from_private_key_file(path)
except paramiko.PasswordRequiredException:
password = getpass.getpass(‘RSA key password: ‘)
key = paramiko.RSAKey.from_private_key_file(path, password)
tran.auth_publickey(username, key)
else:
pw = getpass.getpass(‘Password for %[email protected]%s: ‘ % (username, hostname))
tran.auth_password(username, pw)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
interactive_shell(chan)
chan.close()
tran.close()
if __name__ == ‘__main__‘:
run()
终极,模拟终端,windows,linux都可以用---------python2.0版本
import paramiko
import sys
import os
import socket
import getpass
from paramiko.py3compat import u
# windows does not have termios...
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False
def interactive_shell(chan):
if has_termios:
posix_shell(chan)
else:
windows_shell(chan)
def posix_shell(chan):
import select
oldtty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
tty.setcbreak(sys.stdin.fileno())
chan.settimeout(0.0)
flag = False
temp_list = []
while True:
r, w, e = select.select([chan, sys.stdin], [], [])
if chan in r:
try:
x = u(chan.recv(1024))
if len(x) == 0:
sys.stdout.write(‘\r\n*** EOF\r\n‘)
break
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
import json
if len(x) == 0:
break
chan.send(x)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
def windows_shell(chan):
import threading
sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
def writeall(sock):
while True:
data = sock.recv(256)
if not data:
sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘)
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
writer = threading.Thread(target=writeall, args=(chan,))
writer.start()
try:
while True:
d = sys.stdin.read(1)
if not d:
break
chan.send(d)
except EOFError:
# user hit ^Z or F6
pass
def run():
# 获取当前登录用户
username = raw_input(‘Username ‘)
hostname = raw_input(‘Hostname: ‘)
pwd = raw_input(‘password: ‘)
tran = paramiko.Transport((hostname, 22,))
tran.start_client()
tran.auth_password(username, pwd)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
interactive_shell(chan)
chan.close()
tran.close()
if __name__ == ‘__main__‘:
run()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# auth : pangguoping
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
from paramiko.py3compat import u
#在mac 执行 python3 脚本.py
tran = paramiko.Transport((‘192.168.11.139‘, 22,))
tran.start_client()
tran.auth_password(‘oldboy‘, ‘123‘)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
# 获取原tty属性
oldtty = termios.tcgetattr(sys.stdin)
try:
# 为tty设置新属性
# 默认当前tty设备属性:
# 输入一行回车,执行
# CTRL+C 进程退出,遇到特殊字符,特殊处理。
# 这是为原始模式,不认识所有特殊符号
# 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器
tty.setraw(sys.stdin.fileno())
chan.settimeout(0.0)
while True:
# 监视 用户输入 和 远程服务器返回数据(socket)
# 阻塞,直到句柄可读
r, w, e = select.select([chan, sys.stdin], [], [], 1)
if chan in r:
try:
x = u(chan.recv(1024))
if len(x) == 0:
print(‘\r\n*** EOF\r\n‘)
break
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
if len(x) == 0:
break
chan.send(x)
finally:
# 重新设置终端属性
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
chan.close()
tran.close()
更改后的版本
# coding:utf-8
# Author:Alex Li
import paramiko
import sys
import os
import socket
import getpass
# from paramiko.py3compat import u
# windows does not have termios...
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False
def interactive_shell(chan):
if has_termios:
posix_shell(chan)
else:
windows_shell(chan)
def posix_shell(chan):
import select
oldtty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
tty.setcbreak(sys.stdin.fileno())
chan.settimeout(0.0)
f = open(‘handle.log‘,‘a+‘)
tab_flag = False
temp_list = []
while True:
r, w, e = select.select([chan, sys.stdin], [], [])
if chan in r:
try:
x = chan.recv(1024)
if len(x) == 0:
sys.stdout.write(‘\r\n*** EOF\r\n‘)
break
if tab_flag:
if x.startswith(‘\r\n‘):
pass
else:
f.write(x)
f.flush()
tab_flag = False
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
if len(x) == 0:
break
if x == ‘\t‘:
tab_flag = True
else:
f.write(x)
f.flush()
chan.send(x)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
def windows_shell(chan):
import threading
sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
def writeall(sock):
while True:
data = sock.recv(256)
if not data:
sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘)
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
writer = threading.Thread(target=writeall, args=(chan,))
writer.start()
try:
while True:
d = sys.stdin.read(1)
if not d:
break
chan.send(d)
except EOFError:
# user hit ^Z or F6
pass
def run():
# 获取当前登录用户
host_list = [
{‘host‘: "192.168.11.139", ‘username‘: ‘oldboy‘, ‘pwd‘: "123"},
{‘host‘: "192.168.11.61", ‘username‘: ‘alex‘, ‘pwd‘: "alex3714"},
{‘host‘: "192.168.11.137", ‘username‘: ‘oldboy‘, ‘pwd‘: "123"},
]
for key,item in enumerate(host_list, 1):
print(key,item[‘host‘])
num = raw_input(‘序号:‘)
sel_host = host_list[int(num) -1]
hostname = sel_host[‘host‘]
username = sel_host[‘username‘]
pwd = sel_host[‘pwd‘]
tran = paramiko.Transport((hostname, 22,))
tran.start_client()
tran.auth_password(username, pwd)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
interactive_shell(chan)
chan.close()
tran.close()
if __name__ == ‘__main__‘:
run()
更改后的3.0版本
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:Minghu Wang
import paramiko
import sys
import os
import socket
import getpass
#from paramiko.py3compat import u
# windows does not have termios...
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False
def interactive_shell(chan):
if has_termios:
posix_shell(chan)
else:
windows_shell(chan)
def posix_shell(chan):
import select
oldtty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
tty.setcbreak(sys.stdin.fileno())
chan.settimeout(0.0)
f = open(‘handle.log‘,‘a+‘)
tab_flag = False
temp_list = []
while True:
r, w, e = select.select([chan, sys.stdin], [], [])
if chan in r:
try:
x = chan.recv(1024)
if len(x) == 0:
sys.stdout.write(‘\r\n*** EOF\r\n‘)
break
if tab_flag:
if x.startswith(‘\r\n‘):
pass
else:
f.write(x)
f.flush()
tab_flag = False
sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass
if sys.stdin in r:
x = sys.stdin.read(1)
if len(x) == 0:
break
if x == ‘\t‘:
tab_flag = True
else:
f.write(x)
f.flush()
chan.send(x)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
def windows_shell(chan):
import threading
sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
def writeall(sock):
while True:
data = str(sock.recv(256))
if not data:
sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘)
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
writer = threading.Thread(target=writeall, args=(chan,))
writer.start()
try:
while True:
d = sys.stdin.read(1)
if not d:
break
chan.send(d)
except EOFError:
# user hit ^Z or F6
pass
def run():
host_list = [
{‘host‘: "172.16.0.2", ‘username‘: ‘root‘, ‘pwd‘: "[email protected]"},
{‘host‘: "192.168.11.61", ‘username‘: ‘alex‘, ‘pwd‘: "alex3714"},
{‘host‘: "192.168.11.137", ‘username‘: ‘oldboy‘, ‘pwd‘: "123"},
]
for key,item in enumerate(host_list, 1):
print(key,item[‘host‘])
num = input(‘序号:‘)
sel_host = host_list[int(num) -1]
hostname = sel_host[‘host‘]
username = sel_host[‘username‘]
pwd = sel_host[‘pwd‘]
tran = paramiko.Transport((hostname, 22,))
tran.start_client()
tran.auth_password(username, pwd)
# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()
interactive_shell(chan)
chan.close()
tran.close()
if __name__ == ‘__main__‘:
run()