SQLAlchemy模型使用

一、SQLAlchemy初始化和创建连接

1. SQLAlchemy初始化

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column
from sqlalchemy.types import String, Integer
from sqlalchemy.ext.declarative import declarative_base
#导入相应的模块
engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test", max_overflow=5) #创建数据库连接,max_overflow指定最大连接数
DBSession = sessionmaker(engine)#创建DBSession类型
session = DBSession()#创建session对象

BaseModel = declarative_base()#创建对象的基类

class User(BaseModel): #定义User对象
    __tablename__ = ‘user‘ #创建表,指定表名称

    #指定表的结构
    id = Column(String, primary_key=True)
    username = Column(String, index=True)

class Session(BaseModel):
    __tablename__ = ‘session‘

    id = Column(String, primary_key=True)
    user = Column(String, index=True)
    ip = Column(String)
BaseModel.metadata.create_all(engine)#创建表,执行所有BaseModel类的子类
session.commit()#提交,必须
 

2. SQLAlchemy创建连接

SQLAlchemy 的连接创建是 Lazy 的方式, 即在需要使用时才会去真正创建. 之前做的工作, 全是"定义".连接的定义是在 engine 中做的.

2.1. Engine

engine 的定义包含了三部分的内容, 一是具体数据库类型的实现, 二是连接池, 三是策略(即engine 自己的实现).

所谓的数据库类型即是 MYSQL , Postgresql , SQLite 这些不同的数据库.

一般创建 engine 是使用 create_engine 方法:

engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test")

参数字符串的各部分的意义:

"mysql+pymysql://mysql:[email protected]:3306/test"

对于这个字符串, SQLAlchemy 提供了工具可用于处理它:

 1 # -*- coding: utf-8 -*-
 2
 3 from sqlalchemy import create_engine
 4 from sqlalchemy.engine.url import make_url
 5 from sqlalchemy.engine.url import URL
 6
 7 s = ‘postgresql://[email protected]:5432/bbcustom‘
 8 url = make_url(s)
 9 s = URL(drivername=‘postgresql‘, username=‘test‘, password="",host="localhost", port=5432, database="bbcustom")
10
11 engine = create_engine(url)
12 engine = create_engine(s)
13
14 print engine.execute(‘select id from "user"‘).fetchall()

create_engine 函数有很多的控制参数, 这个后面再详细说.

2.2. Engine的策略

create_engine 的调用, 实际上会变成 strategy.create 的调用. 而 strategy 就是 engine 的实现细节. strategy 可以在 create_engine 调用时通过 strategy 参数指定, 目前官方的支持有三种:

  • plain, 默认的
  • threadlocal, 连接是线程局部的
  • mock, 所有的 SQL 语句的执行会使用指定的函数

mock 这个实现, 会把所有的 SQL 语句的执行交给指定的函数来做, 这个函数是由create_engine 的 executor 参数指定:

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 from sqlalchemy.ext.declarative import declarative_base
 6 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
 7 from sqlalchemy.orm import sessionmaker, relationship
 8 from sqlalchemy import create_engine

 9 def f(sql,*args,**kwargs):
10     print(sql,args,kwargs)
11 s="mysql+pymysql://mysql:[email protected]:3306/test"
12 engin=create_engine(s,strategy=‘mock‘,executor=f)
13 print(engin.execute(‘select id from "user"‘))

2.3. 各数据库实现

各数据库的实现在 SQLAlchemy 中分成了两个部分, 一是数据库的类型, 二是具体数据库中适配的客户端实现. 比如对于 Postgresql 的访问, 可以使用 psycopg2 , 也可以使用 pg8000 :

1 s = ‘postgresql+psycopg2://[email protected]:5432/bbcustom‘
2 s = ‘postgresql+pg8000://[email protected]:5432/bbcustom‘
3 engine = create_engine(s)

具体的适配工作, 是需要在代码中实现一个 Dialect 类来完成的. 官方的实现在 dialects 目录下.

获取具体的 Dialect 的行为, 则是前面提到的 URL 对象的 get_dialect 方法. create_engine 时你单传一个字符串, SQLAlchemy 自己也会使用 make_url 得到一个 URL 的实例).

2.4. 连接池

