基于Torndb的简易ORM

最近在用tornado写一个基于Rest的WebService服务端,只提供后端服务,其他webserver应用通过URL,Rest的方式来访问。

我们在开发web应用的时候,难免会想到ORM的一些框架,比如java ee中常用的hibernate, ibatis以及python中的SQLAlchemy之类。使用ORM会在一定程度上加快我们的开发效率。

一个简易ORM框架主要实现如下几个功能就足够了:

1.插入: 类对象映射为数据库记录

2.查询:数据库记录映射为类对象

3.修改、删除:可以通过自己写sql语句来搞定。

python中有类,同时也有dict字典类型,如果将字典再包装为类,则显得过渡包装了,反倒很不灵活,因此,提炼一下,python的ORM框架只需要实现如下几点就足够:

1.插入: python的dict映射为数据库记录

2.查询:数据库记录映射为python的dict以及list等

3.修改、删除:可以通过自己写sql语句来搞定。

经过一些测试,技术选型,最终确定了使用tornadb,非常轻量级,查询数据库返回的对象直接映射为python的数据类型dict或者list之类。可以用类似java中“对象.属性”的方式来访问数据。这简直是太爽了~首先,看一个小例子。

import types
import time

class Row(dict):
    """A dict that allows for object-like property access syntax."""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

dic = Row()
dic.name = ‘hello‘
dic.num = ‘12334‘
print type(dic)
print "dic.name: " + dic.name
print "dic.num: " + dic.num

输出结果为:

<class ‘__main__.Row‘>

dic.name: hello

dic.num: 12334

通过这个例子,我们可以看到,python里面的dict类型,是可以变成类似java中“对象.属性”的方式来访问的。

torndb就是通过这样的方式,查询返回的数据可以通过“.列名”来直接访问。

查询的时候直接返回dict或者list类型,那插入呢?如果可以像java一样,传入一个对象,通过ORM框架直接反射为sql操作,这样多方便啊~

还是dict,如果我们插入的时候,直接将插入的数据格式保存为dict,通过这个dict生成insert语句就可以了,经过查阅各种资料,我提炼出来了如下方法:(使用的时候直接将该方法放入torndb.py中即可)

    def insert_by_dict(self, tablename, rowdict, replace=False):
        cursor = self._cursor()
        cursor.execute("describe %s" % tablename)
        allowed_keys = set(row[0] for row in cursor.fetchall())
        keys = allowed_keys.intersection(rowdict)

        if len(rowdict) > len(keys):
            unknown_keys = set(rowdict) - allowed_keys
            logging.error("skipping keys: %s", ", ".join(unknown_keys))

        columns = ", ".join(keys)
        values_template = ", ".join(["%s"] * len(keys))

        if replace:
            sql = "REPLACE INTO %s (%s) VALUES (%s)" % (
                tablename, columns, values_template)
        else:
            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                tablename, columns, values_template)

        values = tuple(rowdict[key] for key in keys)
        try:
            cursor.execute(sql, values)
            #self._execute(cursor, sql, values, None)
            return cursor.lastrowid
        finally:
            cursor.close()

这样,插入的时候我们就再也不用写繁琐的sql语句了,只需要将我们要插入的对象使用dict封装,比如:

有个host表,里面有hostname,ip两个字段,则我们可以用如下几行代码,就可以插入到数据库:

    host = {}
    host[‘hostname‘] = ‘test1‘
    host[‘ip‘] = ‘10.22.10.90‘
    ret = db.insert_by_dict("Host", host)

是不是很方便呢?:)下面是我修改过后,完整的torndb源码。欢迎大家多多下载使用。

#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""A lightweight wrapper around MySQLdb.

