11.5 Flask 蓝图,数据库链接

蓝图

使用场景

如果代码非常多,要进行归类。不同的功能放在不同的文件,把相关的视图函数也放进去。

蓝图也就是对flask的目录结构进行分配(应用于小,中型的程序)

当然对于大型项目也可以通过   url_prefix 加前缀的方式实现

使用方法

# __init__.py
from .views.account import ac
from .views.user import us

app.register_blueprint(ac)
app.register_blueprint(us)

# account.py
from flask import Blueprint,render_template
ac = Blueprint("ac" ,__name__,template_folder="xxxx",static_url_path="xxxx")

# template_folder 优先在 templates 文件夹找。找不到再去 xxxx 里找
# static_url_path 优先在 static 文件夹找。找不到再去 xxxx 里找
# url_prefix="/xx" 为当前蓝图的url里加前缀 

目录结构

crm
    crm
        view
            account.py
            user.py
        static
        templates
            login.html
        __init__.py
    manage.py

 __init__.py

只要一导入crm就会执行__init__.py文件

在此文件实现app 对象的生成,以及所有蓝图的注册功能

from flask import Flask
from .views.account import ac
from .views.user import us
def create_app():
    app = Flack(__name__)

    @app.before_request   # 对全局的视图有效
    def xx():
        print("app.before_request")

    app.register_blueprint(ac)
    app.register_blueprint(us)

    return app 

account.py

各自的视图文件,创建蓝图对象

自己视图的使用为自己的蓝图对象

注意: 视图函数的名字不能和蓝图对象重名

from flask import Blueprint,render_template
ac = Blueprint("ac" ,__name__,template_folder="xxxx",static_url_path="xxxx")
# template_folder 优先在 templates 文件夹找。找不到再去 xxxx 里找
# static_url_path 优先在 static 文件夹找。找不到再去 xxxx 里找
# url_prefix="/xx" 为当前蓝图的url里加前缀
@ac.route("/login")
def login():
    return render_template("login.html")

user.py

from flask import Blueprint
us = Blueprint("us" ,__name__)

@us.before_request   # 仅对当前的视图有效
def xx():
    print("us.before_request")

@us.route("/user")
def user():
    return "user"

链接数据库

flask中是没有ORM的,如果在flask里面连接数据库有两种方式

pymysql
SQLAlchemy
  是python 操作数据库的一个库。能够进行 orm 映射官方文档 sqlchemy
  SQLAlchemy“采用简单的Python语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy的理念是,SQL数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。

pymysql 实现数据库操作

方式一  数据库链接放在视图中

  每次视图的执行进行数据库连接查询关闭。

  反复创建数据库链接,多次链接数据库会非常耗时

解决办法:放在全局,单例模式

#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from  flask import Flask

app = Flask(__name__)

@app.route(‘/index‘)
def index():
    # 链接数据库
    conn = pymysql.connect(host="127.0.0.1",port=3306,user=‘root‘,password=‘123‘, database=‘pooldb‘,charset=‘utf8‘)
    cursor = conn.cursor()
    cursor.execute("select * from td where id=%s", [5, ])
    result = cursor.fetchall()  # 获取数据
    cursor.close()
    conn.close()  # 关闭链接
    print(result)
    return  "执行成功"

if __name__ == ‘__main__‘:
    app.run(debug=True)

方式二  放在全局

  不在频繁链接数据库。

  如果是单线程,这样没什么问题,

  但是如果是多线程,就得加把锁。这样就成串行的了

  为了支持并发,此方法依旧不可取

#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from  flask import Flask
from threading import RLock

app = Flask(__name__)
CONN = pymysql.connect(host="127.0.0.1",port=3306,user=‘root‘,password=‘123‘, database=‘pooldb‘,charset=‘utf8‘)

@app.route(‘/index‘)
def index():
    with RLock:
        cursor = CONN.cursor()
        cursor.execute("select * from td where id=%s", [5, ])
        result = cursor.fetchall()  # 获取数据
        cursor.close()
        print(result)
        return  "执行成功"
if __name__ == ‘__main__‘:
    app.run(debug=True)

为此。为了解决方式一二的问题,实现不频繁操作且可以并行的数据库链接,我们需要用到 DBUtils

方式三 DBUtils + thread.local

为每一个线程创建一个链接(是基于本地线程来实现的。thread.local),

每个线程独立使用自己的数据库链接,该线程关闭不是真正的关闭,本线程再次调用时,还是使用的最开始创建的链接,直到线程终止,数据库链接才关闭