SQLAlchemy 支持连接池, 在 create_engine 时添加相关参数即可使用.

  • pool_size 连接数
  • max_overflow 最多多几个连接
  • pool_recycle 连接重置周期
  • pool_timeout 连接超时时间

连接池效果:

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column
from sqlalchemy.types import String, Integer
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test", pollsize=2,max_overflow=0)
DBSession = sessionmaker(engine)
session = DBSession()
BaseModel = declarative_base()

from threading import Thread
def f():
    print (engine.execute(‘show databases‘).fetchall())

p = []
for i in range(3):
    p.append(Thread(target=f))

for t in p:
    t.start()

3.SQLAlchemy 模型使用

3.1. 模型定义

对于 Table 的定义, 本来是直接的实例化调用, 通过 declarative 的包装, 可以像"定义类"这样的更直观的方式来完成.

user = Table(‘user‘,
    Column(‘user_id‘, Integer, primary_key = True),
    Column(‘user_name‘, String(16), nullable = False),
    Column(‘email_address‘, String(60)),
    Column(‘password‘, String(20), nullable = False)
)                                                                                     
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from sqlalchemy.ext.declarative import declarative_base
 4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
 5 from sqlalchemy.orm import sessionmaker, relationship
 6 from sqlalchemy import create_engine
 7
 8 engine = create_engine("mysql+pymysql://mysql:[email protected]:3306/test", max_overflow=5) #创建数据库连接
 9 DBSession=sessionmaker(engine)#创建了一个自定义了的 Session类
10 session=DBSession()
11 Base = declarative_base()#创建对象的基类
12
13
14 class Blog(Base):
15     __tablename__ = ‘blog‘
16
17     id = Column(Integer, primary_key=True)
18     title = Column(String(64), server_default=‘‘, nullable=False)
19     text = Column(String, server_default=‘‘, nullable=False)
20     user = Column(String(32), index=True, server_default=‘‘, nullable=False)
21     create = Column(String(32), index=True, server_default=‘0‘, nullable=False)
22
23
24 class User(Base):
25     __tablename__ = ‘user‘
26
27     id = Column(Integer, primary_key=True)
28     name = Column(String(32), server_default=‘‘, nullable=False)
29     username = Column(String(32), index=True, server_default=‘‘, nullable=False)
30     password = Column(String(64), server_default=‘‘, nullable=False)
31
32
33 def init_db():
34     Base.metadata.create_all(engine)
35
36 def drop_db():
37     Base.metadata.drop_all(engine)
38
39 if __name__ == ‘__main__‘:
40     init_db()
41     #drop_db()
42     #Base.metadata.tables[‘user‘].create(engine, checkfirst=True)
43     #Base.metadata.tables[‘user‘].drop(engine, checkfirst=False)
44     #pass

3.2. 创建

1 session = Session()
2 session.add(User(id=1)) #创建一个
3 session.add(Blog(id=1))
4 session.add_all([ User(id=2),Blog(id=2)]) #创建多个
5 session.commit()

执行的顺序并不一定会和代码顺序一致, SQLAlchemy 自己会整合逻辑再执行.

3.3. 查询

SQLAlchemy 实现的查询非常强大, 写起来有一种随心所欲的感觉.

查询的结果, 有几种不同的类型, 这个需要注意, 像是:

  • instance
  • instance of list
  • keyed tuple of list
  • value of list

基本查询:

 1 session.query(User).filter_by(username=‘abc‘).all()#查询用户User创建表中username字段等于abc的内容
 2 session.query(User).filter(User.username==‘abc‘).all()  #平时使用的时候,两者区别主要就是当使用filter的时候条件之间是使用“==",fitler_by使用的是"="

 3 session.query(Blog).filter(Blog.create >= 0).all() #查询Blog对象中创建表中create字段大于等于0的所有内容
 4 session.query(Blog).filter(Blog.create >= 0).first()
 5 session.query(Blog).filter(Blog.create >= 0 | Blog.title == ‘A‘).first()#条件或
 6 session.query(Blog).filter(Blog.create >= 0 & Blog.title == ‘A‘).first()#条件与
 7 session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar()#  offset(N)从第N条开始返回  limit(N)最多返回 N 条记录 scalar() 如果有记录
 8 session.query(User).filter(User.username ==  ‘abc‘).scalar()
 9 session.query(User.id).filter(User.username ==  ‘abc‘).scalar()
