python连接Greenplum数据库

配置greenplum客户端认证

配置pg_hba.conf

cd /home/gpadmin/gpdbdata/master/gpseg-1
vim pg_hba.conf
增加
host    all         gpadmin          10.1.201.55/32    trust

[gpadmin@ gpseg-1]$ export PGDATA=/home/gpadmin/gpdbdata/master/gpseg-1
[gpadmin@ gpseg-1]$ pg_ctl reload -D $PGDATA
server signaled

使用Psycopg2访问数据库

Psycopg2 是 Python 语言下最常用的连接PostgreSQL数据库连接库,Psycopg2 的底层是由 C 语言封装 PostgreSQL 的标准库  libpq 实现的,

运行速度非常快,Psycopg2支持大型多线程应用的大量并发Insert和Update操作,Psycopg2完全兼容 DB API 2.0 

安装Psycopg2 

pip install psycopg2

Psycopg2使用参考文档

http://initd.org/psycopg/docs/index.html

Psycopg2 连接PostgreSQL数据库接口

Psycopg2提供的操作数据库的两个重要类是ConnectionCursor,和获取数据库连接的快捷函数connect()

psycopg2.connect(host="localhost", port="5432", dbname="testdb", user="gpadmin", password="123456")

常用关键词参数说明如下:

  • host:主机名或 IP 地址
  • port:连接PostgreSQL数据库使用的端口
  • dbname:连接的数据库,默认为与用户名同名的数据库
  • user:连接数据库的用户
  • password:连接数据库用户的密码

Connection类方法说明

Connection类用于获取到PostgreSQL数据库的连接,以下介绍Connection类常用的方法,详细内容阅读 Psycopg2 Connection 类

  • Connection()构造函数

    用于构造一个到PostgreSQL数据库的连接,常用的是使用connect()快捷函数构造数据库连接

  • connection.cursor()

    用于从当前的数据库连接中获取一个Cursor对象(游标),用于执行SQL语句。
  • cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用

  • connection.close()

    关闭当前的数据库连接
  • connection.commit()

    此方法提交当前事务。如果不调用这个方法,无论做了什么修改,数据不能保存到数据库中。
  • connection.rollback()

    此方法会回滚任何更改数据库数据

Cursor类方法说明

Cursor类用于执行SQL语句,并返回执行结果,以下介绍Cursor类常用的方法,详细内容阅读 Psycopg2 Cursor 类

  • cursor.execute(query)

    用于执行SQL语句 query

  • cursor.mogrify(query)

    会返回生成的sql脚本, 用以查看生成的sql是否正确

  • cursor.fetchall()

    获取SQL执行结果中的所有记录,返回值是一个元组的列表,每一条记录是一个元组

  • cursor.fetchmany(([size=cursor.arraysize]))

    获取SQL执行结果中指定条数的记录,记录数由size指定,当不指定size值时,默认为arraysize属性的值,arraysize属性的默认值是1;返回值是一个元组的列表,每一条记录是一个元组

  • cursor.fetchone()

    获取执行结果中的一条记录

  • cursor.close()

    关闭当前连接的游标
  • curosr.callproc(procname[, parameters])

    这个程序执行的存储数据库程序给定的名称。该程序预计为每一个参数,参数的顺序必须包含一个条目
  • cursor.rowcount

    只读属性,它返回数据库中的行的总数已修改,插入或删除最后 execute*()

psycopg2.pool模块说明

提供了一些纯Python类直接在客户端应用程序实现简单的连接池。

class psycopg2.pool.AbstractConnectionPool(minconn, maxconn, *args, **kwargs)

基类实现通用的基于密钥池代码。

自动创建新的minconn连接。池将支持的最大maxconn连接。* args,* * kwargs传递到connect()函数。

以下预计将由子类实现方法:

getconn(key=None)

得到一个空连接,并将其分配给key如果不是没有。