#!usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
app = Flask(__name__)
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host=‘127.0.0.1‘,
    port=3306,
    user=‘root‘,
    password=‘123‘,
    database=‘pooldb‘,
    charset=‘utf8‘
)

@app.route(‘/func‘)
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute(‘select * from tb1‘)
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect()   conn.close()

  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute(‘select * from tb1‘)
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == ‘__main__‘: app.run(debug=True)

方式四 DBUtils + 链接池

创建一个链接池,为所有线程提供连接,使用时来进行获取,使用完毕后在放回到连接池。

PS:

  假设最大链接数有10个,其实也就是一个列表,当你pop一个,人家会在append一个,链接池的所有的链接都是按照排队的这样的方式来链接的。

  链接池里所有的链接都能重复使用,共享的, 即实现了并发,又防止了链接次数太多

#!usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask
app = Flask(__name__)
from DBUtils.PersistentDB import PersistentDB
import pymysql

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。
    # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列    AQ 表。如:["set datestyle to ...", "set time zone ..."]
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
        # 取值:
        # 0 = None = never,
        # 1 = default = whenever it is requested,
        # 2 = when a cursor is created,
        # 4 = when a query is executed,
        # 7 = always
    host=‘127.0.0.1‘,
    port=3306,
    user=‘root‘,
    password=‘‘,
    database=‘core_master‘,
    charset=‘utf8‘
)
@app.route(‘/func‘)
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute(‘select * from tb1‘)
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect()   conn.close()

  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute(‘select * from tb1‘)
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == ‘__main__‘: app.run(debug=True)

其他

  pymysql 的操作很是繁琐,大量的重复代码,可以进一部封装

# 创建 链接池的操作封装
import pymysql

from settings import Config

def connect():
    conn = Config.POOL.connection()
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    return conn,cursor

def connect_close(conn,cursor):
    cursor.close()
    conn.close()

def fetch_all(sql,args):
    conn,cursor = connect()

    cursor.execute(sql, args)
    record_list = cursor.fetchall()
    connect_close(conn,cursor)

    return record_list

def fetch_one(sql, args):
    conn, cursor = connect()
    cursor.execute(sql, args)
    result = cursor.fetchone()
    connect_close(conn, cursor)

    return result

def insert(sql, args):
    conn, cursor = connect()
    row = cursor.execute(sql, args)
    conn.commit()
    connect_close(conn, cursor)
    return row

封装代码

DBUtils 内部原理

在 DBUtils 中为每个线程创建一个数据库连接的时候,

对每个线程单独创建内存空间来保存数据,实现数据的空间分离

初始的多线程

import threading
from threading import local
import time

def task(i):
    global v
    time.sleep(1)
    print(v)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()
#  9 9 9 9 9 9 9 9 9 

实现数据隔离的多线程

import threading
from threading import local
import time

obj = local()  # 为每个线程创建一个独立的空间。数据空间隔离

def task(i):
    obj.xxxxx = i
    time.sleep(2)
    print(obj.xxxxx,i)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()
# 0-9 打乱顺序 

获取线程的唯一标记示例

import threading
from threading import local

def task(i):
    print(threading.get_ident(),i)  # 获取线程的唯一标记 

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()

threading local 的内部实现原理

import time
import threading
import greenlet

DIC = {}    # 用线程的唯一标识作为 key 来创建一个大字典分别保存每个线程的数据

def task(i):

    # ident = threading.get_ident()  # 获取进程的 唯一id
    ident = greenlet.getcurrent()    # 获取协程的 唯一id
    if ident in DIC:
        DIC[ident][‘xxxxx‘] = i
    else:
        DIC[ident] = {‘xxxxx‘:i }
    time.sleep(2)

    print(DIC[ident][‘xxxxx‘],i)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()

最终完整版

import time
import threading
try:
    import greenlet
    get_ident =  greenlet.getcurrent
except Exception as e:
    get_ident = threading.get_ident

class Local(object):
    DIC = {}

    def __getattr__(self, item):
        ident = get_ident()
        if ident in self.DIC:
            return self.DIC[ident].get(item)
        return None

    def __setattr__(self, key, value):
        ident = get_ident()
        if ident in self.DIC:
            self.DIC[ident][key] = value
        else:
            self.DIC[ident] = {key:value}

obj = Local()

def task(i):
    obj.xxxxx = i
    time.sleep(2)
    print(obj.xxxxx,i)

