SQLSERVER方法:
插入数据代码演示(上下文管理器方法):
import pymssql,uuid
from class_area.class_ReadConf import ReadDate #导入读取配置文件模块方法
# Raw configuration text, loaded once at import time.
# presumably the ReadDate arguments are (file name, section, option) -- confirm
# against class_area.class_ReadConf.
sql_data = ReadDate('sqlserver.conf', 'DATABASE', 'config').readdata()
read_data = ReadDate('area.conf', 'AREAS', 'config').readdata()
# NOTE(review): eval() executes whatever expression the config file contains.
# If the stored value is a plain literal, ast.literal_eval would be the safer
# parser -- confirm the config format before switching.
area_read_2 = eval(read_data)
# server 数据库服务器名称或IP
# user 用户名
# password 密码
# database 数据库名称
class SqlServer():
    """Insert every record of the module-level ``area_read_2`` into T_Sys_Area."""

    # The %s markers are pymssql parameter placeholders: the driver quotes and
    # escapes each value, so names containing a single quote no longer break
    # the statement and values can never be interpreted as SQL.
    _INSERT_SQL = (
        'INSERT INTO T_Sys_Area '
        '(ID,Area_Code,Area_Name,Center_Point,Area_ParentCode) '
        'VALUES (%s,%s,%s,%s,%s)'
    )

    @staticmethod
    def _row_params(area):
        """Return the parameter tuple for one area record.

        Args:
            area: mapping with the keys 'code', 'name', 'latitude',
                'longitude' and 'parentCode'; latitude and longitude must be
                strings because they are joined with ",".

        Returns:
            Tuple ``(ID, Area_Code, Area_Name, Center_Point, Area_ParentCode)``
            in the column order of ``_INSERT_SQL``.
        """
        center_point = ",".join((area['latitude'], area['longitude']))
        return (
            uuid.uuid1().hex,  # 32 hex digits, i.e. str(uuid1()) without dashes
            area['code'],
            area['name'],
            center_point,
            area['parentCode'],
        )

    def insert_sql_server(self):
        """Open a SQL Server connection, insert all areas, and commit once.

        Connection settings come from the module-level ``sql_data`` text,
        which must evaluate to a mapping with the keys 'server', 'user',
        'password' and 'database'. Prints "成功" after the commit.
        """
        # NOTE(review): eval() runs arbitrary code from the config file;
        # consider ast.literal_eval if the value is a plain dict literal.
        conf = eval(sql_data)  # parsed once, not once per connection field
        with pymssql.connect(conf['server'], conf['user'],
                             conf['password'], conf['database']) as conn:
            with conn.cursor() as cursor:
                for area in area_read_2:
                    cursor.execute(self._INSERT_SQL, self._row_params(area))
                # Single commit after the loop: the rows land in one transaction.
                conn.commit()
        print("成功")
# Script entry point: run the SQL Server bulk insert.
if __name__ == '__main__':
    sql_server_data = SqlServer()
    sql_server_data.insert_sql_server()
Oracle方法:
读取数据方法:
#!-*- encoding:utf-8 -*-
import cx_Oracle,uuid
from class_area.class_ReadConf import ReadDate
# Oracle connection settings and the area records, loaded once at import time.
# NOTE(review): eval() executes whatever expression the config files contain.
# If the stored values are plain literals, ast.literal_eval would be the safer
# parser -- confirm the config format before switching.
arearead_1 = eval(ReadDate('Oracle.conf', 'DATABASE', 'config_1').readdata())
area_read = ReadDate('area.conf', 'AREAS', 'config').readdata()
area_read_2 = eval(area_read)
class MySqlQueryData:
    """Execute one SQL statement and return all rows.

    Despite the "MySql" in the names, the connection is opened with cx_Oracle
    using the module-level ``arearead_1`` settings.
    """

    def __init__(self, mysqlsentence):
        """Remember the statement to run.

        Args:
            mysqlsentence: SQL text passed unchanged to ``cursor.execute``.
        """
        self.mysqlsentence = mysqlsentence

    @staticmethod
    def _connect_string(conf):
        """Build the ``user/password@host:port/database`` connect string.

        %-formatting is used so that a numeric 'port' value works as well as
        a string one.
        """
        return '%s/%s@%s:%s/%s' % (
            conf['user'], conf['password'],
            conf['host'], conf['port'], conf['database'],
        )

    def mysqlconnect(self):
        """Connect, execute the stored statement, and fetch every row.

        Returns:
            The result of ``cursor.fetchall()``.

        The cursor and the connection are closed even when executing or
        fetching raises; the exception then propagates to the caller.
        """
        cnn = cx_Oracle.connect(self._connect_string(arearead_1))
        try:
            cursor = cnn.cursor()
            try:
                cursor.execute(self.mysqlsentence)
                # fetchall() reads the complete result set; switch to
                # fetchone() if only the first row is wanted.
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            cnn.close()
# Script entry point: dump every row of T_SYS_AREA.
if __name__ == "__main__":
    mysqlquerydata = MySqlQueryData("select * from T_SYS_AREA")
    print(mysqlquerydata.mysqlconnect())
插入数据方法:
class OracleInsertData():
    """Insert every record of the module-level ``area_read_2`` into T_SYS_AREA."""

    # :1..:5 are positional bind variables: the driver transmits the values
    # separately from the statement text, so quotes inside a value cannot
    # break or alter the SQL.
    _INSERT_SQL = (
        'INSERT INTO T_SYS_AREA '
        '(ID,AREA_CODE,AREA_NAME,AREA_PARENTCODE,CENTER_POINT) '
        'VALUES (:1,:2,:3,:4,:5)'
    )

    @staticmethod
    def _connect_string(conf):
        """Build the ``user/password@host:port/database`` connect string."""
        return '%s/%s@%s:%s/%s' % (
            conf['user'], conf['password'],
            conf['host'], conf['port'], conf['database'],
        )

    @staticmethod
    def _row_params(area):
        """Return the bind values for one area record.

        Args:
            area: mapping with the keys 'code', 'name', 'latitude',
                'longitude' and 'parentCode'; latitude and longitude must be
                strings because they are joined with ",".

        Returns:
            Tuple ``(ID, AREA_CODE, AREA_NAME, AREA_PARENTCODE, CENTER_POINT)``
            in the column order of ``_INSERT_SQL``.
        """
        center_point = ",".join((area['latitude'], area['longitude']))
        return (
            uuid.uuid1().hex,  # 32 hex digits, i.e. str(uuid1()) without dashes
            area['code'],
            area['name'],
            area['parentCode'],
            center_point,
        )

    def oracleconnect(self):
        """Open an Oracle connection, insert all areas, and commit once.

        Connection settings come from the module-level ``arearead_1`` mapping.
        Prints "成功" after the commit. The cursor and the connection are
        closed even when an insert fails; the exception then propagates.
        """
        cnn = cx_Oracle.connect(self._connect_string(arearead_1))
        try:
            cursor = cnn.cursor()
            try:
                for area in area_read_2:
                    cursor.execute(self._INSERT_SQL, self._row_params(area))
                # Single commit after the loop: the rows land in one transaction.
                cnn.commit()
            finally:
                cursor.close()
        finally:
            cnn.close()
        print("成功")
# Script entry point: run the Oracle bulk insert.
if __name__ == "__main__":
    oracleinsertdata = OracleInsertData()
    oracleinsertdata.oracleconnect()
原文地址:https://www.cnblogs.com/hao2018/p/9293694.html