sqlalchemy 、paramiko

sqlalchemy

一、单表

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)

Base = declarative_base()
# 单表
class Test(Base):
    __tablename__ = ‘test‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32))

二、一对多

  1.创建表 创建数据

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)

Base = declarative_base()
# 单表
class Test(Base):
    __tablename__ = ‘test‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32))
# 一对多
class Group(Base):
    __tablename__ = ‘group‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    caption = Column(String(32))
class User(Base):
    __tablename__ = ‘user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey(‘group.nid‘)) #主动创建外键约束
def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

init_db()                                                   #创建表
Session = sessionmaker(bind=engine)
session = Session()

session.add(Group(caption=‘dba‘))                            #外键添加group数据
session.add(Group(caption=‘ddd‘))
session.commit()

session.add_all([
    User(username=‘alex1‘,group_id=1),                       #键添加用户
    User(username=‘alex2‘,group_id=2)
])
session.commit()

数据库显示结果
mysql> show tables;
+---------------+
| Tables_in_s13 |
+---------------+
| group         |
| user          |
+---------------+

mysql> select *from `group`;
+-----+---------+
| nid | caption |
+-----+---------+
|   1 | dba     |
|   2 | ddd     |
+-----+---------+

mysql> select *from user;
+-----+----------+----------+
| nid | username | group_id |
+-----+----------+----------+
|   1 | alex1    |        1 |
|   2 | alex2    |        2 |
+-----+----------+----------+

2.查询表(普通查询)

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)

Base = declarative_base()

# 一对多
class Group(Base):
    __tablename__ = ‘group‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    caption = Column(String(32))
class User(Base):
    __tablename__ = ‘user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey(‘group.nid‘)) #主动创建外键约束

# 只是获取用户
ret = session.query(User).filter(User.username == ‘alex1‘).all()
print(ret)
ret = session.query(User).all()
obj = ret[0]
print(ret)
print(obj)
print(obj.nid)
print(obj.username)
print(obj.group_id)

#以下为查询执行情况
[1 - alex1: 1]
[1 - alex1: 1, 2 - alex2: 2]
1 - alex1: 1
1
alex1
1

#联表查询

ret = session.query(User.username).all()
print(ret)
sql = session.query(User,Group).join(Group, isouter=True)
print(sql)
ret = session.query(User,Group).join(Group, isouter=True).all()
print(ret)
sql = session.query(User.username,Group.caption).join(Group, isouter=True)
print(sql)
ret = session.query(User.username,Group.caption).join(Group, isouter=True).all()
print(ret)

#以下为查询执行情况

[(‘alex1‘,), (‘alex2‘,)]
SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id, "group".nid AS group_nid, "group".caption AS group_caption
FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id
[(1 - alex1: 1, <s1.Group object at 0x000001A74F870978>), (2 - alex2: 2, <s1.Group object at 0x000001A74F870A58>)]
SELECT "user".username AS user_username, "group".caption AS group_caption
FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id
[(‘alex1‘, ‘dba‘), (‘alex2‘, ‘ddd‘)]

  3.优化联表查询

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)
Base = declarative_base()

# 一对多
class Group(Base):
    __tablename__ = ‘group‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    caption = Column(String(32))
class User(Base):
    __tablename__ = ‘user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey(‘group.nid‘)) #主动创建外键约束
    # 与生成表结构无关,仅用于查询方便
    group = relationship("Group", backref=‘uuu‘)    # backref  yi

    def __repr__(self):                                  #返回格式
        temp = "%s - %s: %s" %(self.nid, self.username, self.group_id)
        return temp

#优化: User类下添加一行
    group = relationship("Group", backref=‘uuu‘)  

#1
# 原始方式
# ret = session.query(User.username,Group.caption).join(Group, isouter=True).all()
# 新方式(正向查询)
ret = session.query(User).all()                #查询
for obj in ret:
    # obj代指user表的没一行数据
    # obj.group代指group对象,
    print(obj.nid,obj.username,obj.group_id, obj.group,obj.group.nid,obj.group.caption)

