Pandas 和 SQLAlchemy 配合实现 MySQL 分页查询并获取总条数
@api.route('/show', methods=["POST"])
def api_show():
    """Return one page of ``bidata.gen_adid_cost`` rows plus the total row count.

    Reads optional ``offset`` (default 0) and ``limit`` (default 10) from the
    JSON request body, runs a paginated query ordered by ``id`` descending,
    and then asks MySQL for the un-paginated row count via ``FOUND_ROWS()``.

    Returns:
        A JSON string with the keys ``code``, ``msg``, ``data`` (list of row
        dicts) and ``total`` (row count ignoring the LIMIT clause).

    Raises:
        ValueError, TypeError: if ``offset`` or ``limit`` cannot be converted
            to an integer.
    """
    import numpy as np
    from datetime import date, datetime

    class MyNpEncoder(json.JSONEncoder):
        """JSON encoder that understands numpy scalars/arrays and dates."""

        def default(self, obj):
            if isinstance(obj, np.integer):
                return int(obj)
            if isinstance(obj, np.floating):
                return float(obj)
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            # datetime must be tested before date: datetime is a subclass of
            # date (and pandas.Timestamp is a subclass of datetime).
            if isinstance(obj, datetime):
                return obj.strftime("%Y-%m-%d %H:%M:%S")
            if isinstance(obj, date):
                return obj.strftime("%Y/%m/%d")
            return super(MyNpEncoder, self).default(obj)

    payload = request.json or {}
    # Coerce to non-negative integers before interpolating into the SQL text:
    # the values come from the client, so anything that is not a plain number
    # must never reach the query string.
    offset = max(int(payload.get('offset', 0)), 0)
    limit = max(int(payload.get('limit', 10)), 0)

    # NOTE(review): SQL_CALC_FOUND_ROWS / FOUND_ROWS() are deprecated in
    # recent MySQL releases; a separate COUNT(*) query is the suggested
    # replacement -- confirm against the server version in use.
    sql = (
        "select SQL_CALC_FOUND_ROWS * from bidata.gen_adid_cost "
        "order by id desc limit {offset},{limit}"
    ).format(offset=offset, limit=limit)

    con = db.get_engine(current_app, 'tm_new_hfjy')
    # Both statements must run on the same connection: FOUND_ROWS() reports
    # on the preceding SQL_CALC_FOUND_ROWS query of the current session.
    with con.connect() as conn:
        df1 = pd.read_sql(sql, con=conn)
        df2 = pd.read_sql("SELECT FOUND_ROWS() as total;", con=conn)

    total = int(df2.loc[0, "total"])

    # Replace NaN/NaT with None so they serialize as JSON null. Casting to
    # object first keeps pandas from coercing None back to NaN in numeric
    # columns.
    df1 = df1.astype(object).where(df1.notnull(), None)
    result = df1.to_dict(orient="records")

    data = dict(
        code=RET.OK,
        msg="共 %s 条,本页加载 %s 条记录!" % (total, len(result)),
        data=result,
        total=total,
    )
    return json.dumps(data, cls=MyNpEncoder)
原文地址:https://www.cnblogs.com/spaceapp/p/10792660.html
时间: 2024-10-11 13:10:55