Python学习 - 编写自己的ORM(2)

上一篇文章简单的实现了ORM(对象关系模型),这一篇文章主要实现简单的MySQL数据库操作。

想要操作数据库,首先要建立一个数据库连接。下面定义一个创建数据库连接的函数,得到一个连接叫做engine。

def create_engine(user,password,database,host=‘127.0.0.1‘,port=3306,**kw):
    import mysql.connector
    global engine
    if engine is not None:
        raise DBError(‘Engine is already initialized.‘)
    params = dict(user=user,password=password,database=database,host=host,port=port)
    defaults = dict(use_unicode=True,charset=‘utf8‘,collation=‘utf8_general_ci‘,autocommit=False)
    #print (‘%s %s %s %s %s‘) % (user,password,database,host,port)
    for k,v in defaults.iteritems():
        params[k] = kw.pop(k,v)
    params.update(kw)
    params[‘buffered‘] = True
    engine = mysql.connector.connect(**params)
    cursor = engine.cursor()

有了连接就可以对数据库进行操作了。下面写了几个函数,可以对数据库进行查询和插入操作。

def _select(sql,first,*args):
    cursor = None
    sql = sql.replace(‘?‘,‘%s‘)
    global engine
    try:
        cursor = engine.cursor()
        cursor.execute(sql,args)
        if cursor.description:
            names = [x[0] for x in cursor.description]
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return Dict(names,values)
        return [Dict(names,x) for x in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()

def select_one(sql,*args):
    return _select(sql,True,*args)

def select(sql,*args):
    return _select(sql,False,*args)

def _update(sql,*args):
    cursor = None
    global engine
    sql = sql.replace(‘?‘,‘%s‘)
    print sql
    try:
        cursor = engine.cursor()
        cursor.execute(sql,args)
        r = cursor.rowcount
        engine.commit()
        return r
    finally:
        if cursor:
            cursor.close()

def insert(table,**kw):
    cols, args = zip(*kw.iteritems())
    sql = ‘insert into %s (%s) values(%s)‘ % (table,‘,‘.join([‘%s‘ % col for col in cols]),‘,‘.join([‘?‘ for i in range(len(cols))]))
    print (‘sql %s args %s‘ % (sql, str(args)))
    return _update(sql,*args)

到这里,基本的数据库操作已经完成了。但是,根据廖雪峰的教程,这还远远不够。

  • 如果要在一个数据库连接中实现多个操作,上面的代码效率很低,没次执行玩一条语句,就需要重新分配一个连接。
  • 在一次事务中执行多条操作也是一样效率低下。
  • 如果服务器为不同用户数据库请求都分配一个线程来建立连接,但是在进程中,连接是可供享使用的。这样问题就来了,导致数据库操作可能异常。

针对第三个问题,应该使每个连接是每个线程拥有的,其它线程不能访问,使用threading.local。首先定义一个类,来保存数据库的上下文:

class _DbCtx(threading.local):

    def __init__(self):
        self.connection = None
        self.transactions = 0

    def is_init(self):
        return not self.connection is None

    def init(self):
        self.connection = engine # 创建数据库连接
        self.transactions = 0

    def cleanup(self):
        self.connection.cleanup()
        self.connection = None

    def cursor(self):
        return self.connection.cursor()

上面的代码有一个错误。因为Python的赋值语句只是将一个对象的引用传给一个变量,就如上面代码中 init函数中 self.connection = engine。表明self.connection和engine都指向一个数据库连接的对象。如果将self.connection给cleanup了,那么engine指向的对象也被cleanup了。下图是一个例子:

a是类foo实例的一个引用,执行b=a后,在执行b.clean(),此时应该只是b的v值被更改为0,但是执行a.v却发现v的值也变为0了。

下面是最后的代码,只是封装了最底层的数据库操作,代码也写的很涨,虽然是模仿廖雪峰的代码。

# -*- coding: utf-8 -*-
import time, uuid, functools, threading, logging

class Dict(dict):
    ‘‘‘
    Simple dict but support access as x.y style.

    ‘‘‘
    def __init__(self, names=(), values=(), **kw):
        super(Dict, self).__init__(**kw)
        for k, v in zip(names, values):
            self[k] = v

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"‘Dict‘ object has no attribute ‘%s‘" % key)

    def __setattr__(self, key, value):
        self[key] = value
class DBError(Exception):
    pass
class MultiColumnsError(Exception):
    pass
engine = None
class _DbCtx(threading.local):

    def __init__(self):
        self.connection = None
        self.transactions = 0

    def is_init(self):
        return not self.connection is None

    def init(self):
        self.connection = engine
        self.transactions = 0

    def cleanup(self):
        self.connection = None

    def cursor(self):
        return self.connection.cursor()

def create_engine(user,password,database,host=‘127.0.0.1‘,port=3306,**kw):
    import mysql.connector
    global engine
    if engine is not None:
        raise DBError(‘Engine is already initialized.‘)
    params = dict(user=user,password=password,database=database,host=host,port=port)
    defaults = dict(use_unicode=True,charset=‘utf8‘,collation=‘utf8_general_ci‘,autocommit=False)
    #print (‘%s %s %s %s %s‘) % (user,password,database,host,port)
    for k,v in defaults.iteritems():
        params[k] = kw.pop(k,v)
    params.update(kw)
    params[‘buffered‘] = True
    engine = mysql.connector.connect(**params)
    print type(engine)

_db_ctx = _DbCtx()
class _ConnectionCtx(object):

    def __enter__(self):
        self.should_cleanuo = False
        if not _db_ctx.is_init():
            cursor = engine.cursor()
            _db_ctx.init()
            self.should_cleanup = True
        return self

    def __exit__(self,exctype,excvalue,traceback):
        if self.should_cleanup:
            _db_ctx.cleanup()

