之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。
主要功能:
1.查询单个对象:
所需参数:表名,过滤条件
2.查询多个对象:
所需参数:表名,过滤条件
3.按主键查询:
所需参数:表名,值
4.分页查询:
所需参数:表名,页码,每页记录数,过滤条件
具体代码如下:
1 import json, os, sys, time 2 3 import pymysql 4 from DBUtils import PooledDB 5 6 class BaseDao(object): 7 """ 8 简便的数据库操作基类 9 """ 10 __config = {} # 数据库连接配置 11 __conn = None # 数据库连接 12 __cursor = None # 数据库游标 13 __database = None # 用于临时村塾查询数据库 14 __tableName = None # 用于临时存储查询表名 15 __fields = [] # 用于临时存储查询表的字段列表 16 __primaryKey_dict = {} # 用于存储配置中的数据库中所有表的主键 17 18 def __init__(self, creator=pymysql, host="localhost", user=None, password="", database=None, port=3306, charset="utf8"): 19 if host is None: 20 raise Exception("Parameter [host] is None.") 21 if user is None: 22 raise Exception("Parameter [user] is None.") 23 if password is None: 24 raise Exception("Parameter [password] is None.") 25 if database is None: 26 raise Exception("Parameter [database] is None.") 27 if port is None: 28 raise Exception("Parameter [port] is None.") 29 self.__config = dict({ 30 "creator" : creator, "charset":charset, 31 "host":host, "port":port, 32 "user":user, "password":password, "database":database 33 }) 34 self.__conn = PooledDB.connect(**self.__config) 35 self.__cursor = self.__conn.cursor() 36 self.__database = self.__config["database"] 37 self.__init_primaryKey() 38 print(get_time(), "数据库连接初始化成功。") 39 40 def __del__(self): 41 ‘重写类被清除时调用的方法‘ 42 if self.__cursor: 43 self.__cursor.close() 44 print(get_time(), "游标关闭") 45 if self.__conn: 46 self.__conn.close() 47 print(get_time(), "连接关闭") 48 49 def select_one(self, tableName=None, filters={}): 50 ‘‘‘ 51 查询单个对象 52 @tableName 表名 53 @filters 过滤条件 54 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 55 ‘‘‘ 56 self.__check_params(tableName) 57 sql = self.__query_util(filters) 58 self.__cursor.execute(sql) 59 result = self.__cursor.fetchone() 60 return self.__parse_result(result) 61 62 def select_pk(self, tableName=None, primaryKey=None): 63 ‘‘‘ 64 按主键查询 65 @tableName 表名 66 @primaryKey 主键值 67 ‘‘‘ 68 self.__check_params(tableName) 69 filters = {} 70 filters.setdefault(str(self.__primaryKey_dict[tableName]), primaryKey) 71 sql = self.__query_util(filters) 72 self.__cursor.execute(sql) 73 result = self.__cursor.fetchone() 74 return self.__parse_result(result) 75 76 def select_all(self, tableName=None, filters={}): 77 ‘‘‘ 78 查询所有 79 @tableName 表名 80 @filters 过滤条件 81 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 82 ‘‘‘ 83 self.__check_params(tableName) 84 sql = self.__query_util(filters) 85 self.__cursor.execute(sql) 86 results = self.__cursor.fetchall() 87 return self.__parse_results(results) 88 89 def count(self, tableName=None): 90 ‘‘‘ 91 统计记录数 92 ‘‘‘ 93 self.__check_params(tableName) 94 sql = "SELECT count(*) FROM %s"%(self.__tableName) 95 self.__cursor.execute(sql) 96 result = self.__cursor.fetchone() 97 return result[0] 98 99 def select_page(self, tableName=None, pageNum=1, limit=10, filters={}): 100 ‘‘‘ 101 分页查询 102 @tableName 表名 103 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 104 ‘‘‘ 105 self.__check_params(tableName) 106 totalCount = self.count() 107 if totalCount / limit == 0 : 108 totalPage = totalCount / limit 109 else: 110 totalPage = totalCount // limit + 1 111 if pageNum > totalPage: 112 print("最大页数为%d"%totalPage) 113 pageNum = totalPage 114 elif pageNum < 1: 115 print("页数不能小于1") 116 pageNum = 1 117 beginindex = (pageNum-1) * limit 118 filters.setdefault("_limit_", (beginindex, limit)) 119 sql = self.__query_util(filters) 120 self.__cursor.execute(sql) 121 results = self.__cursor.fetchall() 122 return self.__parse_results(results) 123 124 def __parse_result(self, result): 125 ‘用于解析单个查询结果,返回字典对象‘ 126 obj = {} 127 for k,v in zip(self.__fields, result): 128 obj[k] = v 129 return obj 130 131 def __parse_results(self, results): 132 ‘用于解析多个查询结果,返回字典列表对象‘ 133 objs = [] 134 for result in results: 135 obj = self.__parse_result(result) 136 objs.append(obj) 137 return objs 138 139 def __init_primaryKey(self): 140 ‘根据配置中的数据库读取该数据库中所有表的主键集合‘ 141 sql = """SELECT TABLE_NAME, COLUMN_NAME 142 FROM Information_schema.columns 143 WHERE COLUMN_KEY=‘PRI‘ AND TABLE_SCHEMA=‘%s‘"""%(self.__database) 144 self.__cursor.execute(sql) 145 results = self.__cursor.fetchall() 146 for result in results: 147 self.__primaryKey_dict[result[0]] = result[1] 148 149 def __query_fields(self, tableName=None, database=None): 150 ‘查询表的字段列表, 将查询出来的字段列表存入 __fields 中‘ 151 sql = """SELECT column_name 152 FROM Information_schema.columns 153 WHERE table_Name = ‘%s‘ AND TABLE_SCHEMA=‘%s‘"""%(tableName, database) 154 self.__cursor.execute(sql) 155 fields_tuple = self.__cursor.fetchall() 156 self.__fields = [fields[0] for fields in fields_tuple] 157 158 def __query_util(self, filters=None): 159 """ 160 SQL 语句拼接方法 161 @filters 过滤条件 162 """ 163 sql = r‘SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}‘ 164 # 拼接查询表 165 sql = sql.replace("#{TABLE_NAME}", self.__tableName) 166 # 拼接查询字段 167 self.__query_fields(self.__tableName, self.__database) 168 FIELDS = "" 169 for field in self.__fields: 170 FIELDS += field + ", " 171 FIELDS = FIELDS[0: len(FIELDS)-2] 172 sql = sql.replace("#{FIELDS}", FIELDS) 173 # 拼接查询条件(待优化) 174 if filters is None: 175 sql = sql.replace("#{FILTERS}", "") 176 else: 177 FILTERS = "" 178 if not isinstance(filters, dict): 179 raise Exception("Parameter [filters] must be dict type. ") 180 isPage = False 181 if filters.get("_limit_"): 182 isPage = True 183 beginindex, limit = filters.get("_limit_") 184 for k, v in filters.items(): 185 if k.startswith("_in_"): # 拼接 in 186 FILTERS += "AND %s IN (" %(k[4:]) 187 values = v.split(",") 188 for value in values: 189 FILTERS += "%s,"%value 190 FILTERS = FILTERS[0:len(FILTERS)-1] + ") " 191 elif k.startswith("_nein_"): # 拼接 not in 192 FILTERS += "AND %s NOT IN (" %(k[4:]) 193 values = v.split(",") 194 for value in values: 195 FILTERS += "%s,"%value 196 FILTERS = FILTERS[0:len(FILTERS)-1] + ") " 197 elif k.startswith("_like_"): # 拼接 like 198 FILTERS += "AND %s like ‘%%%s%%‘ " %(k[6:], v) 199 elif k.startswith("_ne_"): # 拼接不等于 200 FILTERS += "AND %s != ‘%s‘ " %(k[4:], v) 201 elif k.startswith("_lt_"): # 拼接小于 202 FILTERS += "AND %s < ‘%s‘ " %(k[4:], v) 203 elif k.startswith("_le_"): # 拼接小于等于 204 FILTERS += "AND %s <= ‘%s‘ " %(k[4:], v) 205 elif k.startswith("_gt_"): # 拼接大于 206 FILTERS += "AND %s > ‘%s‘ " %(k[4:], v) 207 elif k.startswith("_ge_"): # 拼接大于等于 208 FILTERS += "AND %s >= ‘%s‘ " %(k[4:], v) 209 elif k in self.__fields: # 拼接等于 210 FILTERS += "AND %s = ‘%s‘ "%(k, v) 211 sql = sql.replace("#{FILTERS}", FILTERS) 212 if isPage: 213 sql += "LIMIT %d,%d"%(beginindex, limit) 214 215 print(get_time(), sql) 216 return sql 217 218 def __check_params(self, tableName): 219 ‘‘‘ 220 检查参数 221 ‘‘‘ 222 if tableName: 223 self.__tableName = tableName 224 else: 225 if self.__tableName is None: 226 raise Exception("Parameter [tableName] is None.") 227 228 def get_time(): 229 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) 230 231 if __name__ == "__main__": 232 config = { 233 # "creator": pymysql, 234 # "host" : "127.0.0.1", 235 "user" : "root", 236 "password" : "root", 237 "database" : "test", 238 # "port" : 3306, 239 # "charset" : ‘utf8‘ 240 } 241 base = BaseDao(**config) 242 ######################################################################## 243 # user = base.select_one("user") 244 # print(user) 245 ######################################################################## 246 # users = base.select_all("user") 247 # print(users) 248 ######################################################################## 249 # filter1 = { 250 # "sex":0, 251 # "_in_id":"1,2,3,4,5", 252 # "_like_name":"zhang", 253 # "_ne_name":"wangwu" 254 # } 255 # user_filters = base.select_all(tableName="user", filters=filter1) 256 # print(user_filters) 257 ######################################################################## 258 # menu = base.select_one(tableName="menu") 259 # print(menu) 260 ######################################################################## 261 # user_pk = base.select_pk("user", 2) 262 # print(user_pk) 263 ######################################################################## 264 # filter2 = { 265 # "_in_id":"1,2,3,4", 266 # "_like_name":"test" 267 # } 268 # user_limit = base.select_page("user", 2, 10, filter2) #未实现 269 # print(user_limit) 270 ########################################################################
代码中已经给出了几个具体示例,大家可以参考使用。
如果有感兴趣一起学习、讨论Python的可以加QQ群:626787819,有啥意见或者建议的可以发我邮箱:[email protected]。
时间: 2024-11-12 02:26:42