for i in range(10):
    t = threading.Thread(target=task,args=(i,))
    t.start()

原文地址:https://www.cnblogs.com/shijieli/p/10355840.html

时间: 2024-10-08 11:04:59

11.5 Flask 蓝图,数据库链接的相关文章

原创:mysql5 还原至mysql 8.0.11数据库链接配置提示错误改动备注

原创:mysql5 还原至mysql 8.0.11数据库链接配置提示错误改有三: a) mysql 连接jar包版修改 b)类路径修改 c)配置连接池地址修改 因版本升级,首先要修改 1:mysql-connector-java 架包版本修改 <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> !--原版本 5.1.6-->

Flask -SQLAlchemy 数据库一对一

from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom datetime import datetimeimport configapp = Flask(__name__)app.config.from_object(config)db = SQLAlchemy(app)class User(db.Model): __tablename__ = "user" id = db.Column(db.Integer

flask 蓝图

转自:http://spacewander.github.io/explore-flask-zh/7-blueprints.html 蓝图 什么是蓝图? 一个蓝图定义了可用于单个应用的视图,模板,静态文件等等的集合.举个例子,想象一下我们有一个用于管理面板的蓝图.这个蓝图将定义像/admin/login和/admin/dashboard这样的路由的视图.它可能还包括所需的模板和静态文件.你可以把这个蓝图当做你的应用的管理面板,管它是宇航员的交友网站,还是火箭推销员的CRM系统. 我什么时候会用到

Flask蓝图目录、Flask-SQLAlchemy、Flask-Script、Flask-Migrate

一.Flask蓝图目录 我们之前写的Flask项目都是自己组织的目录结构,其实Flask官方有其推荐的目录结构,以下就是一个符合官方推荐的Flask小型应用的项目结构目录示例,如下: 如图,这就是我们建立好的一个目录结构,一层一层的看一下,首先是app目录,它就是我们的主应用程序目录了,其中有一个__init__.py文件,里面的内容如下: from flask import Flask from .views.acc import acc_bp from .views.user import

沫沫金【实践可用】--web工程ORM数据库链接(JDBC)链接集群库||普通库,两种标准

普通链接配置,应用到集群会启动失败,请修改 集群数据库链接 jdbc.url=jdbc:oracle:thin:@//127.0.0.1:1521/momojin 普通数据库链接 jdbc.url=jdbc:oracle:thin:@127.0.0.1:1521/momojin 区别就在于:"//",如上所示 标红的地方.请务必清楚

Oracle 数据库链接

SQL> CREATE DATABASE LINK   mydblink 2    CONNECT TO   test   IDENTIFIED BY   test123 3    USING '(DESCRIPTION = 4      (ADDRESS_LIST = 5        (ADDRESS = (PROTOCOL = TCP)(HOST=192.168.1.210)(PORT = 1521))) 6        (CONNECT_DATA = (SERVICE_NAME = o

EntityFramework 多数据库链接,MySql,SqlServer,Oracel等

环境:EntityFramework5.0,MySql5.6,MSSQL2012 EF是强大的ORM工具,真正意义上的多数据库链接指的是不同类型的数据库,以及同种类型的数据库多个库,EF很好的支持这一点,下面简单演示下: 创建一个MVC4.0,Framework4.5的基本项目,然后重点是WebConfig配置: <?xml version="1.0" encoding="utf-8"?> <!-- For more information on

计算机改名导致数据库链接的诡异问题

前几天给开发部门部署测试数据库时,遇到一个很诡异的问题:创建一个链接服务器GEK-MIS01时,报错如下: 消息 15190,级别 16,状态 1,过程 sp_dropserver,第 56 行 仍有对服务器 'GEK-MIS01' 的远程登录或链接登录. 脚本如下(略去登录名等关键信息):   因为当时是一批脚本执行而且仅有这个脚本出错,当我准备查检查出错原因的时候,又有更紧急的事情要处理,就耽搁了处理这个问题,开发那边在测试过程发现这个数据库链接有问题,邮件反馈给我,我检查时居然发现很多不可

EF之MSSQL分布式部署一:EFContext自定义数据库链接

不废话,上代码: 来源:http://bbs.csdn.net/topics/390823046 原文地址:EF之MSSQL分布式部署一:EFContext自定义数据库链接 /// <summary> /// 得到Entity的连接字符串 /// </summary> /// <param name="edmxFullName">Edmx的包括命名空间的全名称</param> /// <param name="server