python socketpool:通用连接池

简介

在软件开发中经常要管理各种“连接”资源,通常我们会使用对应的连接池来管理,比如mysql数据库连接可以用sqlalchemy中的池来管理,thrift连接可以通过thriftpool管理,redis-py中的StrictRedis实现本身就是基于连接池的,等等。 而今天介绍的socketpool是一个通用的python连接池库,通过它可以实现任意类型连接的管理,虽然不是很完美,但在一些找不到合适连接池实现、而又不想自己造轮子的时候使用起来会节省很多精力。

内部实现要点

  • 这个类库的代码其实并不是特别的漂亮,但结构设计的不错,关键留下了对拓展开放的钩子,能让使用者根据自己的需要定制自己的连接池
  • 内部主要的组件有ConnectionPool,Connector和backend_mod三个
    • ConnectionPool实现了一个连接池的通用逻辑,用一个优先级队列管理所有连接,另外支持connection的生命周期定制,有一个reap机制(可选),基本思想是每个conn有一个最大生命周期,比如600秒,过了这个时间,就必须回收掉,reap线程(也有可能是greenlet或eventlet)定期检查过期的conn并进行回收
    • Connector是一个接口,它可以看做是一个制造conn的工厂,ConnectionPool在需要新建conn的时候,会通过这个工厂来生成conn。所以我们只要实现Connector的接口方法就可以定制一个自己的连接工厂
    • backend_mod是为了支持不同的线程模型(比如python原生线程,gevent或者eventlet)抽象出来的后端模块,它统一封装了Socket, PriorityQueue, Semaphore等和并发模型相关的组件,在创造ConnectionPool对象时可以通过参数控制选用哪种backend

部分代码阅读

ConnectionPool的初始化函数

     def __init__(self, factory,
                  retry_max=3, retry_delay=.1,
                  timeout=-1, max_lifetime=600.,
                  max_size=10, options=None,
                  reap_connections=True, reap_delay=1,
                  backend="thread"):

         if isinstance(backend, str):
             self.backend_mod = load_backend(backend)
             self.backend = backend
         else:
             self.backend_mod = backend
             self.backend = str(getattr(backend, ‘__name__‘, backend))
         self.max_size = max_size
         self.pool = getattr(self.backend_mod, ‘PriorityQueue‘)()
         self._free_conns = 0
         self.factory = factory
         self.retry_max = retry_max
         self.retry_delay = retry_delay
         self.timeout = timeout
         self.max_lifetime = max_lifetime
         if options is None:
             self.options = {"backend_mod": self.backend_mod,
                             "pool": self}
         else:
             self.options = options
             self.options["backend_mod"] = self.backend_mod
             self.options["pool"] = self

         # bounded semaphore to make self._alive ‘safe‘
         self._sem = self.backend_mod.Semaphore(1)

         self._reaper = None
         if reap_connections:
             self.reap_delay = reap_delay
             self.start_reaper()

这里几个参数的意义:

  • factory是类对象,需要实现Connector接口,用来生成conn,options是调用factory时传入的参数
  • retry_max是获取conn时如果出错最多重试几次
  • max_lifetime是规定每个conn最大生命时间,见上面说的reap机制
  • max_size是这个pool的大小上限
  • backend是线程模型
  • reap_connections控制是否启用reap机制

被启动的reap就是一个单独的线程,定时调用下面的方法把过期的conn回收掉:

     def murder_connections(self):
         current_pool_size = self.pool.qsize()
         if current_pool_size > 0:
             for priority, candidate in self.pool:
                 current_pool_size -= 1
                 if not self.too_old(candidate):
                     self.pool.put((priority, candidate))
                 else:
                     self._reap_connection(candidate)
                 if current_pool_size <= 0:
                     break

_reap_connection最终会回调conn对象的invalidate方法(Connector的接口)进行销毁。每次使用完conn后会调用release_connection, 它的逻辑是

     def release_connection(self, conn):
         if self._reaper is not None:
             self._reaper.ensure_started()

         with self._sem:
             if self.pool.qsize() < self.max_size:
                 connected = conn.is_connected()
                 if connected and not self.too_old(conn):
                     self.pool.put((conn.get_lifetime(), conn))
                 else:
                     self._reap_connection(conn)
             else:
                 self._reap_connection(conn)