putconn(conn, key=None, close=False)

 put away 一个连接。

  如果关闭是真的,则放弃池中的连接。

 closeall()

关闭池处理的所有连接。

请注意所有的连接都关闭,包括最终在应用程序中使用的连接。

下面的类,子类可以使用abstractconnectionpool。

class psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwargs)

不能在不同线程中共享的连接池。

请注意,这个池类仅用于单线程应用程序。

class psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwargs)

一个连接池与线程模块一起工作。

注意这个池类可以安全地应用在多线程应用程序中。

Psycopg2中可用的异常错误类

异常错误类的继承关系如下:

StandardError
|__Warning
|__Error
   |__InterfaceError
   |__DatabaseError
      |__DataError
      |__OperationalError
         |__ psycopg2.extensions.QueryCanceledError
         |__ psycopg2.extensions.TransactionRollbackError
      |__IntegrityError
      |__InternalError
      |__ProgrammingError
      |__NotSupportedError
异常 描述
psycopg2.Warning 当有严重警告时触发,例如插入数据是被截断等等。
psycopg2.Error 警告以外所有其他错误类。
psycopg2.InterfaceError 当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发。
psycopg2.DatabaseError 和数据库有关的错误发生时触发。
psycopg2.DataError 当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等。
psycopg2.OperationalError 指非用户控制的,而是操作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等操作数据库是发生的错误。
psycopg2.IntegrityError 完整性相关的错误,例如外键检查失败等。
psycopg2.InternalError 数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等。
psycopg2.ProgrammingError 程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等。
psycopg2.NotSupportedError 不支持错误,指使用了数据库不支持的函数或API等。例如在连接对象上 使用.rollback()函数,然而数据库并不支持事务或者事务已关闭。

Psycopg2使用举例

简单的增加,查询记录

import psycopg2
import psycopg2.extras
import time

‘‘‘
    连接数据库
    returns:db
‘‘‘
def gp_connect():
    try:
        db = psycopg2.connect(dbname="testdb",
                              user="gpadmin",
                              password="gpadmin",
                              host="10.1.208.42",
                              port="5432")
        # connect()也可以使用一个大的字符串参数,
        # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
        return db
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server",e)

if __name__ == ‘__main__‘:
    conn = gp_connect()
    print(conn)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
    ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
    conn.commit()
    # 提交到数据库中
    print(ret)
    ret = cur.execute("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);",(300, "abc‘def"))

    conn.commit()
    # 提交到数据库中
    print(cur.rowcount)  # 1
    # 返回数据库中的行的总数已修改,插入或删除最后 execute*().

    ret_sql = cur.mogrify("select * from pg_tables where tablename = %s;", (‘gp_test‘,))
    # 返回生成的sql脚本, 用以查看生成的sql是否正确.
    # sql脚本必须以;结尾, 不可以省略.其次, 不管sql中有几个参数, 都需要用 % s代替, 只有 % s, 不管值是字符还是数字, 一律 % s.
    # 最后, 第二个参数中, 一定要传入元组, 哪怕只有一个元素, 像我刚才的例子一样, (‘gp_test‘)这样是不行的.
    print(ret_sql.decode(‘utf-8‘))  # select * from pg_tables where tablename = E‘gp_test‘;

    cur.execute("select * from gp_test where num = %s;", (300,))
    pg_obj = cur.fetchone()
    print(pg_obj) # {‘id‘: 1, ‘num‘: 300, ‘data‘: "abc‘def"}

    conn.close() # 关闭连接

批量插入,查询

    conn = gp_connect()
    print(conn)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
    # ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
    # conn.commit()
    # # 提交到数据库中
    # print(ret)
    gp_list = []
    for i in range(200):
        gp_list.append((i,‘abc%s‘%i))
    # print(gp_list)
    # 批量提交数据
    ret = cur.executemany("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);", gp_list)
    conn.commit()
    # 提交到数据库中
    print(cur.query)  # 查看上一条执行的脚本
    print(cur.rowcount)  # 200
    # 返回数据库中的行的总数已修改,插入或删除最后 execute*().
    cur.execute("select  count(*) num from gp_test")
    pg_obj = cur.fetchone()
    print(pg_obj)  # {‘num‘: 200}

    conn.close()  # 关闭连接

