Let's start with the sequence diagram from the previous section to refresh our memory, then follow it step by step:
[Sequence diagram: C3P0 connection pool initialization. Participants: User, ComboPooledDataSource, C3P0PooledConnectionPoolManager, C3P0PooledConnectionPool, BasicResourcePool. Calls, in order: getConnection(), getPool(), checkoutPooledConnection(), checkoutResource().]
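Core classes: roles and responsibilities
The first article in this C3P0 source-analysis series already introduced the role and purpose of each of these core classes. If that is still unclear, see "C3P0整体类结构简单分析" (a brief analysis of C3P0's overall class structure). Here we focus on how the classes relate to one another.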
How C3P0PooledConnectionPoolManager is created
ComboPooledDataSource and C3P0PooledConnectionPoolManager have a one-to-one relationship, and the manager is created only once, during initialization. The constructor signature of C3P0PooledConnectionPoolManager is:
public C3P0PooledConnectionPoolManager(ConnectionPoolDataSource cpds,
        Map flatPropertyOverrides, Map forceUserOverrides,
        int num_task_threads, String parentDataSourceIdentityToken,
        String parentDataSourceName) throws SQLException;
However, when ComboPooledDataSource creates the C3P0PooledConnectionPoolManager, both flatPropertyOverrides and forceUserOverrides are null, so for clarity the constructor below is a simplified version:
public C3P0PooledConnectionPoolManager(ConnectionPoolDataSource cpds,
        Map flatPropertyOverrides, Map forceUserOverrides,
        int num_task_threads, String parentDataSourceIdentityToken,
        String parentDataSourceName) throws SQLException {
    try {
        this.cpds = cpds;
        this.flatPropertyOverrides = flatPropertyOverrides;
        this.num_task_threads = num_task_threads;
        this.parentDataSourceIdentityToken = parentDataSourceIdentityToken;
        this.parentDataSourceName = parentDataSourceName;
        DbAuth auth = null;
        if (auth == null)
            auth = C3P0ImplUtils.findAuth(cpds);
        this.defaultAuth = auth;
        Map tmp = new HashMap();
        BeanInfo bi = Introspector.getBeanInfo(cpds.getClass());
        PropertyDescriptor[] pds = bi.getPropertyDescriptors();
        PropertyDescriptor pd = null;
        for (int i = 0, len = pds.length; i < len; ++i) {
            pd = pds[i];
            String name = pd.getName();
            Method m = pd.getReadMethod();
            if (m != null)
                tmp.put(name, m);
        }
        this.propNamesToReadMethods = tmp;
        if (forceUserOverrides == null) {
            Method uom = (Method) propNamesToReadMethods
                    .get("userOverridesAsString");
            if (uom != null) {
                String uoas = (String) uom.invoke(cpds, (Object[]) null);
                Map uo = C3P0ImplUtils.parseUserOverridesAsString(uoas);
                this.userOverrides = uo;
            } else
                this.userOverrides = Collections.EMPTY_MAP;
        } else
            this.userOverrides = forceUserOverrides;
        poolsInit();
    } catch (Exception e) {
        if (Debug.DEBUG)
            logger.log(MLevel.FINE, null, e);
        // e.printStackTrace();
        throw SqlUtils.toSQLException(e);
    }
}
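To make the getter-indexing step of this constructor concrete, here is a minimal, self-contained sketch built on the JDK's java.beans.Introspector. ReadMethodIndex and DemoDataSource are hypothetical names invented for illustration; they are not c3p0 classes, and the real constructor works on the actual ConnectionPoolDataSource.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ReadMethodIndex {

    // Hypothetical bean standing in for the ConnectionPoolDataSource (cpds).
    public static class DemoDataSource {
        public String getUserOverridesAsString() { return "demo-overrides"; }
        public int getMaxPoolSize() { return 15; }
    }

    // Map every readable bean property name to its getter method.
    static Map<String, Method> index(Class<?> beanClass) {
        try {
            Map<String, Method> out = new HashMap<>();
            BeanInfo bi = Introspector.getBeanInfo(beanClass);
            for (PropertyDescriptor pd : bi.getPropertyDescriptors()) {
                Method m = pd.getReadMethod();
                if (m != null)
                    out.put(pd.getName(), m);
            }
            return out;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Look a property up by name and invoke its getter; null if there is none.
    static Object read(Object bean, String property) {
        Method m = index(bean.getClass()).get(property);
        if (m == null)
            return null;
        try {
            return m.invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoDataSource ds = new DemoDataSource();
        System.out.println(read(ds, "userOverridesAsString"));
        System.out.println(read(ds, "maxPoolSize"));
        System.out.println(read(ds, "noSuchProperty"));
    }
}
```

Looking getters up by property name is what lets the manager read a setting such as userOverridesAsString without depending on the concrete data source class.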
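Don't be intimidated by the amount of code; this constructor is actually simple. Apart from the call to poolsInit(), all it does is set up configuration: it stores the constructor arguments, looks up the default credentials, caches the data source's property getters, and resolves the user overrides. So let's step into poolsInit():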
private void poolsInit() {
    // This is the only essential line; the omitted code mostly guards
    // against memory leaks on hot redeploy
    maybePrivilegedPoolsInit(privilege_spawned_threads);
}
// This method has also been simplified
private void maybePrivilegedPoolsInit(final boolean privilege_spawned_threads) {
    _poolsInit();
}
private synchronized void _poolsInit() {
    String idStr = idString();
    this.timer = new Timer(idStr + "-AdminTaskTimer", true);
    int matt = this.getMaxAdministrativeTaskTime();
    this.taskRunner = createTaskRunner(num_task_threads, matt, timer, idStr
            + "-HelperThread");
    int num_deferred_close_threads = this
            .getStatementCacheNumDeferredCloseThreads();
    if (num_deferred_close_threads > 0)
        this.deferredStatementDestroyer = createTaskRunner(
                num_deferred_close_threads, matt, timer, idStr
                        + "-DeferredStatementDestroyerThread");
    else
        this.deferredStatementDestroyer = null;
    // POOL_EVENT_SUPPORT is hard-coded to false
    if (POOL_EVENT_SUPPORT)
        this.rpfact = ResourcePoolFactory.createInstance(taskRunner, null,
                timer);
    else
        this.rpfact = BasicResourcePoolFactory
                .createNoEventSupportInstance(taskRunner, timer);
    this.authsToPools = new HashMap();
}
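As you can see, the real work happens in _poolsInit(). Let's go through what it does:
- Creates the timer AdminTaskTimer
- Creates a thread pool containing num_task_threads HelperThread threads
- Optionally creates a second thread pool that destroys cached statements in a deferred way; this happens only when num_deferred_close_threads (the value returned by getStatementCacheNumDeferredCloseThreads()) is greater than 0
- Creates the BasicResourcePoolFactory object
- Creates the Map (authsToPools) that caches C3P0PooledConnectionPool instances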
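The steps above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: java.util.concurrent executors stand in for c3p0's own task runner, and PoolInfrastructure, its fields, and the "-#n" thread-name suffix are hypothetical choices, not c3p0's implementation.

```java
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolInfrastructure {
    final Timer timer;                                 // admin timer, daemon thread
    final ExecutorService taskRunner;                  // stand-in for the helper-thread pool
    final ExecutorService deferredStatementDestroyer;  // null when disabled

    PoolInfrastructure(String idStr, int numTaskThreads, int numDeferredCloseThreads) {
        this.timer = new Timer(idStr + "-AdminTaskTimer", true);
        this.taskRunner = Executors.newFixedThreadPool(
                numTaskThreads, daemonThreads(idStr + "-HelperThread"));
        // The second pool exists only when it has been configured (> 0).
        this.deferredStatementDestroyer = numDeferredCloseThreads > 0
                ? Executors.newFixedThreadPool(numDeferredCloseThreads,
                        daemonThreads(idStr + "-DeferredStatementDestroyerThread"))
                : null;
    }

    // Thread factory producing named daemon threads (naming scheme is an assumption).
    static ThreadFactory daemonThreads(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-#" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    void close() {
        timer.cancel();
        taskRunner.shutdownNow();
        if (deferredStatementDestroyer != null)
            deferredStatementDestroyer.shutdownNow();
    }

    // Run one task on the helper pool and report which thread executed it.
    static String helperThreadName(String idStr) {
        PoolInfrastructure infra = new PoolInfrastructure(idStr, 3, 0);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return infra.taskRunner.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            infra.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(helperThreadName("demo"));
    }
}
```

Using daemon threads for both the timer and the workers means this infrastructure does not keep the JVM alive on its own, which matches the `true` passed to the Timer constructor in _poolsInit().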
How C3P0PooledConnectionPool is created
At this point the C3P0PooledConnectionPoolManager has been created. The next step is to use it to create a C3P0PooledConnectionPool.
private C3P0PooledConnectionPool createPooledConnectionPool(DbAuth auth)
        throws SQLException {
    String userName = auth.getUser();
    String automaticTestTable = getAutomaticTestTable(userName);
    String realTestQuery;
    if (automaticTestTable != null) {
        realTestQuery = initializeAutomaticTestTable(automaticTestTable,
                auth);
        if (this.getPreferredTestQuery(userName) != null) {
            if (logger.isLoggable(MLevel.WARNING)) {
                logger.logp(
                        MLevel.WARNING,
                        C3P0PooledConnectionPoolManager.class.getName(),
                        "createPooledConnectionPool",
                        "[c3p0] Both automaticTestTable and preferredTestQuery have been set! "
                                + "Using automaticTestTable, and ignoring preferredTestQuery. Real test query is ''{0}''.",
                        realTestQuery);
            }
        }
    } else {
        if (!defaultAuth.equals(auth))
            ensureFirstConnectionAcquisition(auth);
        realTestQuery = this.getPreferredTestQuery(userName);
    }
    C3P0PooledConnectionPool out = new C3P0PooledConnectionPool(cpds, auth,
            this.getMinPoolSize(userName), this.getMaxPoolSize(userName),
            this.getInitialPoolSize(userName),
            this.getAcquireIncrement(userName),
            this.getAcquireRetryAttempts(userName),
            this.getAcquireRetryDelay(userName),
            this.getBreakAfterAcquireFailure(userName),
            this.getCheckoutTimeout(userName),
            this.getIdleConnectionTestPeriod(userName),
            this.getMaxIdleTime(userName),
            this.getMaxIdleTimeExcessConnections(userName),
            this.getMaxConnectionAge(userName),
            this.getPropertyCycle(userName),
            this.getUnreturnedConnectionTimeout(userName),
            this.getDebugUnreturnedConnectionStackTraces(userName),
            this.getForceSynchronousCheckins(userName),
            this.getTestConnectionOnCheckout(userName),
            this.getTestConnectionOnCheckin(userName),
            this.getMaxStatements(userName),
            this.getMaxStatementsPerConnection(userName),
            this.getConnectionTester(userName),
            this.getConnectionCustomizer(userName), realTestQuery, rpfact,
            taskRunner, deferredStatementDestroyer,
            parentDataSourceIdentityToken);
    return out;
}
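This process is simple too. If the automaticTestTable parameter was set when the pool was configured, then before creating the C3P0PooledConnectionPool c3p0 checks whether that table exists and is empty. If the table does not exist, it creates one: an empty test table with a single column. The table exists solely so that connection validity can be checked later, by running SELECT * FROM XXX, where XXX is the value of your automaticTestTable parameter. If preferredTestQuery is set as well, a warning is logged and automaticTestTable takes precedence; if automaticTestTable is not set, preferredTestQuery becomes the test query.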
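The precedence between the two settings can be reduced to a small decision function. This is a sketch of the branch only: TestQueryChooser and chooseTestQuery are hypothetical names, and the real initializeAutomaticTestTable also touches the database, which is left out here.

```java
class TestQueryChooser {

    // Decide which query will be used to test connections.
    // automaticTestTable, when set, wins over preferredTestQuery.
    static String chooseTestQuery(String automaticTestTable, String preferredTestQuery) {
        if (automaticTestTable != null) {
            // The test table is queried directly; preferredTestQuery is ignored.
            return "SELECT * FROM " + automaticTestTable;
        }
        // May be null, meaning no explicit test query was configured.
        return preferredTestQuery;
    }

    public static void main(String[] args) {
        System.out.println(chooseTestQuery("c3p0_test", "SELECT 1"));
        System.out.println(chooseTestQuery(null, "SELECT 1"));
        System.out.println(chooseTestQuery(null, null));
    }
}
```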
Next, the constructor is called to create the C3P0PooledConnectionPool. A BasicResourcePoolFactory is passed in so that the C3P0PooledConnectionPool can create its BasicResourcePool:
// The body of PooledConnectionResourcePoolManager is omitted here
C3P0PooledConnectionPool( final ConnectionPoolDataSource cpds,
        final DbAuth auth,
        int min,
        int max,
        int start,
        int inc,
        int acq_retry_attempts,
        int acq_retry_delay,
        boolean break_after_acq_failure,
        int checkoutTimeout, //milliseconds
        int idleConnectionTestPeriod, //seconds
        int maxIdleTime, //seconds
        int maxIdleTimeExcessConnections, //seconds
        int maxConnectionAge, //seconds
        int propertyCycle, //seconds
        int unreturnedConnectionTimeout, //seconds
        boolean debugUnreturnedConnectionStackTraces,
        boolean forceSynchronousCheckins,
        final boolean testConnectionOnCheckout,
        final boolean testConnectionOnCheckin,
        int maxStatements,
        int maxStatementsPerConnection,
        /* boolean statementCacheDeferredClose, */
        final ConnectionTester connectionTester,
        final ConnectionCustomizer connectionCustomizer,
        final String testQuery,
        final ResourcePoolFactory fact,
        ThreadPoolAsynchronousRunner taskRunner,
        ThreadPoolAsynchronousRunner deferredStatementDestroyer,
        final String parentDataSourceIdentityToken) throws SQLException
{
    try
    {
        // scache caches statements; it is not covered here
        if (maxStatements > 0 && maxStatementsPerConnection > 0)
            this.scache = new DoubleMaxStatementCache( taskRunner, deferredStatementDestroyer, maxStatements, maxStatementsPerConnection );
        else if (maxStatementsPerConnection > 0)
            this.scache = new PerConnectionMaxOnlyStatementCache( taskRunner, deferredStatementDestroyer, maxStatementsPerConnection );
        else if (maxStatements > 0)
            this.scache = new GlobalMaxOnlyStatementCache( taskRunner, deferredStatementDestroyer, maxStatements );
        else
            this.scache = null;
        // Tester used to check whether a connection is still valid
        this.connectionTester = connectionTester;
        // Maximum time to wait when checking out a connection
        this.checkoutTimeout = checkoutTimeout;
        // Thread pool that runs tasks
        this.sharedTaskRunner = taskRunner;
        // Threads that destroy expired statements
        this.deferredStatementDestroyer = deferredStatementDestroyer;
        // In this flow this is always true
        this.c3p0PooledConnections = (cpds instanceof WrapperConnectionPoolDataSource);
        this.effectiveStatementCache = c3p0PooledConnections && (scache != null);
        this.inUseLockFetcher = (c3p0PooledConnections ? C3P0_POOLED_CONNECION_NESTED_LOCK_LOCK_FETCHER : RESOURCE_ITSELF_IN_USE_LOCK_FETCHER);
        class PooledConnectionResourcePoolManager implements ResourcePool.Manager
        {
            // omitted ...
        }
        ResourcePool.Manager manager = new PooledConnectionResourcePoolManager();
        synchronized (fact)
        {
            fact.setMin( min );
            fact.setMax( max );
            fact.setStart( start );
            fact.setIncrement( inc );
            fact.setIdleResourceTestPeriod( idleConnectionTestPeriod * 1000);
            fact.setResourceMaxIdleTime( maxIdleTime * 1000 );
            fact.setExcessResourceMaxIdleTime( maxIdleTimeExcessConnections * 1000 );
            fact.setResourceMaxAge( maxConnectionAge * 1000 );
            fact.setExpirationEnforcementDelay( propertyCycle * 1000 );
            fact.setDestroyOverdueResourceTime( unreturnedConnectionTimeout * 1000 );
            fact.setDebugStoreCheckoutStackTrace( debugUnreturnedConnectionStackTraces );
            fact.setForceSynchronousCheckins( forceSynchronousCheckins );
            fact.setAcquisitionRetryAttempts( acq_retry_attempts );
            fact.setAcquisitionRetryDelay( acq_retry_delay );
            fact.setBreakOnAcquisitionFailure( break_after_acq_failure );
            // Use the ResourcePoolFactory to create a ResourcePool
            rp = fact.createPool( manager );
        }
    }
    catch (ResourcePoolException e)
    { throw SqlUtils.toSQLException(e); }
}
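One detail in the constructor above is worth isolating: the factory is configured and then used inside synchronized (fact). A plausible reason is that the manager hands the same rpfact to every pool it creates, so the setter calls and createPool() have to run as one atomic step. The sketch below illustrates that idea only; SharedFactoryDemo, PoolSettingsFactory and PoolSettings are hypothetical stand-ins, not c3p0 classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SharedFactoryDemo {

    // Configure the shared factory and create a pool as one atomic step.
    static PoolSettings configureAndCreate(PoolSettingsFactory fact,
            int min, int max, int maxIdleSeconds) {
        synchronized (fact) {
            fact.setMin(min);
            fact.setMax(max);
            // Seconds to milliseconds, like the "* 1000" conversions above.
            fact.setResourceMaxIdleTime(maxIdleSeconds * 1000L);
            return fact.createPool();
        }
    }

    // Two threads share one factory; returns how many pools got mixed-up settings.
    static int countInconsistent(int iterations) {
        PoolSettingsFactory shared = new PoolSettingsFactory();
        AtomicInteger bad = new AtomicInteger();
        Thread small = new Thread(() -> create(shared, 1, 10, iterations, bad));
        Thread large = new Thread(() -> create(shared, 2, 20, iterations, bad));
        small.start();
        large.start();
        try {
            small.join();
            large.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return bad.get();
    }

    private static void create(PoolSettingsFactory fact, int min, int max,
            int iterations, AtomicInteger bad) {
        for (int i = 0; i < iterations; i++) {
            PoolSettings p = configureAndCreate(fact, min, max, 30);
            if (p.min != min || p.max != max)
                bad.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        System.out.println("inconsistent pools: " + countInconsistent(10_000));
    }
}

// Hypothetical mutable factory: the setters change state shared by all callers.
class PoolSettingsFactory {
    private int min, max;
    private long resourceMaxIdleTime;
    void setMin(int min) { this.min = min; }
    void setMax(int max) { this.max = max; }
    void setResourceMaxIdleTime(long millis) { this.resourceMaxIdleTime = millis; }
    PoolSettings createPool() { return new PoolSettings(min, max, resourceMaxIdleTime); }
}

// Immutable snapshot of the factory settings at creation time.
final class PoolSettings {
    final int min, max;
    final long maxIdleMillis;
    PoolSettings(int min, int max, long maxIdleMillis) {
        this.min = min;
        this.max = max;
        this.maxIdleMillis = maxIdleMillis;
    }
}
```

Without the synchronized block, one thread could call createPool() after the other thread had already overwritten min or max, and a pool would be built from a mix of both configurations.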
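As we can see, while a C3P0PooledConnectionPool is being created it also uses the ResourcePoolFactory to create a ResourcePool. So what is this ResourcePool for?
First, we know that one C3P0PooledConnectionPool object corresponds to one database connection pool, so it has the properties and methods any connection pool should have, for example:
public PooledConnection checkoutPooledConnection() throws SQLException;
public void checkinPooledConnection(PooledConnection pcon) throws SQLException;
Let's start from the source of these two most basic pool methods: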
public PooledConnection checkoutPooledConnection() throws SQLException {
    try {
        PooledConnection pc = (PooledConnection) this
                .checkoutAndMarkConnectionInUse();
        pc.addConnectionEventListener(cl);
        return pc;
    } catch (TimeoutException e) {
        throw SqlUtils
                .toSQLException(
                        "An attempt by a client to checkout a Connection has timed out.",
                        e);
    } catch (CannotAcquireResourceException e) {
        throw SqlUtils
                .toSQLException(
                        "Connections could not be acquired from the underlying database!",
                        "08001", e);
    } catch (Exception e) {
        throw SqlUtils.toSQLException(e);
    }
}
private Object checkoutAndMarkConnectionInUse() throws TimeoutException,
        CannotAcquireResourceException, ResourcePoolException,
        InterruptedException {
    Object out = null;
    boolean success = false;
    while (!success) {
        try {
            // This line is the heart of checking out a connection
            out = rp.checkoutResource(checkoutTimeout);
            if (out instanceof AbstractC3P0PooledConnection) {
                // cast should succeed, because effectiveStatementCache
                // implies c3p0 pooled Connections
                AbstractC3P0PooledConnection acpc = (AbstractC3P0PooledConnection) out;
                Connection physicalConnection = acpc
                        .getPhysicalConnection();
                success = tryMarkPhysicalConnectionInUse(physicalConnection);
            } else
                success = true; // we don't pool statements from non-c3p0
                                // PooledConnections
        } finally {
            try {
                if (!success && out != null)
                    rp.checkinResource(out);
            } catch (Exception e) {
                logger
                        .log(
                                MLevel.WARNING,
                                "Failed to check in a Connection that was unusable due to pending Statement closes.",
                                e);
            }
        }
    }
    return out;
}
public void checkinPooledConnection(PooledConnection pcon) throws SQLException
{
    //System.err.println(this + " -- CHECKIN");
    try
    {
        pcon.removeConnectionEventListener( cl );
        unmarkConnectionInUseAndCheckin( pcon );
    }
    catch (ResourcePoolException e)
    { throw SqlUtils.toSQLException(e); }
}
private void unmarkConnectionInUseAndCheckin(PooledConnection pcon) throws ResourcePoolException
{
    if (effectiveStatementCache)
    {
        try
        {
            // cast should generally succeed, because effectiveStatementCache implies c3p0 pooled Connections
            // but clients can try to check-in whatever they want, so there are potential failures here
            AbstractC3P0PooledConnection acpc = (AbstractC3P0PooledConnection) pcon;
            Connection physicalConnection = acpc.getPhysicalConnection();
            unmarkPhysicalConnectionInUse(physicalConnection);
        }
        catch (ClassCastException e)
        {
            if (logger.isLoggable(MLevel.SEVERE))
                logger.log(MLevel.SEVERE,
                        "You are checking a non-c3p0 PooledConnection implementation into" +
                        "a c3p0 PooledConnectionPool instance that expects only c3p0-generated PooledConnections." +
                        "This isn't good, and may indicate a c3p0 bug, or an unusual (and unspported) use " +
                        "of the c3p0 library.", e);
        }
    }
    // This line is the heart of checking a connection back in
    rp.checkinResource(pcon);
}
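Stripped of the statement-cache bookkeeping, the two methods above are a thin facade over a resource pool: check out with a timeout, translate a timeout into an SQLException, and check back in. Here is a minimal sketch of that delegation. ConnectionPoolFacade and SimpleResourcePool are hypothetical classes, plain strings stand in for connections, and a BlockingQueue replaces the real pool logic.

```java
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Facade seen by callers; every operation is delegated to the resource pool.
class ConnectionPoolFacade {
    private final SimpleResourcePool<String> rp;
    private final long checkoutTimeout; // milliseconds

    ConnectionPoolFacade(SimpleResourcePool<String> rp, long checkoutTimeout) {
        this.rp = rp;
        this.checkoutTimeout = checkoutTimeout;
    }

    String checkoutPooledConnection() throws SQLException {
        try {
            return rp.checkoutResource(checkoutTimeout);
        } catch (TimeoutException e) {
            throw new SQLException(
                    "An attempt by a client to checkout a Connection has timed out.", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SQLException(e);
        }
    }

    void checkinPooledConnection(String connection) {
        rp.checkinResource(connection);
    }

    // Demo: with one connection, a second checkout times out until it is returned.
    static boolean timesOutWhenEmpty() {
        ConnectionPoolFacade pool = new ConnectionPoolFacade(
                new SimpleResourcePool<>(Arrays.asList("conn-1")), 50);
        try {
            String c = pool.checkoutPooledConnection();   // takes the only connection
            try {
                pool.checkoutPooledConnection();           // pool is empty now
                return false;                              // should not get here
            } catch (SQLException expected) {
                pool.checkinPooledConnection(c);           // give it back
                return "conn-1".equals(pool.checkoutPooledConnection());
            }
        } catch (SQLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out, then recovered: " + timesOutWhenEmpty());
    }
}

// Hypothetical stand-in for the underlying resource pool: it owns the idle resources.
class SimpleResourcePool<T> {
    private final BlockingQueue<T> idle = new LinkedBlockingQueue<>();

    SimpleResourcePool(Iterable<T> initial) {
        for (T t : initial)
            idle.add(t);
    }

    T checkoutResource(long timeoutMillis) throws TimeoutException, InterruptedException {
        T out = idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (out == null)
            throw new TimeoutException("no resource available within " + timeoutMillis + " ms");
        return out;
    }

    void checkinResource(T resource) {
        idle.add(resource);
    }
}
```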
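We can see that every operation on the connection pool is in fact delegated to BasicResourcePool, and that this class has a helper, PooledConnectionResourcePoolManager, which implements the ResourcePool.Manager interface. Both classes also play an important role in pool initialization, but covering them in detail would take a lot of space, so this article stops here. We will analyze them in the next post.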