如果连接还没过期或断开,就会被重新放入优先级队列中,用户可以通过实现Connector接口的get_lifetime来控制这里放回的conn的优先级,priority最小的conn下次会被优先取出

Connector定义了哪些接口呢?

 class Connector(object):
     def matches(self, **match_options):
         raise NotImplementedError()

     def is_connected(self):
         raise NotImplementedError()

     def handle_exception(self, exception):
         raise NotImplementedError()

     def get_lifetime(self):
         raise NotImplementedError()

     def invalidate(self):
         raise NotImplementedError()

matches方法主要用在pool取出一个conn时,除了优先选择priority最小的conn,还需要这个conn和get(**options)传入的参数match,这个match就是回调conn的matches方法。其他几个接口前面都涉及到了。

TcpConnector实现

来看一下socketpool自带的TcpConnector的实现,实现tcp socket的工厂

 class TcpConnector(Connector):

     def __init__(self, host, port, backend_mod, pool=None):
         self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)
         self._s.connect((host, port))
         self.host = host
         self.port = port
         self.backend_mod = backend_mod
         self._connected = True
         # use a ‘jiggle‘ value to make sure there is some
         # randomization to expiry, to avoid many conns expiring very
         # closely together.
         self._life = time.time() - random.randint(0, 10)
         self._pool = pool

     def __del__(self):
         self.release()

     def matches(self, **match_options):
         target_host = match_options.get(‘host‘)
         target_port = match_options.get(‘port‘)
         return target_host == self.host and target_port == self.port

     def is_connected(self):
         if self._connected:
             return util.is_connected(self._s)
         return False

     def handle_exception(self, exception):
         print(‘got an exception‘)
         print(str(exception))

     def get_lifetime(self):
         return self._life

     def invalidate(self):
         self._s.close()
         self._connected = False
         self._life = -1

     def release(self):
         if self._pool is not None:
             if self._connected:
                 self._pool.release_connection(self)
             else:
                 self._pool = None

     def send(self, data):
         return self._s.send(data)

     def recv(self, size=1024):
         return self._s.recv(size)

不需要太多额外解释。

拓展实现HiveConnector

根据自身项目需要,我用pyhs2实现了一个hive连接池

 class HiveConnector(Connector):

     def __init__(self, host, port, backend_mod, pool=None, authMechanism=‘NOSASL‘,
                  **options):
         self.host = host
         self.port = port
         self.backend_mod = backend_mod
         self._pool = pool
         self._connected = False
         self._conn = pyhs2.connect(host=host,
                                    port=port,
                                    authMechanism=authMechanism
                                    )
         self._connected = True
         # use a ‘jiggle‘ value to make sure there is some
         # randomization to expiry, to avoid many conns expiring very
         # closely together.
         self._life = time.time() - random.randint(0, 10)

     def __del__(self):
         self.release()

     def matches(self, **match_options):
         target_host = match_options.get(‘host‘)
         target_port = match_options.get(‘port‘)
         return target_host == self.host and target_port == self.port

     def is_connected(self):
         return self._connected

     def handle_exception(self, exception):
         logger.exception("error: %s" % str(exception))

     def get_lifetime(self):
         return self._life

     def invalidate(self):
         try:
             self._conn.close()
         except:
             pass
         finally:
             self._connected = False
             self._life = -1

     def release(self):
         if self._pool is not None:
             if self._connected:
                 self._pool.release_connection(self)
             else:
                 self._pool = None

     def cursor(self):
         return self._conn.cursor()

     def execute(self, hql):
         with self.curosr() as cur:
             return cur.execute(hql)

 hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)

使用这个hive_pool去执行hql语句非常容易:

     with hive_pool.connection() as conn:
         with conn.cursor() as cur:
             print cur.getDatabases()

总结

简绍了socketpool的内部实现,以及如何使用它构造自己的连接池。

时间: 2024-08-10 17:00:05

python socketpool:通用连接池的相关文章

如何实现python的mysql连接池并加入缓存过期