使用连接池,执行高性能的批量插入与查询

import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime

‘‘‘
    连接数据库
    使用数据库连接池
    returns:db
‘‘‘
def gp_connect():
    try:
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5,dbname="testdb",
                              user="gpadmin",
                              password="gpadmin",
                              host="10.1.208.42",
                              port="5432")
        # connect()也可以使用一个大的字符串参数,
        # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
        # 从数据库连接池获取连接
        conn = simple_conn_pool.getconn()
        return conn
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server",e)

if __name__ == ‘__main__‘:
    conn = gp_connect()
    print(conn)
    cur = conn.cursor()
    # 批量查询大小
    batch_size = 1000
    gp_list = []
    for i in range(2000, 100000):
        gp_list.append((i,‘abc%s‘%i))
    # print(gp_list)

    # 开始时间
    start_time = datetime.now()
    # 批量提交数据execute_values性能大于executemany
    psycopg2.extras.execute_values(cur, "INSERT INTO public.gp_test (num, data) VALUES %s", gp_list)
    conn.commit()
    # 提交到数据库中
    cur.execute("select  *  from gp_test order by id")
    count = 0

    while True:
        count = count + 1
        # 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
        data = cur.fetchmany(batch_size)
        # 数据为空的时候中断循环
        if not data:
            break
        else:
            print(data[-1])  # 得到最后一条(通过元祖方式返回)
        print(‘获取%s到%s数据成功‘ % ((count - 1) * batch_size, count * batch_size))
    print(‘insert到fetchmany获取全量数据所用时间:‘, (datetime.now() - start_time).seconds) # 16s
conn.close()  # 关闭连接

执行高性能的批量更新与查询

import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime

‘‘‘
    连接数据库
    使用数据库连接池
    returns:db
‘‘‘
def gp_connect():
    ……略

if __name__ == ‘__main__‘:
    conn = gp_connect()
    print(conn)
    cur = conn.cursor()
    # 批量查询大小
    batch_size = 1000
    gp_uplist = [] # 更新列表
    for i in range(2000, 10000):
        gp_uplist.append((i,‘def%s‘%i))
    print(gp_uplist)

    # 开始时间
    start_time = datetime.now()
    # 批量提交数据execute_values性能大于executemany

    sql = "UPDATE public.gp_test SET data = TEST.data  "           "FROM (VALUES %s) AS TEST(num, data) "           "WHERE public.gp_test.num = TEST.num"
    # 批量更新语句模版 UPDATE TABLE SET TABLE.COL = XX.col
    # FROM (VALUES %s) AS XX(id_col,col)
    # WHERE TABLE.id_col = XX.id_col
    # XX为别名
    psycopg2.extras.execute_values(cur, sql, gp_uplist, page_size=100)
    print(cur.query)
    conn.commit()
    # 提交到数据库中
    cur.execute("select  *  from gp_test order by id")
    count = 0

    while True:
        count = count + 1
        # 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
        data = cur.fetchmany(batch_size)
        # 数据为空的时候中断循环
        if not data:
            break
        else:
            print(data[-1])  # 得到最后一条(通过元祖方式返回)
        print(‘获取%s到%s数据成功‘ % ((count - 1) * batch_size, count * batch_size))
    print(‘update到fetchmany获取全量数据所用时间:‘, (datetime.now() - start_time).seconds) # 16s
conn.close()  # 关闭连接

原文地址:https://www.cnblogs.com/xiao-apple36/p/10362367.html

时间: 2024-11-01 20:50:35

python连接Greenplum数据库的相关文章