10 session.query(Blog.id).filter(Blog.create >= 0).all()
11 session.query(Blog.id).filter(Blog.create >= 0).all()[0].id
12 dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all())
13 session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title #记录不存在时,first() 会返回 None
14 session.query(User.id).order_by(‘id desc‘).all()#对结果集进行排序
15 session.query(User.id).order_by(‘id‘).first()
16 session.query(User.id).order_by(User.id).first()
17 session.query(User.id).order_by(-User.id).first()
18 session.query(‘id‘, ‘username‘).select_from(User).all()#与
19 session.query(User).get(‘16e19a64d5874c308421e1a835b01c69‘)# 以主键获取

多表查询:

session.query(Blog, User).filter(Blog.user == User.id).first().User.username
session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id
session.query(Blog.id,User.id,User.username).filter(Blog.user == User.id).first().keys()

条件查询:

from sqlalchemy import or_, not_

session.query(User).filter(or_(User.id == ‘‘,
                               User.id == ‘1‘)).all()
session.query(User).filter(not_(User.id == ‘1‘)).all()
session.query(User).filter(User.id.in_([‘1‘])).all()
session.query(User).filter(User.id.like(‘1%‘)).all()
session.query(User).filter(User.id.startswith(‘1‘)).all()
dir(User.id)

函数:

from sqlalchemy import func
session.query(func.count(‘1‘)).select_from(User).scalar()
session.query(func.count(‘1‘), func.max(User.username)).select_from(User).first()
session.query(func.count(‘1‘)).select_from(User).scalar()
session.query(func.md5(User.username)).select_from(User).all()
session.query(func.current_timestamp()).scalar()
session.query(User).count()

3.4. 修改

还是通常的两种方式:

1 session.query(User).filter(User.username == ‘abc‘).update({‘name‘: ‘123‘})
2 session.commit()
3
4 user=session.query(User).filter_by(username=‘abc‘).scalar()
5 user.name = ‘223‘
6 session.commit()

如果涉及对属性原值的引用, 则要考虑 synchronize_session 这个参数.

  • ‘evaluate‘ 默认值, 会同时修改当前 session 中的对象属性.
  • ‘fetch‘ 修改前, 会先通过 select 查询条目的值.
  • False 不修改当前 session 中的对象属性.

在默认情况下, 因为会有修改当前会话中的对象属性, 所以如果语句中有 SQL 函数, 或者"原值引用", 那是无法完成的操作, 自然也会报错, 比如:

from sqlalchemy import func
session.query(User).update({User.name: func.trim(‘123 ‘)})
session.query(User).update({User.name: User.name + ‘x‘})

这种情况下, 就不能要求 SQLAlchemy 修改当前 session 的对象属性了, 而是直接进行数据库的交互, 不管当前会话值:

session.query(User).update({User.name: User.name + ‘x‘}, synchronize_session=False)

是否修改当前会话的对象属性, 涉及到当前会话的状态. 如果当前会话过期, 那么在获取相关对象的属性值时, SQLAlchemy 会自动作一次数据库查询, 以便获取正确的值:

user = session.query(User).filter_by(username=‘abc‘).scalar()
print user.name
session.query(User).update({User.name: ‘new‘}, synchronize_session=False)
print (user.name)
session.commit()
print (user.name)

执行了 update 之后, 虽然相关对象的实际的属性值已变更, 但是当前会话中的对象属性值并没有改变. 直到 session.commit() 之后, 当前会话变成"过期"状态, 再次获取 user.name 时, SQLAlchemy 通过 user 的 id 属性, 重新去数据库查询了新值. (如果 user 的 id 变了呢? 那就会出事了啊.)

synchronize_session 设置成 ‘fetch‘ 不会有这样的问题, 因为在做 update 时已经修改了当前会话中的对象了.

不管 synchronize_session 的行为如何, commit 之后 session 都会过期, 再次获取相关对象值时, 都会重新作一次查询.

3.5. 删除

session.query(User).filter_by(username=‘abc‘).delete()