#以下为执行结果: 1 alex1 1 <s1.Group object at 0x0000029BB37BE2B0> 1 dba2 alex2 2 <s1.Group object at 0x0000029BB37BE470> 2 ddd
#2
# 原始方式
# ret = session.query(User.username,Group.caption).join(Group, isouter=True).filter(Group.caption == ‘DBA‘).all()
# 新方式(反向查询)
obj = session.query(Group).filter(Group.caption == ‘DBA‘).first()
print(obj.nid)
print(obj.caption)
print(obj.uuu)

sql = session.query(Group).filter(Group.caption == ‘DBA‘)
print(sql)

sql = session.query(Group).get(1)
print(sql)

#以下为执行结果:
1dba[1 - alex1: 1]SELECT "group".nid AS group_nid, "group".caption AS group_caption FROM "group" WHERE "group".caption = :caption_1<s1.Group object at 0x0000024A4A84CB70>

三、多对多

  1.创建表 创建数据

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)

Base = declarative_base()

# 多对多
class HostToHostUser(Base):
    __tablename__ = ‘host_to_host_user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)

    host_id = Column(Integer ,ForeignKey(‘host.nid‘))
    host_user_id = Column(Integer, ForeignKey(‘host_user.nid‘))

class Host(Base): # metaclass,Host.table对象
    __tablename__ = ‘host‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
class HostUser(Base):
    __tablename__ = ‘host_user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)

init_db()                                                   #创建表
Session = sessionmaker(bind=engine)
session = Session()
session.add_all([                                           #插入数据
    Host(hostname=‘c1‘,port=‘22‘,ip=‘1.1.1.1‘),
    Host(hostname=‘c2‘,port=‘22‘,ip=‘1.1.1.2‘),
    Host(hostname=‘c3‘,port=‘22‘,ip=‘1.1.1.3‘),
    Host(hostname=‘c4‘,port=‘22‘,ip=‘1.1.1.4‘),
    Host(hostname=‘c5‘,port=‘22‘,ip=‘1.1.1.5‘),
])
session.commit()

session.add_all([
    HostUser(username=‘root‘),
    HostUser(username=‘db‘),
    HostUser(username=‘nb‘),
    HostUser(username=‘sb‘),
])
session.commit()

session.add_all([
    HostToHostUser(host_id=1,host_user_id=1),
    HostToHostUser(host_id=1,host_user_id=2),
    HostToHostUser(host_id=1,host_user_id=3),
    HostToHostUser(host_id=2,host_user_id=2),
    HostToHostUser(host_id=2,host_user_id=4),
    HostToHostUser(host_id=2,host_user_id=3),
])
session.commit()

#以下为创建的数据

mysql> select *from `host`;
+-----+----------+------+---------+
| nid | hostname | port | ip      |
+-----+----------+------+---------+
|   1 | c1       | 22   | 1.1.1.1 |
|   2 | c2       | 22   | 1.1.1.2 |
|   3 | c3       | 22   | 1.1.1.3 |
|   4 | c4       | 22   | 1.1.1.4 |
|   5 | c5       | 22   | 1.1.1.5 |
+-----+----------+------+---------+

mysql> select *from `host_user`;
+-----+----------+
| nid | username |
+-----+----------+
|   1 | root     |
|   2 | db       |
|   3 | nb       |
|   4 | sb       |
+-----+----------+

mysql> select *from `host_to_host_user`;
+-----+---------+--------------+
| nid | host_id | host_user_id |
+-----+---------+--------------+
|   1 |       1 |            1 |
|   2 |       1 |            2 |
|   3 |       1 |            3 |
|   4 |       2 |            2 |
|   5 |       2 |            4 |
|   6 |       2 |            3 |
+-----+---------+--------------+

  2.查询表(普通查询)

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)

Base = declarative_base()

# 多对多
class HostToHostUser(Base):
    __tablename__ = ‘host_to_host_user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)

    host_id = Column(Integer ,ForeignKey(‘host.nid‘))
    host_user_id = Column(Integer, ForeignKey(‘host_user.nid‘))

class Host(Base): # metaclass,Host.table对象
    __tablename__ = ‘host‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    host_user = relationship(‘HostUser‘, secondary=lamda:HostToHostUser.__table__, backref=‘h‘)  #secondary指定通过哪个中间表关联
来进行反向查询

class HostUser(Base):
    __tablename__ = ‘host_user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))