mysql建立的连接,在8小时内都没有访问请求的话,mysql server将主动断开这条连接.在使用pymysql或MySQLdb操作数据库连接时,当cursor一直处于连接状态,未及时close时,连接池被占用.查看后台日志: "MySQL server has gone away (%r)" % (e,)) pymysql.err.OperationalError: (2006, "MySQL server has gone away (TimeoutError(110

用python自定义实现db2的连接池

想要模仿zabbix的oracle插件orabix来实现对db2的监控,但是Java能力有限,就用python来实现了.但是python常用的连接池PooledDB似乎并不支持db2,一直报这样的错误:"Database module is not thread-safe."所幸我只是用来做监控的,要求并不是很高,只要实现连接池的两个基本功能即可: 1.连接复用 2.连接检查,重连 1 #!/usr/local/bin/python 2 # -*- coding: utf-8 -*-

python 里的 redis 连接池的原理

python设置redis连接池的好处: 通常情况下,需要连接redis时,会创建一个连接,基于这个连接进行redis操作,操作完成后去释放,正常情况下,这是没有问题的,但是并发量较高的情况下,频繁的连接创建和释放对性能会有较高的影响,于是连接池发挥作用. 连接池的原理:‘预先创建多个连接,当进行redis操作时,直接获取已经创建好的连接进行操作.完成后,不会释放这个连接,而是让其返回连接池,用于后续redis操作!这样避免连续创建和释放,从而提高了性能! import redis pool =

day24——NoSQL简介、redis服务搭建、redis连接池、redis管道

一.Redis 安装 yum install -y epel-releaseyum install -y gcc jemalloc-devel wgetcd /usr/local/srcwget https://codeload.github.com/antirez/redis/tar.gz/2.8.21 -O redis-2.8.21.tar.gztar xf redis-2.8.21.tar.gzcd redis-2.8.21makemake PREFIX=/usr/local/redis

greentor MySQL连接池实现

greentor MySQL连接池实现 https://en.wikipedia.org/wiki/Connection_pool 通过greentor实现了pymysql在Tornado上异步调用的过程后发现,每次建立数据库连接都会经过socket 3次握手,而每一次socket读写都会伴随着greenlet的切换,以及ioloop的callback过程,虽然是异步了,但是IO性能并没有提升,所以在研究了TorMySQL连接池的实现后,实现了greentor自己的连接池. https://gi

python 基础 10.0 nosql 简介--redis 连接池及管道

一. NOSQL 数据库简介 NoSQL 泛指非关系型的数据库.非关系型数据库与关系型数据库的差别 非关系型数据库的优势: 1.性能NOSQL 是基于键值对的,可以想象成表中的主键和值的对应关系,而且不需要经过SQL 层的解析,所以性能非常高. 2.可扩展性同样也是因为基于键值对,数据之间没有耦合性,所以非常容易水平扩展. 关系型数据库的优势: 1. 复杂查询可以用SQL语句方便的在一个表以及多个表之间做非常复杂的数据查询. 2.事务支持使得对于安全性能很高的数据访问要求得以实现.对于这两类数据

Python 使用 PyMysql、DBUtils 创建连接池提升性能

Python 使用 PyMysql.DBUtils 创建连接池提升性能 Python 编程中可以使用 PyMysql 进行数据库的连接及诸如查询/插入/更新等操作,但是每次连接 MySQL 数据库请求时,都是独立的去请求访问,相当浪费资源,而且访问数量达到一定数量时,对 mysql 的性能会产生较大的影响.因此,实际使用中,通常会使用数据库的连接池技术,来访问数据库达到资源复用的目的. 解决方案:DBUtils DBUtils 是一套 Python 数据库连接池包,并允许对非线程安全的数据库接口

Python下Mysql数据连接池——单例

# coding:utf-8 import threading import pymysql from DBUtils.PooledDB import PooledDB from app.common.file_config import get_config class DbPool(object): _instance_lock = threading.Lock() def __init__(self): if not hasattr(DbPool, "pool"): DbPool

python通过连接池连接redis,操作redis队列

在每次使用redis都进行连接的话会拉低redis的效率,都知道redis是基于内存的数据库,效率贼高,所以每次进行连接比真正使用消耗的资源和时间还多.所以为了节省资源,减少多次连接损耗,连接池的作用相当于缓存了多个客户端与redis服务端的连接,当有新的客户端来进行连接时,此时,只需要去连接池获取一个连接即可,实际上连接池就是把一个连接共享给多个客户端,可以说是广播,要用的话就去接收. #-*-coding:utf-8-*- import redis # 连接池连接使用,节省了每次连接用的时间