一、SQLAlchemy初始化和创建连接
1. SQLAlchemy初始化
# -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy import Column from sqlalchemy.types import String, Integer from sqlalchemy.ext.declarative import declarative_base #导入相应的模块 engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test", max_overflow=5) #创建数据库连接,max_overflow指定最大连接数 DBSession = sessionmaker(engine)#创建DBSession类型 session = DBSession()#创建session对象 BaseModel = declarative_base()#创建对象的基类 class User(BaseModel): #定义User对象 __tablename__ = ‘user‘ #创建表,指定表名称 #指定表的结构 id = Column(String, primary_key=True) username = Column(String, index=True) class Session(BaseModel): __tablename__ = ‘session‘ id = Column(String, primary_key=True) user = Column(String, index=True) ip = Column(String)
BaseModel.metadata.create_all(engine)#创建表,执行所有BaseModel类的子类
session.commit()#提交,必须
2. SQLAlchemy创建连接
SQLAlchemy 的连接创建是 Lazy 的方式, 即在需要使用时才会去真正创建. 之前做的工作, 全是"定义".连接的定义是在 engine 中做的.
2.1. Engine
engine 的定义包含了三部分的内容, 一是具体数据库类型的实现, 二是连接池, 三是策略(即engine 自己的实现).
所谓的数据库类型即是 MYSQL , Postgresql , SQLite 这些不同的数据库.
一般创建 engine 是使用 create_engine
方法:
engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test")
参数字符串的各部分的意义:
"mysql+pymysql://mysql:[email protected]:3306/test"
对于这个字符串, SQLAlchemy 提供了工具可用于处理它:
1 # -*- coding: utf-8 -*- 2 3 from sqlalchemy import create_engine 4 from sqlalchemy.engine.url import make_url 5 from sqlalchemy.engine.url import URL 6 7 s = ‘postgresql://[email protected]:5432/bbcustom‘ 8 url = make_url(s) 9 s = URL(drivername=‘postgresql‘, username=‘test‘, password="",host="localhost", port=5432, database="bbcustom") 10 11 engine = create_engine(url) 12 engine = create_engine(s) 13 14 print engine.execute(‘select id from "user"‘).fetchall()
create_engine
函数有很多的控制参数, 这个后面再详细说.
2.2. Engine的策略
create_engine
的调用, 实际上会变成 strategy.create
的调用. 而 strategy 就是 engine 的实现细节. strategy 可以在 create_engine
调用时通过 strategy
参数指定, 目前官方的支持有三种:
- plain, 默认的
- threadlocal, 连接是线程局部的
- mock, 所有的 SQL 语句的执行会使用指定的函数
mock 这个实现, 会把所有的 SQL 语句的执行交给指定的函数来做, 这个函数是由create_engine
的 executor
参数指定:
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 from sqlalchemy.ext.declarative import declarative_base 6 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index 7 from sqlalchemy.orm import sessionmaker, relationship 8 from sqlalchemy import create_engine 9 def f(sql,*args,**kwargs): 10 print(sql,args,kwargs) 11 s="mysql+pymysql://mysql:[email protected]:3306/test" 12 engin=create_engine(s,strategy=‘mock‘,executor=f) 13 print(engin.execute(‘select id from "user"‘))
2.3. 各数据库实现
各数据库的实现在 SQLAlchemy 中分成了两个部分, 一是数据库的类型, 二是具体数据库中适配的客户端实现. 比如对于 Postgresql 的访问, 可以使用 psycopg2
, 也可以使用 pg8000
:
1 s = ‘postgresql+psycopg2://[email protected]:5432/bbcustom‘ 2 s = ‘postgresql+pg8000://[email protected]:5432/bbcustom‘ 3 engine = create_engine(s)
具体的适配工作, 是需要在代码中实现一个 Dialect
类来完成的. 官方的实现在 dialects 目录下.
获取具体的 Dialect 的行为, 则是前面提到的 URL
对象的 get_dialect
方法. create_engine
时你单传一个字符串, SQLAlchemy 自己也会使用 make_url
得到一个 URL
的实例).
2.4. 连接池
SQLAlchemy 支持连接池, 在 create_engine
时添加相关参数即可使用.
- pool_size 连接数
- max_overflow 最多多几个连接
- pool_recycle 连接重置周期
- pool_timeout 连接超时时间
连接池效果:
# -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy import Column from sqlalchemy.types import String, Integer from sqlalchemy.ext.declarative import declarative_base engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test", pollsize=2,max_overflow=0) DBSession = sessionmaker(engine) session = DBSession() BaseModel = declarative_base() from threading import Thread def f(): print (engine.execute(‘show databases‘).fetchall()) p = [] for i in range(3): p.append(Thread(target=f)) for t in p: t.start()
3.SQLAlchemy 模型使用
3.1. 模型定义
对于 Table 的定义, 本来是直接的实例化调用, 通过 declarative
的包装, 可以像"定义类"这样的更直观的方式来完成.
user = Table(‘user‘, Column(‘user_id‘, Integer, primary_key = True), Column(‘user_name‘, String(16), nullable = False), Column(‘email_address‘, String(60)), Column(‘password‘, String(20), nullable = False) )
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from sqlalchemy.ext.declarative import declarative_base 4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index 5 from sqlalchemy.orm import sessionmaker, relationship 6 from sqlalchemy import create_engine 7 8 engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test", max_overflow=5) #创建数据库连接 9 DBSession=sessionmaker(engine)#创建了一个自定义了的 Session类 10 session=DBSession() 11 Base = declarative_base()#创建对象的基类 12 13 14 class Blog(Base): 15 __tablename__ = ‘blog‘ 16 17 id = Column(Integer, primary_key=True) 18 title = Column(String(64), server_default=‘‘, nullable=False) 19 text = Column(String, server_default=‘‘, nullable=False) 20 user = Column(String(32), index=True, server_default=‘‘, nullable=False) 21 create = Column(String(32), index=True, server_default=‘0‘, nullable=False) 22 23 24 class User(Base): 25 __tablename__ = ‘user‘ 26 27 id = Column(Integer, primary_key=True) 28 name = Column(String(32), server_default=‘‘, nullable=False) 29 username = Column(String(32), index=True, server_default=‘‘, nullable=False) 30 password = Column(String(64), server_default=‘‘, nullable=False) 31 32 33 def init_db(): 34 Base.metadata.create_all(engine) 35 36 def drop_db(): 37 Base.metadata.drop_all(engine) 38 39 if __name__ == ‘__main__‘: 40 init_db() 41 #drop_db() 42 #Base.metadata.tables[‘user‘].create(engine, checkfirst=True) 43 #Base.metadata.tables[‘user‘].drop(engine, checkfirst=False) 44 #pass
3.2. 创建
1 session = Session() 2 session.add(User(id=1)) #创建一个 3 session.add(Blog(id=1)) 4 session.add_all([ User(id=2),Blog(id=2)]) #创建多个 5 session.commit()
执行的顺序并不一定会和代码顺序一致, SQLAlchemy 自己会整合逻辑再执行.
3.3. 查询
SQLAlchemy 实现的查询非常强大, 写起来有一种随心所欲的感觉.
查询的结果, 有几种不同的类型, 这个需要注意, 像是:
- instance
- instance of list
- keyed tuple of list
- value of list
基本查询:
1 session.query(User).filter_by(username=‘abc‘).all()#查询用户User创建表中username字段等于abc的内容 2 session.query(User).filter(User.username==‘abc‘).all() #平时使用的时候,两者区别主要就是当使用filter的时候条件之间是使用“==",fitler_by使用的是"=" 3 session.query(Blog).filter(Blog.create >= 0).all() #查询Blog对象中创建表中create字段大于等于0的所有内容 4 session.query(Blog).filter(Blog.create >= 0).first() 5 session.query(Blog).filter(Blog.create >= 0 | Blog.title == ‘A‘).first()#条件或 6 session.query(Blog).filter(Blog.create >= 0 & Blog.title == ‘A‘).first()#条件与 7 session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar()# offset(N)从第N条开始返回 limit(N)最多返回 N 条记录 scalar() 如果有记录 8 session.query(User).filter(User.username == ‘abc‘).scalar() 9 session.query(User.id).filter(User.username == ‘abc‘).scalar() 10 session.query(Blog.id).filter(Blog.create >= 0).all() 11 session.query(Blog.id).filter(Blog.create >= 0).all()[0].id 12 dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all()) 13 session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title #记录不存在时,first() 会返回 None 14 session.query(User.id).order_by(‘id desc‘).all()#对结果集进行排序 15 session.query(User.id).order_by(‘id‘).first() 16 session.query(User.id).order_by(User.id).first() 17 session.query(User.id).order_by(-User.id).first() 18 session.query(‘id‘, ‘username‘).select_from(User).all()#与 19 session.query(User).get(‘16e19a64d5874c308421e1a835b01c69‘)# 以主键获取
多表查询:
session.query(Blog, User).filter(Blog.user == User.id).first().User.username session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id session.query(Blog.id,User.id,User.username).filter(Blog.user == User.id).first().keys()
条件查询:
from sqlalchemy import or_, not_ session.query(User).filter(or_(User.id == ‘‘, User.id == ‘1‘)).all() session.query(User).filter(not_(User.id == ‘1‘)).all() session.query(User).filter(User.id.in_([‘1‘])).all() session.query(User).filter(User.id.like(‘1%‘)).all() session.query(User).filter(User.id.startswith(‘1‘)).all() dir(User.id)
函数:
from sqlalchemy import func session.query(func.count(‘1‘)).select_from(User).scalar() session.query(func.count(‘1‘), func.max(User.username)).select_from(User).first() session.query(func.count(‘1‘)).select_from(User).scalar() session.query(func.md5(User.username)).select_from(User).all() session.query(func.current_timestamp()).scalar() session.query(User).count()
3.4. 修改
还是通常的两种方式:
1 session.query(User).filter(User.username == ‘abc‘).update({‘name‘: ‘123‘}) 2 session.commit() 3 4 user=session.query(User).filter_by(username=‘abc‘).scalar() 5 user.name = ‘223‘ 6 session.commit()
如果涉及对属性原值的引用, 则要考虑 synchronize_session
这个参数.
‘evaluate‘
默认值, 会同时修改当前 session 中的对象属性.‘fetch‘
修改前, 会先通过select
查询条目的值.False
不修改当前 session 中的对象属性.
在默认情况下, 因为会有修改当前会话中的对象属性, 所以如果语句中有 SQL 函数, 或者"原值引用", 那是无法完成的操作, 自然也会报错, 比如:
from sqlalchemy import func session.query(User).update({User.name: func.trim(‘123 ‘)}) session.query(User).update({User.name: User.name + ‘x‘})
这种情况下, 就不能要求 SQLAlchemy 修改当前 session 的对象属性了, 而是直接进行数据库的交互, 不管当前会话值:
session.query(User).update({User.name: User.name + ‘x‘}, synchronize_session=False)
是否修改当前会话的对象属性, 涉及到当前会话的状态. 如果当前会话过期, 那么在获取相关对象的属性值时, SQLAlchemy 会自动作一次数据库查询, 以便获取正确的值:
user = session.query(User).filter_by(username=‘abc‘).scalar() print user.name session.query(User).update({User.name: ‘new‘}, synchronize_session=False) print (user.name) session.commit() print (user.name)
执行了 update
之后, 虽然相关对象的实际的属性值已变更, 但是当前会话中的对象属性值并没有改变. 直到 session.commit()
之后, 当前会话变成"过期"状态, 再次获取 user.name
时, SQLAlchemy 通过 user
的 id
属性, 重新去数据库查询了新值. (如果 user
的 id
变了呢? 那就会出事了啊.)
synchronize_session
设置成 ‘fetch‘
不会有这样的问题, 因为在做 update
时已经修改了当前会话中的对象了.
不管 synchronize_session
的行为如何, commit
之后 session
都会过期, 再次获取相关对象值时, 都会重新作一次查询.
3.5. 删除
session.query(User).filter_by(username=‘abc‘).delete() user = session.query(User).filter_by(username=‘abc‘).first() session.delete(user)
删除同样有像修改一样的 synchronize_session
参数的问题, 影响当前会话的状态.
3.6. JOIN
SQLAlchemy 可以很直观地作 join
的支持:
1 r = session.query(Blog, User).join(User, Blog.user == User.id).all() 2 for blog, user in r: 3 print (blog.id, blog.user, user.id) 4 r = session.query(Blog, User.name, User.username).join(User, Blog.user == User.id).all() 5 print (r)