#init_db()                                                   #创建表
Session = sessionmaker(bind=engine)
session = Session()

# 1
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
host_obj.nid
session.query()
session.query(Host.nid).filter(Host.hostname == ‘c1‘)

# 2,所有用户ID
host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()

print(host_2_host_user)
# [(1,), (2,), (3,)]
r = zip(*host_2_host_user)
#\print(list(r)[0])
# [1,2,3]
# 3、根据用户ID找到所有用户
users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all()
print(users)

#以下为执行结果:
[(1,), (2,), (3,)]
[(‘root‘,), (‘db‘,), (‘nb‘,)]

  3.优化联表查询

    #!/usr/bin/env python

# -*- coding:utf-8 -*-
# Author:wuwenyu

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,Table
from sqlalchemy.orm import sessionmaker, relationship
engine = create_engine("mysql+pymysql://root:[email protected]:3307/s13", max_overflow=5)
Base = declarative_base()
class Host(Base):
    __tablename__ = ‘host‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
    host_user = relationship(‘HostUser‘, secondary=lambda:HostToHostUser.__table__, backref=‘h‘)

    def __repr__(self):
        temp = "%s - %s: %s" % (self.nid, self.username, self.group_id)
        return temp

class HostUser(Base):
    __tablename__ = ‘host_user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))

class HostToHostUser(Base):
    __tablename__ = ‘host_to_host_user‘
    nid = Column(Integer, primary_key=True,autoincrement=True)
    host_id = Column(Integer,ForeignKey(‘host.nid‘))
    host_user_id = Column(Integer,ForeignKey(‘host_user.nid‘))
    host = relationship("Host", backref=‘h‘)
    # hostUser = relationship("Host_user", backref=‘u‘)
#init_db()                                                   #创建表
Session = sessionmaker(bind=engine)
session = Session()

#原始的方式
#obj=session.query(HostUser.name).filter(HostUser.nid.in_(session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == session.query(Host.nid).filter(Host.hostname == ‘c1‘))))

#新方式(反向查询)
host_obj = session.query(Host).filter(Host.hostname==‘c1‘).first()
print(host_obj.nid)
print(host_obj.hostname)
# 第三表对应的对象#类HostToHostUser  添加 :host = relationship("Host", backref=‘h‘) 则可以使用
print(host_obj.h)
# # 循环获取的第三表对应的对象
# for item in host_obj.h:
#     print(item.host_user,item.host_user.nid,item.host_user.username)

#Host  类中添加:host_user = relationship(‘HostUser‘, secondary=lambda:HostToHostUser.__table__, backref=‘h‘) 可以使用

print(host_obj.host_user)
for item in host_obj.host_user:
    print(item.username)

#以下为查询结果

1
c1
[<s1.HostToHostUser object at 0x000002315C9FCA58>, <s1.HostToHostUser object at 0x000002315CA140F0>, <s1.HostToHostUser object at 0x000002315CA14160>]

[<s1.HostUser object at 0x000002315CA14710>, <s1.HostUser object at 0x000002315CA14780>, <s1.HostUser object at 0x000002315CA147F0>]
root
db
nb

paramiko

#paramiko  登陆 代码  +tab键 #python2适用

#encoding:utf-8
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
# from paramiko.py3compat import u

tran = paramiko.Transport((‘127.0.0.01, 22,))
tran.start_client()
tran.auth_password(‘root‘, ‘123‘)

# 打开一个通道
chan = tran.open_session()
# 获取一个终端
chan.get_pty()
# 激活器
chan.invoke_shell()

# 获取原tty属性
oldtty = termios.tcgetattr(sys.stdin)
try:
    # 为tty设置新属性
    # 默认当前tty设备属性:
    #   输入一行回车,执行
    #   CTRL+C 进程退出,遇到特殊字符,特殊处理。

    # 这是为原始模式,不认识所有特殊符号
    # 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器
    tty.setraw(sys.stdin.fileno())
    chan.settimeout(0.0)

    while True:
        # 监视 用户输入 和 远程服务器返回数据(socket)
        # 阻塞,直到句柄可读
        r, w, e = select.select([chan, sys.stdin], [], [], 1)
        if chan in r:
            try:
                x = chan.recv(1024)
                if len(x) == 0:
                    print(‘\r\n*** EOF\r\n‘)
                    break
                sys.stdout.write(x)
                sys.stdout.flush()
            except socket.timeout:
                pass
        if sys.stdin in r:
            x = sys.stdin.read(1)
            if len(x) == 0:
                break
            chan.send(x)