Python连接Mysql数据库(Debian)

Python连接Mysql数据库(Debian) 以下是Python 2.*版本的安装方法,MySQL-python暂不支持Python 3.*版本 提前要做的工作: 安装setuptools,在终端中运行 wget https://bootstrap.pypa.io/ez_setup.py -O - | sudo python 安装pip,下载“get_pip.py”,运行 python get_pip.py 运行如下命令装好必要的包 sudo apt-get install python-d

python连接Oracle数据库

# python连接oracle数据 ## 介绍------------------------------ python 连接oracle数据库,可以使用cx_oracle模块 - 使用如下命令安装```python -m pip install cx_oracle --pre``` ## 连接oracle代码-----------------------------```pythonimport cx_oracle # 设置 dsn = cx_oracle.makedsn("192.168.

[笔记]--在Ubuntu系统用Python连接Mysql数据库

环境:Ubuntu11.10,Python2.7,Mysql5.0.95 在Ubuntu终端输入命令安装Python的Mysql模块 sudo apt-get install python-mysqldb 就这么简单: 运行一下脚本: #!/usr/bin/python #-*-coding=utf-8# # import MySQLdb cn = MySQLdb.Connection(host="192.168.88.124",user="root",passwd

Python连接MySQL数据库之pymysql模块使用

Python连接MySQL数据库之pymysql模块使用 Python3连接MySQL PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2中则使用mysqldb. Django中也可以使用PyMySQL连接MySQL数据库. PyMySQL安装 pip install pymysql 连接数据库 注意事项 在进行本文以下内容之前需要注意: 你有一个MySQL数据库,并且已经启动. 你有可以连接该数据库的用户名和密码 你有一个有权限操作的datab

Python 连接 Oracle数据库

1.环境设置 [[email protected] ~]# cat /etc/redhat-release CentOS release 6.9 (Final) [[email protected] ~]# python -V Python 2.6.6 版本:Oracle 12c 2.前提:安装cx_Oracle模块依赖包 由于使用Python连接Oracle,所以需要下载oracle客户端包 官网:http://www.oracle.com/technetwork/topics/linuxx8

windows下python连接oracle数据库

python连接oracle数据库的方法,具体如下 1.首先安装cx_Oracle包2.解压instantclient-basic-windows.x64-11.2.0.4.0.zip到c:\oracle3.拷贝instantclient_11_2下所有.dll文件到c:\python34\Lib\site-packages\下(根据自己的python版本拷贝到相应的site-packages文件夹下) python连接示例代码: # -*- coding: utf-8 -*- import c

python连接mysql数据库——版本问题

今天终于解决了使用python连接数据库不成功的问题,现将过程总结如下: 一.出现的问题 在使用python连接mysql数据库是一直出现如下问题: 1.只能连接到我数据库中的的第一个数据库,但是不能操作里面的表,会报错表不存在.(表是存在的)2.更换其他数据库后,直接报错找不到该数据库.(数据库和表均存在) 运行连接数据库的代码,会出现: conn = pymysql.connect(user='root', password='password', database='XXX') Trace

Python 连接Oracle数据库

连接:python操作oracle数据库 1. 下载模块 cx_Oracle 2. Windows下用easy_install.exe (Python安装目录下,Script目录中)先安装pip 3. 在下载cx_Oracle模块的目录下,执行pip install  xxx 命令 4. 验证: Python执行 import cx_Oracle

Python连接Access数据库

前言 今天想要用Python访问Access数据库,折腾了半天,特记录一下 背景 最近想将一些文件记录下来,存入数据库,为此拿LabVIEW写了一个版本,记录环境配置为: LabVIWE:2015 Access:2016 驱动连接字符串: Provider=Microsoft.Jet.OLEDB.4.0;Data Source=%s;Jet OLEDB:Database Password=;Persist Security Info=False 虽然用LabVIEW已经实现功能,但觉得还是太笨重