public void afterPropertiesSet() throws Exception { notNull(dataSource, "Property ‘dataSource‘ is required"); notNull(sqlSessionFactoryBuilder, "Property ‘sqlSessionFactoryBuilder‘ is required"); state((configuration == null && configLocation == null) || !(configuration != null && configLocation != null), "Property ‘configuration‘ and ‘configLocation‘ can not specified with together"); this.sqlSessionFactory = buildSqlSessionFactory(); } protected SqlSessionFactory buildSqlSessionFactory() throws IOException { Configuration configuration; XMLConfigBuilder xmlConfigBuilder = null; if (this.configuration != null) { configuration = this.configuration; if (configuration.getVariables() == null) { configuration.setVariables(this.configurationProperties); } else if (this.configurationProperties != null) { configuration.getVariables().putAll(this.configurationProperties); } } else if (this.configLocation != null) { xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(), null, this.configurationProperties); configuration = xmlConfigBuilder.getConfiguration(); } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Property `configuration` or ‘configLocation‘ not specified, using default MyBatis Configuration"); } configuration = new Configuration(); configuration.setVariables(this.configurationProperties); } ... ... //当我们不指定transactionFactory时会创建SpringManagedTransactionFactory。这个是mybatis由spring管理事务的桥梁 if (this.transactionFactory == null) { this.transactionFactory = new SpringManagedTransactionFactory(); } configuration.setEnvironment(new Environment(this.environment, this.transactionFactory, this.dataSource)); |
在不指定 transactionFactory 的时候,会默认创建 SpringManagedTransactionFactory。SpringManagedTransactionFactory 用于创建 Transaction。Transaction 是 MyBatis(ibatis)底层对数据库连接 Connection 的包装,负责管理数据库连接的生命周期,包括创建(creation)、准备(preparation)、提交/回滚(commit/rollback)和关闭(close)。MyBatis 将此 Transaction 作为参数,创建执行 SQL 的 Executor(MyBatis 中 SQL 的执行者是 Executor)。
/** * Wraps a database connection. * Handles the connection lifecycle that comprises: its creation, preparation, commit/rollback and close. * * @author Clinton Begin */ public interface Transaction { /** * Retrieve inner database connection * @return DataBase connection * @throws SQLException */ Connection getConnection() throws SQLException; |
public class DefaultSqlSessionFactory implements SqlSessionFactory { private final Configuration configuration; public DefaultSqlSessionFactory(Configuration configuration) { this.configuration = configuration; } @Override public SqlSession openSession() { return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false); } @Override public SqlSession openSession(ExecutorType execType) { return openSessionFromDataSource(execType, null, false); } 省略略干方法... ... @Override public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) { return openSessionFromDataSource(execType, level, false); } @Override public SqlSession openSession(ExecutorType execType, boolean autoCommit) { return openSessionFromDataSource(execType, null, autoCommit); } private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) { Transaction tx = null; try { final Environment environment = configuration.getEnvironment();//1 final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);//2 tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);//3 final Executor executor = configuration.newExecutor(tx, execType);//4 return new DefaultSqlSession(configuration, executor, autoCommit);//5 } catch (Exception e) { closeTransaction(tx); // may have fetched a connection so lets call close() throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } } } |
当一个查询 getById 进入系统后,经过前面的初始化、创建会话、参数解析等步骤,最终进入 DefaultSqlSession 的 public <T> T selectOne(String statement, Object parameter) 方法。
public class DefaultSqlSession implements SqlSession { @Override public <T> T selectOne(String statement) { return this.<T>selectOne(statement, null); } //1 @Override public <T> T selectOne(String statement, Object parameter) { // Popular vote was to return null on 0 results and throw exception on too many. List<T> list = this.<T>selectList(statement, parameter); if (list.size() == 1) { return list.get(0); } else if (list.size() > 1) { throw new TooManyResultsException("Expected one result (or null) to be returned by selectOne(), but found: " + list.size()); } else { return null; } } } |
public class DefaultSqlSession implements SqlSession { //2 @Override public <E> List<E> selectList(String statement, Object parameter) { return this.selectList(statement, parameter, RowBounds.DEFAULT); } //3 @Override public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) { try { MappedStatement ms = configuration.getMappedStatement(statement); //executor执行器具体执行sql return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER); } catch (Exception e) { throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } } } |
public class CachingExecutor implements Executor { //4 @Override public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { BoundSql boundSql = ms.getBoundSql(parameterObject); CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql); return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); } //5 @Override public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { Cache cache = ms.getCache(); //第一次是没有缓存的走下边的 if (cache != null) { flushCacheIfRequired(ms); if (ms.isUseCache() && resultHandler == null) { ensureNoOutParams(ms, boundSql); @SuppressWarnings("unchecked") List<E> list = (List<E>) tcm.getObject(cache, key); if (list == null) { list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); tcm.putObject(cache, key, list); // issue #578 and #116 } return list; } } return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); } } |
public abstract class BaseExecutor implements Executor { //6 @SuppressWarnings("unchecked") @Override public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId()); if (closed) { throw new ExecutorException("Executor was closed."); } if (queryStack == 0 && ms.isFlushCacheRequired()) { clearLocalCache(); } List<E> list; try { queryStack++; list = resultHandler == null ? (List<E>) localCache.getObject(key) : null; if (list != null) { handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); } else { list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql); } } finally { queryStack--; } if (queryStack == 0) { for (DeferredLoad deferredLoad : deferredLoads) { deferredLoad.load(); } // issue #601 deferredLoads.clear(); if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) { // issue #482 clearLocalCache(); } } return list; } //7 private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; localCache.putObject(key, EXECUTION_PLACEHOLDER); try { list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql); } finally { localCache.removeObject(key); } localCache.putObject(key, list); if (ms.getStatementType() == StatementType.CALLABLE) { localOutputParameterCache.putObject(key, parameter); } return list; } } |
public class SimpleExecutor extends BaseExecutor { //8 @Override public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { Statement stmt = null; try { Configuration configuration = ms.getConfiguration(); StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql); stmt = prepareStatement(handler, ms.getStatementLog()); return handler.<E>query(stmt, resultHandler); } finally { closeStatement(stmt); } } //9 private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Statement stmt; //获取数据库连接 Connection connection = getConnection(statementLog); stmt = handler.prepare(connection, transaction.getTimeout()); handler.parameterize(stmt); return stmt; } } |
public abstract class BaseExecutor implements Executor { //10 protected Connection getConnection(Log statementLog) throws SQLException { //transaction是一个接口,创建会话时指定的是SpringManagedTransaction Connection connection = transaction.getConnection(); if (statementLog.isDebugEnabled()) { return ConnectionLogger.newInstance(connection, statementLog, queryStack); } else { return connection; } } } |
public class SpringManagedTransaction implements Transaction { @Override public Connection getConnection() throws SQLException { //如果Transaction包装类中有数据源就返回,没有就创建 if (this.connection == null) { openConnection(); } return this.connection; } private void openConnection() throws SQLException { //创建数据源,并赋值到包装类中(装饰者) this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); if (LOGGER.isDebugEnabled()) { LOGGER.debug( "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); } } } |
public abstract class DataSourceUtils { public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException { try { return doGetConnection(dataSource); } catch (SQLException ex) { throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex); } catch (IllegalStateException ex) { throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage()); } } public static Connection doGetConnection(DataSource dataSource) throws SQLException { Assert.notNull(dataSource, "No DataSource specified"); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { conHolder.requested(); if (!conHolder.hasConnection()) { logger.debug("Fetching resumed JDBC Connection from DataSource"); conHolder.setConnection(fetchConnection(dataSource)); } return conHolder.getConnection(); } // Else we either got no holder or an empty thread-bound holder here. logger.debug("Fetching JDBC Connection from DataSource"); Connection con = fetchConnection(dataSource); if (TransactionSynchronizationManager.isSynchronizationActive()) { logger.debug("Registering transaction synchronization for JDBC Connection"); // Use same Connection for further JDBC actions within the transaction. // Thread-bound object will get removed by synchronization at transaction completion. 
ConnectionHolder holderToUse = conHolder; if (holderToUse == null) { holderToUse = new ConnectionHolder(con); } else { holderToUse.setConnection(con); } holderToUse.requested(); TransactionSynchronizationManager.registerSynchronization( new ConnectionSynchronization(holderToUse, dataSource)); holderToUse.setSynchronizedWithTransaction(true); if (holderToUse != conHolder) { TransactionSynchronizationManager.bindResource(dataSource, holderToUse); } } return con; } private static Connection fetchConnection(DataSource dataSource) throws SQLException { //使用指定的dataSource创建链接。具体步骤需要看使用的是什么线程池技术。如果是druid就看druid源码。如果是c3p0就看c3p0的源码。 Connection con = dataSource.getConnection(); if (con == null) { throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource); } return con; } } |
//数据源包装类 public class MultipleDataSource extends AbstractRoutingDataSource { private static final ThreadLocal<String> dataSourceKey = new InheritableThreadLocal<>(); public static void setDataSourceKey(String dataSource) { dataSourceKey.set(dataSource); } //注意这个重写的方法。它是后面从数据源Map中获取key的方法 @Override protected Object determineCurrentLookupKey() { return dataSourceKey.get(); } } |
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean { @Nullable private Map<Object, Object> targetDataSources; @Nullable private Object defaultTargetDataSource; @Nullable private Map<Object, DataSource> resolvedDataSources; @Nullable private DataSource resolvedDefaultDataSource; @Override public void afterPropertiesSet() { if (this.targetDataSources == null) { throw new IllegalArgumentException("Property ‘targetDataSources‘ is required"); } this.resolvedDataSources = new HashMap<>(this.targetDataSources.size()); this.targetDataSources.forEach((key, value) -> { Object lookupKey = resolveSpecifiedLookupKey(key); DataSource dataSource = resolveSpecifiedDataSource(value); this.resolvedDataSources.put(lookupKey, dataSource); }); if (this.defaultTargetDataSource != null) { this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource); } } @Override public Connection getConnection() throws SQLException { return determineTargetDataSource().getConnection(); } protected DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } return dataSource; } @Nullable protected abstract Object determineCurrentLookupKey(); } |
回到上面的问题:我们项目自己的包装类 MultipleDataSource 中并没有 getConnection() 方法,所以调用的是父类 AbstractRoutingDataSource 的 getConnection()。该方法接着调用 determineTargetDataSource(),在此方法中又调用 determineCurrentLookupKey();这个方法正是在包装类中重写的方法,它的返回值取自 ThreadLocal 副本。每个线程在连接 MySQL 之前,都会通过 Aspect 动态地在 ThreadLocal 副本中保存一个数据源 key;当调用 getConnection() 时,会根据这个 key 从数据源 Map(由 afterPropertiesSet() 方法设置)中 get 出对应的数据源,这样就能动态地从 Spring 中获取指定的数据源。