finally:
    # 重新设置终端属性
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

chan.close()
tran.close()

#完美  #python2适用

#encoding:utf-8
import paramiko
import sys
import os
import socket
import getpass
import time
from paramiko.py3compat import u

# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False

def interactive_shell(chan):
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)

def posix_shell(chan):
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        log = open(‘handle.log‘, ‘a+‘)
        flag = False
        temp_list = []
        while True:
            r, w, e = select.select([chan, sys.stdin], [], [])
            if chan in r:
                try:
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        sys.stdout.write(‘\r\n*** EOF\r\n‘)
                        break
                    if flag:
                        if x.startswith(‘\r\n‘):
                            pass
                        else:
                            temp_list.append(x)
                        flag = False
                    #print(x)
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)
                import json

                if len(x) == 0:
                    break

                if x == ‘\t‘:
                    flag = True
                else:
                    temp_list.append(x)
                if x == ‘\r‘:
            print temp_list
                    if ‘‘.join(temp_list) == "su -\r":
                        print("aaaaaaaaaaaaa")
                    date = time.ctime()
                    log.write("%s  wuwenyui %s \n" % (‘‘.join(temp_list),str(date)))
                    log.flush()
                    temp_list=[]
                chan.send(x)

    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

def windows_shell(chan):
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘)
                sys.stdout.flush()
                break
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass

def run():
    tran = paramiko.Transport((‘127.0.0.1‘, 1715,))
    tran.start_client()
    tran.auth_password(‘root‘, ‘123‘)

    # 打开一个通道
    chan = tran.open_session()
    # 获取一个终端
    chan.get_pty()
    # 激活器
    chan.invoke_shell()

    interactive_shell(chan)

    chan.close()
    tran.close()

if __name_

#增加输入iput 以及日志 #python2适用

#encoding:utf-8
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
from paramiko.py3compat import u

# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False

def interactive_shell(chan):
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)

def posix_shell(chan):
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        log = open(‘handle.log‘, ‘a+‘)
        flag = False
        temp_list = []
        while True:
            r, w, e = select.select([chan, sys.stdin], [], [])
            if chan in r:
                try:
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        sys.stdout.write(‘\r\n*** EOF\r\n‘)
                        break
                    if flag:
                        if x.startswith(‘\r\n‘):
                            pass
                        else:
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)
                import json

                if len(x) == 0:
                    break

                if x == ‘\t‘:
                    flag = True
                else:
                    temp_list.append(x)
                if x == ‘\r‘:
                    log.write(‘‘.join(temp_list))
                    log.flush()
                    temp_list=[]
                chan.send(x)

    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)

def windows_shell(chan):
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘)
                sys.stdout.flush()
                break
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass

def run():
    default_username = getpass.getuser()
    username = raw_input(‘Username [%s]: ‘ % default_username)
    if len(username) == 0:
        username = default_username

    hostname = raw_input(‘Hostname: ‘)
    print (hostname)
    if len(hostname) == 0:
        print(‘*** Hostname required.‘)
        sys.exit(1)
    tran = paramiko.Transport((hostname, 1715,))
    #tran = paramiko.Transport((hostname, 1715,))
    tran.start_client()

    default_auth = "p"
    auth = raw_input(‘Auth by (p)assword or (r)sa key[%s] ‘ % default_auth)
    if len(auth) == 0:
        auth = default_auth

    if auth == ‘r‘:
        default_path = os.path.join(os.environ[‘HOME‘], ‘.ssh‘, ‘id_rsa‘)
        path = raw_input(‘RSA key [%s]: ‘ % default_path)
        if len(path) == 0:
            path = default_path
        try:
            key = paramiko.RSAKey.from_private_key_file(path)
        except paramiko.PasswordRequiredException:
            password = getpass.getpass(‘RSA key password: ‘)
            key = paramiko.RSAKey.from_private_key_file(path, password)
        tran.auth_publickey(username, key)
    else:

        pw = getpass.getpass(‘Password for %[email protected]%s: ‘ % (username, hostname))
        print(pw)
        tran.auth_password(username, pw)

    # 打开一个通道
    chan = tran.open_session()
    # 获取一个终端
    chan.get_pty()
    # 激活器
    chan.invoke_shell()

    interactive_shell(chan)

    chan.close()
    tran.close()