user = session.query(User).filter_by(username=‘abc‘).first()
session.delete(user)

删除同样有像修改一样的 synchronize_session 参数的问题, 影响当前会话的状态.

3.6. JOIN

SQLAlchemy 可以很直观地作 join 的支持:

1 r = session.query(Blog, User).join(User, Blog.user == User.id).all()
2 for blog, user in r:
3     print (blog.id, blog.user, user.id)

4 r = session.query(Blog, User.name, User.username).join(User, Blog.user == User.id).all()
5 print (r)
时间: 2024-10-17 17:52:34

SQLAlchemy模型使用的相关文章

Flask SQLAlchemy 表字段默认值

在网上查到的SQLAlchemy设置字段默认值的方法都是类似如下方法: from sqlalchemy.sql.sqltypes import TIMESTAMP class Test(db.Model):     id = db.Column(db.Integer, primary_key = True)     name = db.Column(db.String(32))     create_date = db.Column(TIMESTAMP, default = datetime.d

爱上 SQLAlchemy 的 10 个理由(转)

原文:http://python.jobbole.com/82453/ 本文由 伯乐在线 - Namco 翻译,唐尤华 校稿.未经许可,禁止转载!英文出处:Paul Johnston.欢迎加入翻译组. 最近,我见到了很多针对 ORM 的抨击,但是我觉得有些批评是莫须有的.我本人就是 SQLAlchemy 的忠实拥趸.在我的项目里很多地方都用到了 SQLAlchemy,我也为 SQLAlchemy 项目贡献了一些代码.这篇文章里,我会阐述你应当爱上 SQLAlchemy 的10个理由.说实话,除了

Python Flask高级编程之RESTFul API前后端分离精讲 (网盘免费分享)

Python Flask高级编程之RESTFul API前后端分离精讲 (免费分享)  点击链接或搜索QQ号直接加群获取其它资料: 链接:https://pan.baidu.com/s/12eKrJKN-MzscalsJKRoL5w 提取码:88hj 免费分享,如若链接失效请加群 其它资源在群里,私聊管理员即可免费领取:群——517432778,点击加群,或扫描二维码 免费课程资料领取目录:  Python Flask构建微信小程序订餐系统 Python分布式爬虫必学框架Scrapy打造搜索引擎

Flask 教程 第十六章:全文搜索

本文翻译自The Flask Mega-Tutorial Part XVI: Full-Text Search 这是Flask Mega-Tutorial系列的第十六部分,我将在其中为Microblog添加全文搜索功能. 本章的目标是为Microblog实现搜索功能,以便用户可以使用自然语言查找有趣的用户动态内容.许多不同类型的网站,都可以使用Google,Bing等搜索引擎来索引所有内容,并通过其搜索API提供搜索结果. 这这方法适用于静态页面较多的的大部分网站,比如论坛. 但在我的应用中,基

flask-admin,自定义视图参数设置。

# 许可类 can_create = True """是否可以新建""" can_edit = True """是否可以编辑""" can_delete = True """是否可以编辑""" can_view_details = False """是否显示详情,如果你设置了不显示,可以设置是否使用

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memcached安装和基本使用 Memcached安装: wget http://memcached.org/latest

SQLAlchemy 教程 —— 基础入门篇

SQLAlchemy 教程 -- 基础入门篇 一.课程简介 1.1 实验内容 本课程带领大家使用 SQLAlchemy 连接 MySQL 数据库,创建一个博客应用所需要的数据表,并介绍了使用 SQLAlchemy 进行简单了 CURD 操作及使用 Faker 生成测试数据. 1.2课程知识点 学会用 SQLALchemy 连接数据库(MySQL, SQLite, PostgreSQL), 创建数据表: 掌握表数据之间一对一,一对多及多对多的关系并能转化为对应 SQLAlchemy 描述: 掌握使

Python中应用SQL及SQLAlchemy(一)

以SQLit3为例: import sqlite3 conn = sqlite3.connect('db.sqlite3') #获取游标对象 cur = conn.cursor() #执行一系列SQL语句 #建立一张表 #cur.execute("create table demo(num int, str vachar(20));") #插入一些记录 cur.execute("insert into demo values(%d, '%s')" % (1, 'aa