def with_connection(func):
    @functools.wraps(func)
    def _wrapper(*args,**kw):
        with _ConnectionCtx():
            return func(*args, **kw)
    return _wrapper

def _select(sql,first,*args):
    cursor = None
    sql = sql.replace(‘?‘,‘%s‘)
    global _db_ctx
    try:
        cursor = _db_ctx.cursor()
        cursor.execute(sql,args)
        if cursor.description:
            names = [x[0] for x in cursor.description]
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return Dict(names,values)
        return [Dict(names,x) for x in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()
@with_connection
def select_one(sql,*args):
    return _select(sql,True,*args)
@with_connection
def select_int(sql,*args):
    d = _select(sql,True,*args)
    if len(d) != 1:
        raise MultoColumnsError(‘Except only one column.‘)
    return d.values()[0]
@with_connection
def select(sql,*args):
    global engine
    print type(engine)
    return _select(sql,False,*args)
@with_connection
def _update(sql,*args):
    cursor = None
    global _db_ctx
    sql = sql.replace(‘?‘,‘%s‘)
    print sql
    try:
        cursor = _db_ctx.cursor()
        cursor.execute(sql,args)
        r = cursor.rowcount
        engine.commit()
        return r
    finally:
        if cursor:
            cursor.close()

def insert(table,**kw):
    cols, args = zip(*kw.iteritems())
    sql = ‘insert into %s (%s) values(%s)‘ % (table,‘,‘.join([‘%s‘ % col for col in cols]),‘,‘.join([‘?‘ for i in range(len(cols))]))
    print (‘sql %s args %s‘ % (sql, str(args)))
    return _update(sql,*args)

create_engine(user=‘root‘,password=‘z5201314‘,database=‘test‘)
u1 = select_one(‘select * from user where id=?‘,1)
print ‘u1‘
print u1
print ‘start selet()...‘
u2 = select(‘select * from user‘)
for item in u2:
    print (‘%s %s‘ % (item.name,item.id))
print ‘name:%s id: %s‘ % (u1.name,u1.id)
时间: 2025-01-02 06:52:51

Python学习 - 编写自己的ORM(2)的相关文章

Python学习 - 编写自己的ORM(1)

这篇博文参考的是廖雪峰的Python教程的实战部分,传送门.推荐大家看看装饰器和使用元类这两个章节,然后在看实战部分. 这篇博文有时间了还会更新,主要是学习Python的语法,如上面提到的装饰器和元类. 起步:编写简单的ORM对象 写一个类映射某个数据表,下面是写一个User类,对应数据库中的user表: class User(Model): id = StringField(primary_key=True,ddl='varchar(50)') name = StringField(ddl='

python 学习笔记十一 SQLALchemy ORM(进阶篇)

SqlAlchemy ORM SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果. Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如: MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port&g

Python学习 - 编写一个简单的web框架(一)

自己动手写一个web框架,因为我是菜鸟,对于python的一些内建函数不是清楚,所以在写这篇文章之前需要一些python和WSGI的预备知识,这是一系列文章.这一篇只实现了如何处理url. 参考这篇文章:http://www.cnblogs.com/russellluo/p/3338616.html 预备知识 web框架主要是实现web服务器和web应用之间的交互.底层的网络协议主要有web服务器完成.譬如监听端口,填充报文等等. Python内建函数__iter__和__call__和WSGI

Python学习笔记八:ORM框架SQLAlchemy

一:SQLAlchemy使用 1:实体类的创建 ORM中的实体类与一般的Python类不同,在其中,使用 __tablename__=""指明该类与数据库中某个表相对应,然后定义一系列成员属性,属性值使用 Column(数据类型) 来映射到表中具体哪一列. 首先,创建数据库引擎,并由静态方法获取一个基类:declarative_base() 创建了一个 BaseModel 类,这个类的子类可以自动与一个表关联. 然后,继承base类,定义实体类: 带外键的实体类创建: 最后,运行基类中

Python学习---抽屉框架分析[ORM操作]180314

Django ORM操作     1. 字段操作         class User(model.Model);             u=字段        用处:            1 .admin中的字段验证            2. obj.clean_fields() 进行自定义的验证             3. 利用Djanfo Form进行验证,此时前台和后台的操作分开               但form和model里的字段重复[推荐使用]             

Python学习 - 编写一个简单的web框架(二)

在上一篇日志中已经讨论和实现了根据url执行相应应用,在我阅读了bottle.py官方文档后,按照bottle的设计重写一遍,主要借鉴大牛们的设计思想. 一个bottle.py的简单实例 来看看bottle是如何使用的,代码来自http://www.bottlepy.org/docs/0.12/index.html: from bottle import route, run, template @route('/hello/<name>') def index(name): return t

python学习笔记(十六) - ORM框架(SQLAlchemy)

所谓的ORM就是Object-Relational Mapping,把关系数据库的表结果映射到对象上. 1. 安装SQLAlchemy: easy_install sqlalchemy 2. 导入SQLAlchemy,并初始化DBSession: # 导入: from sqlalchemy import Column, String, create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declara

python学习笔记-Day022 - django ORM操作

一对多:models.ForeignKey() 首先定义表结构: class UserType(models.Model):     caption = models.CharField(max_length=32)     def __unicode__(self):         return self.caption          class UserInfo(models.Model):     username = models.CharField(max_length=32)

Python学习(三):入门篇:Python中怎么编写类

Python中怎么编写类 Last Edit 2013/5/2 先看一个例子: #person.py class person: """class to representaion a person""" def __init__(self,name,age): self.name=name if 0<age<=150: self.age=age else: print 'age is no valid!' def display(s