if __name__ == ‘__main__‘:
    run()
时间: 2024-08-09 02:17:44

sqlalchemy 、paramiko的相关文章

Python之操作Redis、 RabbitMQ、SQLAlchemy、paramiko、mysql

一.Redis Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,并提供多种语言的API.Redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原

python16_day10【SelectWeb、SelectWget、paramiko、pymysql】

一.select实现Web框架(自定义WEB框架) 1 import select 2 import socket 3 4 5 class Flask(object): 6 def __init__(self, routers): 7 self.routers = routers 8 9 def process_data(self, client): 10 data = bytes() 11 while True: 12 try: 13 trunk = client.recv(1024) # 没

python使用mysql的三个模块:mysql.connector、sqlalchemy、MySQLdb

在python中使用mysql其实很简单,只要先安装对应的模块即可,那么对应的模块都有什么?官方也没指定也没提供,pcat就推荐自己遇到的3个模块:mysql.connector.sqlalchemy.MySQLdb ------------------ 1. 安装mysql.connector MySQL Connector/Python is implementing the MySQL Client/Server protocol completely in Python. No MySQ

Python 模块续 configparser、shutil、XML、paramiko、系统命令、

一.configparse # 注释1 ; 注释2 [section1] # 节点 k1 = v1 # 值 k2:v2 # 值 [section2] # 节点 k1 = v1 # 值 1.获取所有节点 import configparser config = configparser.ConfigParser() config.read('test',encoding='utf-8') ret = config.sections() print(ret)#['section1', 'sectio

scp、paramiko、rsync复制文件的区别

1.paramiko只能复制文件,而不能复制目录,复制时,已经存在的会被覆盖;要想复制目录,只能把目录里的文件一个一个复制过去 2.scp可以复制文件.目录,复制时,已经存在的会被覆盖:可以模糊匹配:scp *.jar [email protected]:~/aa:可以递归复制,参数-r 3.rsync功能比较强大,复制时,可以选择覆盖或者不覆盖:可以复制目录.文件:可以模糊匹配等:可以递归复制,参数-r 4.scp还非常不占资源,不会提高多少系统负荷,在这一点上,rsync就远远不及它了.虽然

scp、paramiko、rsync上传下载限流、限速、速度控制方法

1.scp限速 scp -l 800 a.txt  [email protected]:/home/admin/downloads 此时的传输速率就是800/8=100KB左右 man -a scp查看参数含义.注意单位是bit 2.rsync是用来同步更新的,也可以用来上传文件,但是不建议这样使用 man -a rsync查看参数帮助信息 使用rsync实现限速100KB/s   rsync -auvzP --bwlimit=100 本地文件 远程文件 参数说明:  v:详细提示   a:以a

代码发布项目(二)——django实现websocket(使用channels)、基于channels实现群聊功能、gojs插件、paramiko模块

一.django实现websocket django默认是不支持websocket,只支持http协议 在django中如果想要基于websocket开发项目 你需要安装模块:channles pip3 install channels==2.3 版本不要使用最新的,如果安装最新的可能会自动把你的django版本升级到最新版 对应的解释器环境建议使用3.6(官网的说法:3.5可能有问题,3.7可能也有问题...具体原因没有给解释) channels模块内部已经帮我们封装好了 握手/加密/解密 面

python运维开发(十三)----SQLalchemy和paramiko续

内容目录: ORM架构SQLalchemy Paramiko SQLalchemy对表的操作 使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作.根据类创建对象,对象转换成SQL,执行SQL. 1.创建表 # 单表 class Test(Base): __tablename__ = 'test' nid = Column(Integer, primary_key=True,

python之SQLAchemy、paramiko

SQLAchemy SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果. 一.底层处理 使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句. #!/usr/bin/env python # -*- coding:u