Originally part of the Tornado framework.  The tornado.database module
is slated for removal in Tornado 3.0, and it is now available separately
as torndb.
"""

from __future__ import absolute_import, division, with_statement

import copy
import itertools
import logging
import os
import time

try:
    import MySQLdb.constants
    import MySQLdb.converters
    import MySQLdb.cursors
except ImportError:
    # If MySQLdb isn‘t available this module won‘t actually be useable,
    # but we want it to at least be importable on readthedocs.org,
    # which has limitations on third-party modules.
    if ‘READTHEDOCS‘ in os.environ:
        MySQLdb = None
    else:
        raise

version = "0.2"
version_info = (0, 2, 0, 0)

class Connection(object):
    """A lightweight wrapper around MySQLdb DB-API connections.

    The main value we provide is wrapping rows in a dict/object so that
    columns can be accessed by name. Typical usage::

        db = torndb.Connection("localhost", "mydatabase")
        for article in db.query("SELECT * FROM articles"):
            print article.title

    Cursors are hidden by the implementation, but other than that, the methods
    are very similar to the DB-API.

    We explicitly set the timezone to UTC and assume the character encoding to
    UTF-8 (can be changed) on all connections to avoid time zone and encoding errors.

    The sql_mode parameter is set by default to "traditional", which "gives an error instead of a warning"
    (http://dev.mysql.com/doc/refman/5.0/en/server-sql-mode.html). However, it can be set to
    any other mode including blank (None) thereby explicitly clearing the SQL mode.
    """
    def __init__(self, host, database, user=None, password=None,
                 max_idle_time=7 * 3600, connect_timeout=0,
                 time_zone="+0:00", charset = "utf8", sql_mode="TRADITIONAL"):
        self.host = host
        self.database = database
        self.max_idle_time = float(max_idle_time)

        args = dict(conv=CONVERSIONS, use_unicode=True, charset=charset,
                    db=database, init_command=(‘SET time_zone = "%s"‘ % time_zone),
                    connect_timeout=connect_timeout, sql_mode=sql_mode)
        if user is not None:
            args["user"] = user
        if password is not None:
            args["passwd"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            self.socket = None
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it."""
        self.close()
        self._db = MySQLdb.connect(**self._db_args)
        self._db.autocommit(True)

    def initClientEncode(self):
        """mysql client encoding=utf8"""
        curs = self._cursor()
        curs.execute("SET NAMES utf8")
        return curs

    def iter(self, query, *parameters, **kwparameters):
        """Returns an iterator for the given query and parameters."""
        self._ensure_connected()
        cursor = MySQLdb.cursors.SSCursor(self._db)
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0] for d in cursor.description]
            for row in cursor:
                yield Row(zip(column_names, row))
        finally:
            cursor.close()

    def query(self, query, *parameters, **kwparameters):
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0] for d in cursor.description]
            return [Row(itertools.izip(column_names, row)) for row in cursor]
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        """Returns the (singular) row returned by the given query.

        If the query has no results, returns None.  If it has
        more than one result, raises an exception.
        """
        rows = self.query(query, *parameters, **kwparameters)
        if not rows:
            return None
        elif len(rows) > 1:
            raise Exception("Multiple rows returned for Database.get() query")
        else:
            return rows[0]

    # rowcount is a more reasonable default return value than lastrowid,
    # but for historical compatibility execute() must return lastrowid.
    def execute(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query."""
        return self.execute_lastrowid(query, *parameters, **kwparameters)

    def execute_lastrowid(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def execute_rowcount(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the rowcount from the query."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.rowcount
        finally:
            cursor.close()

    def executemany(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the lastrowid from the query.
        """
        return self.executemany_lastrowid(query, parameters)

    def executemany_lastrowid(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the lastrowid from the query.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def get_fields_str(self, tablename):
        cursor = self._cursor()
        cursor.execute("describe %s" % tablename)
        fields=[]
        for row in cursor.fetchall():
            fields.append(row[0])
        str = ", ".join(fields)

        cursor.close()
        return str

    def get_fields_prefix_str(self, tablename, prefix):
        cursor = self._cursor()
        cursor.execute("describe %s" % tablename)
        fields=[]
        for row in cursor.fetchall():
            fields.append(prefix+row[0])
        str = ", ".join(fields)

        cursor.close()
        return str

    def get_select_sql(self, tablename):
        str = self.get_fields_str(tablename)
        sql = "SELECT " + str + " FROM " + tablename + " "
        return sql

    def insert_by_dict(self, tablename, rowdict, replace=False):
        cursor = self._cursor()
        cursor.execute("describe %s" % tablename)
        allowed_keys = set(row[0] for row in cursor.fetchall())
        keys = allowed_keys.intersection(rowdict)

        if len(rowdict) > len(keys):
            unknown_keys = set(rowdict) - allowed_keys
            logging.error("skipping keys: %s", ", ".join(unknown_keys))

        columns = ", ".join(keys)
        values_template = ", ".join(["%s"] * len(keys))

        if replace:
            sql = "REPLACE INTO %s (%s) VALUES (%s)" % (
                tablename, columns, values_template)
        else:
            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                tablename, columns, values_template)

        values = tuple(rowdict[key] for key in keys)
        try:
            cursor.execute(sql, values)
            #self._execute(cursor, sql, values, None)
            return cursor.lastrowid
        finally:
            cursor.close()

    def transaction(self, query, *parameters, **kwparameters):
        self._db.begin()
        cursor = self._cursor()
        status = True
        try:
            for sql in query:
                cursor.execute(sql, kwparameters or parameters)
            self._db.commit()
        except OperationalError, e:
            self._db.rollback()
            status = False
            raise Exception(e.args[1], e.args[0])
        finally:
            cursor.close()
        return status

    def executemany_rowcount(self, query, parameters):
        """Executes the given query against all the given param sequences.

        We return the rowcount from the query.
        """
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.rowcount
        finally:
            cursor.close()

    update = execute_rowcount
    updatemany = executemany_rowcount

    insert = execute_lastrowid
    insertmany = executemany_lastrowid

    def _ensure_connected(self):
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails.  Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        if (self._db is None or
            (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        self._ensure_connected()
        return self._db.cursor()

    def _execute(self, cursor, query, parameters, kwparameters):
        try:
            return cursor.execute(query, kwparameters or parameters)
        except OperationalError:
            logging.error("Error connecting to MySQL on %s", self.host)
            self.close()
            raise

class Row(dict):
    """A dict that allows for object-like property access syntax."""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

if MySQLdb is not None:
    # Fix the access conversions to properly recognize unicode/binary
    FIELD_TYPE = MySQLdb.constants.FIELD_TYPE
    FLAG = MySQLdb.constants.FLAG
    CONVERSIONS = copy.copy(MySQLdb.converters.conversions)

    field_types = [FIELD_TYPE.BLOB, FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING]
    if ‘VARCHAR‘ in vars(FIELD_TYPE):
        field_types.append(FIELD_TYPE.VARCHAR)

    for field_type in field_types:
        CONVERSIONS[field_type] = [(FLAG.BINARY, str)] + CONVERSIONS[field_type]

    # Alias some common MySQL exceptions
    IntegrityError = MySQLdb.IntegrityError
    OperationalError = MySQLdb.OperationalError

外带一个小例子,完整版请参照我在github上发布的一个webservice框架:https://github.com/yunfeiflying/tornado-rest-web-service-framwork/

#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
#   Author  :   YunJianFei
#   E-mail  :   [email protected]
#   Date    :   2014/02/25
#   Desc    :   Test db
#

""" Data Access Object
    This file impelements DBI for the table ‘Host‘

The Host table‘s create sql is : 

CREATE TABLE IF NOT EXISTS `test`.`Host` (
  `host_id` INT NOT NULL AUTO_INCREMENT,
  `host_type` INT NULL,
  `hostname` VARCHAR(45) NULL,
  `ip` VARCHAR(45) NULL,
  `create_time` VARCHAR(45) NULL,
  `cpu_count` INT NULL,
  `cpu_pcount` INT NULL,
  `memory` INT NULL,
  `os` VARCHAR(200) NULL,
  `comment` VARCHAR(200) NULL,
  PRIMARY KEY (`host_id`))
ENGINE = InnoDB;

"""

from util.dbconst import TableName, TableFields, TableSelectSql
import logging

class HostDao:
    def __init__(self, db):
        mysql_host = "192.168.10.11:3306"
        db_name = "test"
        db_user = "root"
        db_pass = ""

        self.db = torndb.Connection(
            host=mysql_host, database=db_name,
            user=db_user, password=db_pass
        )

    def insert_by_dict(self, host, replace=False):
        try:
            id = self.db.insert_by_dict("Host", host, replace)
            return id
        except Exception, ex:
            logging.error("Insert host failed! Exception: %s   Host: %s", str(ex), str(host))
            return None

    def if_exist(self, hostname, ip):
        ret = self.get_by_hostname(hostname)
        if ret != None:
            return True

        ret = self.get_by_ip(ip)
        if ret != None:
            return True

        return False

    def get_by_ip(self, ip):
        sql = TableSelectSql.HOST + " where ip=‘" + str(ip)+"‘"
        return self.db.get(sql)

    def get_all(self):
        sql = TableSelectSql.HOST
        return self.db.query(sql)

    def get_by_hostname(self, hostname):
        sql = TableSelectSql.HOST + " where hostname=‘" + str(hostname)+"‘"
        return self.db.get(sql)

    def get_by_id(self, host_id):
        sql = TableSelectSql.HOST + " where host_id=%s" % str(host_id)
        return self.db.get(sql)

    def get_id_by_hostname(self, hostname):
        sql = TableSelectSql.HOST + " where hostname=‘" + str(hostname)+"‘"
        ret = self.db.get(sql)
        if ret != None:
            return ret.host_id
        return None

    def update_worker_num_by_hostname(self, hostname, worker_num):
        try:
            sql = "UPDATE Host SET worker_num=%s WHERE hostname=‘%s‘" % (worker_num, str(hostname))
            ret = self.db.execute(sql)
            return ret
        except Exception, ex:
            logging.error("Update Host failed! Exception: %s   hostname: %s , worker_num: %s", str(ex), str(hostname), worker_num)
            return None

    def update_worker_num_by_id(self, host_id, worker_num):
        try:
            sql = "UPDATE Host SET worker_num=%s WHERE host_id=%s" % (worker_num, host_id)
            ret = self.db.execute(sql)
            return ret
        except Exception, ex:
            logging.error("Update Host failed! Exception: %s   host_id: %s , worker_num: %s", str(ex), host_id, worker_num)
            return None

    def del_by_hostname(self, hostname):
        try:
            sql = "DELETE FROM Host WHERE hostname=‘" + str(hostname) + "‘"
            ret = self.db.execute(sql)
            return ret
        except Exception, ex:
            logging.error("Delete host failed! Exception: %s   hostname: %s", str(ex), str(hostname))
            return None

    def del_by_id(self, host_id):
        try:
            sql = "DELETE FROM Host WHERE host_id=" + str(host_id)
            ret = self.db.execute(sql)
            return ret
        except Exception, ex:
            logging.error("Delete host failed! Exception: %s   host_id: %s", str(ex), host_id)
            return None

基于Torndb的简易ORM

时间: 2024-11-09 01:34:45

基于Torndb的简易ORM的相关文章

十四、EnterpriseFrameWork框架核心类库之简易ORM

在写本章前先去网上找了一下关于ORM的相关资料,以为本章做准备,发现很多东西今天才了解,所以在这里也对ORM做不了太深入的分析,但还是浅谈一下EFW框架中的设计的简易ORM:文中有一点讲得很有道理,Dao与ORM的区别,Dao是对数据库操作的封装,编写的代码方式像一种设计方法,而ORM支持对象与数据结构的映射,更像一种代码开发工具,有了这个工具会让我们开发代码更简单方便:但是同一类工具有简单的也有复杂的,比如文字工具有简单的Notepad,也有复杂的Word,不是说有了复杂的简单的工具就不需要了

C语言 linux环境基于socket的简易即时通信程序

转载请注明出处:http://www.cnblogs.com/kevince/p/3891033.html   By Kevince 最近在看linux网络编程相关,现学现卖,就写了一个简易的C/S即时通信程序,代码如下: head.h 1 /*头文件,client和server编译时都需要使用*/ 2 #include <unistd.h> 3 #include <stdio.h> 4 #include <sys/types.h> 5 #include <sys

CCS2.2基于软件仿真简易教程(汇编)

CCS2.2基于软件仿真简易教程(汇编) Rev 1.0 Writer Nirvana Silence 配置目标芯片 打开此图标 导入配置,生成gel文件,导入点击close 然后关闭 保存changes 启动工程软件 新建工程.asm文件,添加到工程 新建文件 保存为汇编格式 添加到工程 编写程序,编译程序,load程序 在新建的ASM文件中输入以下程序,查看运行后(1030H).(1040H).*AR3,AR4的值 记得助记符前面至少要有一个空格 编译 没有问题,load .out文件 打开

基于.NET的微软ORM框架视频教程(Entity Framework技术)

基于.NET的微软ORM框架视频教程(Entity Framework技术) 第一讲  ORM映射 第二讲 初识EntifyFramework框架 第三讲 LINQ表达式查询 第四讲 LINQ方法查询 第五讲 LINQ TO Entities 第六讲 ObjectQuery查询(上) 第七讲 ObjectQuery查询(下) 第八讲 Entity中的增删改及事务处理 第九讲 Entity中的存储过程使用(完) 源代码及视频

基于JAVA的简易在线聊天系统,觉得挺自豪的一个项目

项目做的比较多,最近,做的一个项目:基于JAVA的简易在线聊天系统,感觉挺自豪的. 这个项目应用JAVA编程语言实现基于网络的文本交互软件的设计和实现,达到多客户端收发文本消息的交互操作.通过服务器端多线程地监控客户端的登陆和退出,实时接收客户端发出的消息并定向发送到指定客户端,以实现在线实时文本消息传输功能:当服务器端运行时,客户端可以随时登陆和退出:客户端登陆时可以设置个人信息,包括用户名和密码:保持一定的系统稳定性,客户端与服务器端的连接稳定. 这个“基于JAVA的简易在线聊天系统”是我在

基于Spring的简易SSO设计

通常稍微规模大一些的企业,内部已经有很多的应用系统,多个系统整合首先要解决的便是“统一登录(SSO)”问题,之前写过一篇 利用Membership实现SSO(单点登录) ,java环境下已经有一些开源的成熟sso项目(比如CAS),但如果觉得CAS太麻烦,想自己再造轮子重复发明一个,可以参考下面的思路:(仍然是基于Cookie的实现,只不过安全性上略有加强,cookie端存放的token标识,不再与用户名.密码等这些敏感信息相关) 1.组件图 主要由3大部分组成, 1.1 SSO Client

基于Android 平台简易即时通讯的研究与设计[转]

摘要:论文简单介绍Android 平台的特性,主要阐述了基于Android 平台简易即时通讯(IM)的作用和功能以及实现方法.(复杂的通讯如引入视频音频等可以考虑AnyChat SDK~)关键词:Android 平台:即时通讯 (本文中图表点击附件即可见) 1 Android 平台简介Android 是Google 公司于2007年11月5日推出的手机操作系统,经过2年多的发展,Android平台在智能移动领域占有不小的份额,由Google为首的40 多家移动通信领域的领军企业组成开放手机联盟(

基于Android平台简易即时通讯的研究与设计

1 Android平台简介 Android是Google公司于2007年11月5日推出的手机操作系统,经过2年多的发展,Android平台在智能移动领域占有不小的份额,由Google为首的40多家移动通信领域的领军企业组成开放手机联盟(OHA).Google与运营商.设备制造商.开发商和其他第三方结成深层次的合作伙伴关系,希望通过建立标准化.开放式的移动电话软件平台,在移动产业内形成一个开放式的生态系统.正因如此,Android正在被越来越多的开发者和使用者所接受.近日,Google发言人Ant

【转】基于Map的简易记忆化缓存

看到文章后,自己也想写一些关于这个方面的,但是觉得写的估计没有那位博主好,而且又会用到里面的许多东西,所以干脆转载.但是会在文章末尾写上自己的学习的的东西. 原文出处如下: http://www.cnblogs.com/micrari/p/6921661.html 背景 在应用程序中,时常会碰到需要维护一个map,从中读取一些数据避免重复计算,如果还没有值则计算一下塞到map里的的小需求(没错,其实就是简易的缓存或者说实现记忆化).在公司项目里看到过有些代码中写了这样简易的缓存,但又忽视了线程安