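This article is fairly dry and consists mostly of source code. I traced every step myself in the debugger, so please point out any mistakes. To keep the flow continuous I have left in many steps that may seem unnecessary. You can debug along as you read and skip whatever is obvious to you.
In the previous chapters we walked through MyBatis initialization (afterPropertiesSet() of org.mybatis.spring.SqlSessionFactoryBean).
During initialization MyBatis wraps all of its configuration into a Configuration object that lives in memory. The benefit is that the configuration files are parsed only once and every later lookup is an in-memory read. Configuration holds almost all MyBatis settings. SqlSessionFactoryBuilder takes it as its argument to create the SqlSessionFactory (implementation class DefaultSqlSessionFactory).
You may remember this code from the creation of the SqlSessionFactory: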
public void afterPropertiesSet() throws Exception {
  notNull(dataSource, "Property 'dataSource' is required");
  notNull(sqlSessionFactoryBuilder, "Property 'sqlSessionFactoryBuilder' is required");
  state((configuration == null && configLocation == null) || !(configuration != null && configLocation != null),
      "Property 'configuration' and 'configLocation' can not specified with together");
  this.sqlSessionFactory = buildSqlSessionFactory();
}

protected SqlSessionFactory buildSqlSessionFactory() throws IOException {
  Configuration configuration;
  XMLConfigBuilder xmlConfigBuilder = null;
  if (this.configuration != null) {
    configuration = this.configuration;
    if (configuration.getVariables() == null) {
      configuration.setVariables(this.configurationProperties);
    } else if (this.configurationProperties != null) {
      configuration.getVariables().putAll(this.configurationProperties);
    }
  } else if (this.configLocation != null) {
    xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(), null, this.configurationProperties);
    configuration = xmlConfigBuilder.getConfiguration();
  } else {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Property `configuration` or 'configLocation' not specified, using default MyBatis Configuration");
    }
    configuration = new Configuration();
    configuration.setVariables(this.configurationProperties);
  }
  ... ...
  // When no transactionFactory is specified, a SpringManagedTransactionFactory is created.
  // It is the bridge that lets Spring manage MyBatis transactions.
  if (this.transactionFactory == null) {
    this.transactionFactory = new SpringManagedTransactionFactory();
  }
  configuration.setEnvironment(new Environment(this.environment, this.transactionFactory, this.dataSource));
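When no transactionFactory is specified, a default SpringManagedTransactionFactory is created. SpringManagedTransactionFactory creates Transaction objects. Transaction is a low-level MyBatis interface (package org.apache.ibatis) that wraps a database Connection and manages its lifecycle: creation, preparation, commit/rollback and close. MyBatis passes this Transaction as an argument when it creates the Executor, the component that actually executes SQL.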
/**
 * Wraps a database connection.
 * Handles the connection lifecycle that comprises: its creation, preparation, commit/rollback and close.
 *
 * @author Clinton Begin
 */
public interface Transaction {

  /**
   * Retrieve inner database connection
   * @return DataBase connection
   * @throws SQLException
   */
  Connection getConnection() throws SQLException;
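So how does SpringManagedTransactionFactory flow through the project, and how does it communicate with Spring? Let's keep going.
In MyBatis a session is opened through SqlSession, which is roughly equivalent to a connection. We can either query through the SqlSession directly, or obtain a mapper from it and query through a dynamic proxy.
Either way, we first need a SqlSession. From the initialization above we know that the factory created is DefaultSqlSessionFactory. Let's look at its source: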
public class DefaultSqlSessionFactory implements SqlSessionFactory {

  private final Configuration configuration;

  public DefaultSqlSessionFactory(Configuration configuration) {
    this.configuration = configuration;
  }

  @Override
  public SqlSession openSession() {
    return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
  }

  @Override
  public SqlSession openSession(ExecutorType execType) {
    return openSessionFromDataSource(execType, null, false);
  }

  // several overloads omitted ... ...

  @Override
  public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) {
    return openSessionFromDataSource(execType, level, false);
  }

  @Override
  public SqlSession openSession(ExecutorType execType, boolean autoCommit) {
    return openSessionFromDataSource(execType, null, autoCommit);
  }

  private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
    Transaction tx = null;
    try {
      final Environment environment = configuration.getEnvironment(); // 1
      final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment); // 2
      tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit); // 3
      final Executor executor = configuration.newExecutor(tx, execType); // 4
      return new DefaultSqlSession(configuration, executor, autoCommit); // 5
    } catch (Exception e) {
      closeTransaction(tx); // may have fetched a connection so lets call close()
      throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }
}
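This class offers a large number of openSession overloads. All of those that create a session from the data source end up calling openSessionFromDataSource, which is simple:
1. Get the current environment from the global configuration.
2. From the environment, get the SpringManagedTransactionFactory created earlier.
3. Use the SpringManagedTransactionFactory to create a Transaction (SpringManagedTransaction).
4. Create the SQL executor, Executor (a SimpleExecutor by default, which Configuration.newExecutor wraps in a CachingExecutor when cacheEnabled is on).
5. Create the session (DefaultSqlSession).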
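The five steps above can be sketched with plain JDK types. This is only an illustration of the wiring, not MyBatis code: OpenSessionSketch and its nested Transaction, TransactionFactory, Environment, Executor and Session types are hypothetical stand-ins, the data source is reduced to a String, and step 1 (reading the environment from the configuration) is modelled by passing the environment in.

```java
public class OpenSessionSketch {
    // Hypothetical stand-ins for the MyBatis types named in the steps above.
    interface Transaction { String describe(); }
    interface TransactionFactory { Transaction newTransaction(String dataSource, boolean autoCommit); }

    static final class Environment {
        final TransactionFactory transactionFactory;
        final String dataSource;
        Environment(TransactionFactory transactionFactory, String dataSource) {
            this.transactionFactory = transactionFactory;
            this.dataSource = dataSource;
        }
    }

    static final class Executor {
        final Transaction tx;
        final String type;
        Executor(Transaction tx, String type) { this.tx = tx; this.type = type; }
    }

    static final class Session {
        final Executor executor;
        final boolean autoCommit;
        Session(Executor executor, boolean autoCommit) { this.executor = executor; this.autoCommit = autoCommit; }
    }

    // Step 1 is the caller handing us the environment.
    static Session openSession(Environment environment, String execType, boolean autoCommit) {
        TransactionFactory factory = environment.transactionFactory;                  // step 2
        Transaction tx = factory.newTransaction(environment.dataSource, autoCommit);  // step 3
        Executor executor = new Executor(tx, execType);                               // step 4
        return new Session(executor, autoCommit);                                     // step 5
    }

    public static void main(String[] args) {
        // The factory plays the role of SpringManagedTransactionFactory.
        TransactionFactory factory = (ds, autoCommit) -> () -> "tx on " + ds;
        Session session = openSession(new Environment(factory, "primary"), "SIMPLE", false);
        System.out.println(session.executor.tx.describe());
    }
}
```

The point of the sketch is that the session never touches the data source directly: everything goes through the executor, and the executor goes through the transaction.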
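Once the session exists, the actual database work follows: obtaining a connection, running the query, mapping the result, returning it, and so on. DefaultSqlSession has many query methods; MyBatis picks the one to call after parsing the mapper method, and we won't analyze that here. Instead, take a getById query as a simple case. (Note that MyBatis does not open a database connection when the session is created.)
When a getById query enters the system, after initialization, session creation, parameter resolution and so on, it reaches DefaultSqlSession's public <T> T selectOne(String statement, Object parameter) method: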
public class DefaultSqlSession implements SqlSession {

  @Override
  public <T> T selectOne(String statement) {
    return this.<T>selectOne(statement, null);
  }

  // 1
  @Override
  public <T> T selectOne(String statement, Object parameter) {
    // Popular vote was to return null on 0 results and throw exception on too many.
    List<T> list = this.<T>selectList(statement, parameter);
    if (list.size() == 1) {
      return list.get(0);
    } else if (list.size() > 1) {
      throw new TooManyResultsException("Expected one result (or null) to be returned by selectOne(), but found: " + list.size());
    } else {
      return null;
    }
  }
}
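The contract of selectOne shown above (one row returns the row, zero rows return null, more than one row throws) can be tried in isolation. A minimal sketch, assuming a hypothetical SelectOneSketch class and using IllegalStateException in place of MyBatis's TooManyResultsException:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SelectOneSketch {
    // Same decision logic as selectOne above, applied to an already-fetched list.
    static <T> T selectOne(List<T> rows) {
        if (rows.size() == 1) {
            return rows.get(0);
        } else if (rows.size() > 1) {
            // Stand-in for TooManyResultsException.
            throw new IllegalStateException("Expected one result (or null), but found: " + rows.size());
        } else {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(selectOne(Collections.singletonList("row")));
        System.out.println(selectOne(Collections.<String>emptyList()));
        try {
            selectOne(Arrays.asList("a", "b"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```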
public class DefaultSqlSession implements SqlSession {

  // 2
  @Override
  public <E> List<E> selectList(String statement, Object parameter) {
    return this.selectList(statement, parameter, RowBounds.DEFAULT);
  }

  // 3
  @Override
  public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
    try {
      MappedStatement ms = configuration.getMappedStatement(statement);
      // the executor actually runs the SQL
      return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }
}
public class CachingExecutor implements Executor {

  // 4
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameterObject);
    CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  // 5
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    Cache cache = ms.getCache();
    // when the statement has no cache (as in this walkthrough), execution falls through to the delegate call at the bottom
    if (cache != null) {
      flushCacheIfRequired(ms);
      if (ms.isUseCache() && resultHandler == null) {
        ensureNoOutParams(ms, boundSql);
        @SuppressWarnings("unchecked")
        List<E> list = (List<E>) tcm.getObject(cache, key);
        if (list == null) {
          list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          tcm.putObject(cache, key, list); // issue #578 and #116
        }
        return list;
      }
    }
    return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }
}
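CachingExecutor is a decorator: it looks in a cache first and only calls the wrapped executor on a miss. The shape of that pattern can be sketched as below. This is a simplification under stated assumptions: CachingDecoratorSketch and its nested types are hypothetical, the cache key is a plain String rather than a CacheKey, and entries become visible immediately instead of going through a transactional cache manager like tcm.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CachingDecoratorSketch {
    interface Executor {
        List<String> query(String statementId, Object parameter);
    }

    // Pretends to hit the database and counts how often it is asked.
    static final class CountingExecutor implements Executor {
        int calls;
        @Override
        public List<String> query(String statementId, Object parameter) {
            calls++;
            return Collections.singletonList(statementId + ":" + parameter);
        }
    }

    // Decorator: answer from the cache when possible, otherwise delegate and remember.
    static final class CachingExecutor implements Executor {
        private final Executor delegate;
        private final Map<String, List<String>> cache = new HashMap<>();

        CachingExecutor(Executor delegate) { this.delegate = delegate; }

        @Override
        public List<String> query(String statementId, Object parameter) {
            String key = statementId + "|" + parameter;
            List<String> list = cache.get(key);
            if (list == null) {
                list = delegate.query(statementId, parameter);
                cache.put(key, list);
            }
            return list;
        }
    }

    public static void main(String[] args) {
        CountingExecutor database = new CountingExecutor();
        Executor executor = new CachingExecutor(database);
        executor.query("getById", 1);
        executor.query("getById", 1); // served from the cache
        System.out.println("delegate calls: " + database.calls);
    }
}
```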
public abstract class BaseExecutor implements Executor {

  // 6
  @SuppressWarnings("unchecked")
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
      clearLocalCache();
    }
    List<E> list;
    try {
      queryStack++;
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }
    if (queryStack == 0) {
      for (DeferredLoad deferredLoad : deferredLoads) {
        deferredLoad.load();
      }
      // issue #601
      deferredLoads.clear();
      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        // issue #482
        clearLocalCache();
      }
    }
    return list;
  }

  // 7
  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
      localCache.removeObject(key);
    }
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
      localOutputParameterCache.putObject(key, parameter);
    }
    return list;
  }
}
public class SimpleExecutor extends BaseExecutor {

  // 8
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  // 9
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    // obtain the database connection
    Connection connection = getConnection(statementLog);
    stmt = handler.prepare(connection, transaction.getTimeout());
    handler.parameterize(stmt);
    return stmt;
  }
}
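In SimpleExecutor we see a call to getConnection. This is where the database connection is actually obtained. Recall that when the session was opened, step 4 created the executor, with the SpringManagedTransaction and the executor type (simple) as arguments. So let's keep following the code and see how SpringManagedTransaction gets the database connection.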
public abstract class BaseExecutor implements Executor {

  // 10
  protected Connection getConnection(Log statementLog) throws SQLException {
    // transaction is an interface; the instance supplied when the session was opened is a SpringManagedTransaction
    Connection connection = transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
      return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
      return connection;
    }
  }
}
public class SpringManagedTransaction implements Transaction {

  @Override
  public Connection getConnection() throws SQLException {
    // return the connection if this wrapper already holds one, otherwise open it
    if (this.connection == null) {
      openConnection();
    }
    return this.connection;
  }

  private void openConnection() throws SQLException {
    // obtain the connection through Spring and store it in this wrapper
    this.connection = DataSourceUtils.getConnection(this.dataSource);
    this.autoCommit = this.connection.getAutoCommit();
    this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);

    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(
          "JDBC Connection ["
              + this.connection
              + "] will"
              + (this.isConnectionTransactional ? " " : " not ")
              + "be managed by Spring");
    }
  }
}
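The getConnection() above is lazy: the wrapper opens the connection on first use and then keeps returning the same one, which is why creating a session does not by itself touch the database. A minimal sketch of that idea, assuming a hypothetical LazyConnectionSketch class in which a String stands in for java.sql.Connection and a Supplier stands in for the data source:

```java
import java.util.function.Supplier;

public class LazyConnectionSketch {
    static final class LazyTransaction {
        private final Supplier<String> connectionSource; // stand-in for the DataSource
        private String connection;                       // stand-in for java.sql.Connection
        int opened;                                      // how many times we really opened one

        LazyTransaction(Supplier<String> connectionSource) {
            this.connectionSource = connectionSource;
        }

        String getConnection() {
            // Open on first request only; later requests reuse the held connection.
            if (connection == null) {
                connection = connectionSource.get();
                opened++;
            }
            return connection;
        }
    }

    public static void main(String[] args) {
        LazyTransaction tx = new LazyTransaction(() -> "connection-1");
        System.out.println("opened before first use: " + tx.opened);
        tx.getConnection();
        tx.getConnection();
        System.out.println("opened after two calls: " + tx.opened);
    }
}
```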
public abstract class DataSourceUtils {

  public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    try {
      return doGetConnection(dataSource);
    } catch (SQLException ex) {
      throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
    } catch (IllegalStateException ex) {
      throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
    }
  }

  public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");

    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
      conHolder.requested();
      if (!conHolder.hasConnection()) {
        logger.debug("Fetching resumed JDBC Connection from DataSource");
        conHolder.setConnection(fetchConnection(dataSource));
      }
      return conHolder.getConnection();
    }
    // Else we either got no holder or an empty thread-bound holder here.

    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = fetchConnection(dataSource);

    if (TransactionSynchronizationManager.isSynchronizationActive()) {
      logger.debug("Registering transaction synchronization for JDBC Connection");
      // Use same Connection for further JDBC actions within the transaction.
      // Thread-bound object will get removed by synchronization at transaction completion.
      ConnectionHolder holderToUse = conHolder;
      if (holderToUse == null) {
        holderToUse = new ConnectionHolder(con);
      } else {
        holderToUse.setConnection(con);
      }
      holderToUse.requested();
      TransactionSynchronizationManager.registerSynchronization(
          new ConnectionSynchronization(holderToUse, dataSource));
      holderToUse.setSynchronizedWithTransaction(true);
      if (holderToUse != conHolder) {
        TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
      }
    }

    return con;
  }

  private static Connection fetchConnection(DataSource dataSource) throws SQLException {
    // Obtain a connection from the given dataSource. The details depend on the connection pool in use:
    // for Druid read the Druid source, for c3p0 read the c3p0 source.
    Connection con = dataSource.getConnection();
    if (con == null) {
      throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource);
    }
    return con;
  }
}
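The core of doGetConnection is a per-thread lookup: if a connection is already bound to the current thread for this data source, reuse it; otherwise fetch a new one and, when a transaction is active, bind it for later calls. The sketch below illustrates only that idea with a ThreadLocal map. ThreadBoundSketch, its methods and the transactionActive flag are hypothetical simplifications; Spring's real ConnectionHolder, reference counting and synchronization callbacks are left out.

```java
import java.util.HashMap;
import java.util.Map;

public class ThreadBoundSketch {
    // One resource map per thread, keyed by data source (here just a name).
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static Object getConnection(Object dataSourceKey, boolean transactionActive) {
        Object bound = RESOURCES.get().get(dataSourceKey);
        if (bound != null) {
            return bound; // reuse the connection already bound to this thread
        }
        Object connection = fetchConnection(dataSourceKey);
        if (transactionActive) {
            // Bind so further calls within the same transaction see the same connection.
            RESOURCES.get().put(dataSourceKey, connection);
        }
        return connection;
    }

    // Called when the transaction completes.
    static void unbind(Object dataSourceKey) {
        RESOURCES.get().remove(dataSourceKey);
    }

    // Stand-in for dataSource.getConnection(): every call yields a distinct object.
    private static Object fetchConnection(Object dataSourceKey) {
        return new Object();
    }

    public static void main(String[] args) {
        Object first = getConnection("ds", true);
        Object second = getConnection("ds", true);
        System.out.println("same connection inside transaction: " + (first == second));
        unbind("ds");
    }
}
```

Without an active transaction nothing is bound, so each call fetches a fresh connection, which matches the final `return con;` branch above.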
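What dataSource.getConnection() does depends on the connection pool in use (a pool is almost always used). Our project uses the Druid connection pool. Because traffic is heavy we added master-slave replication with multiple data sources, wrapped them in a class that extends Spring's AbstractRoutingDataSource, and switch data sources dynamically with an aspect. So we only need to look at our project's own wrapper class. Its source is as follows: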
// Data source wrapper class
public class MultipleDataSource extends AbstractRoutingDataSource {

  private static final ThreadLocal<String> dataSourceKey = new InheritableThreadLocal<>();

  public static void setDataSourceKey(String dataSource) {
    dataSourceKey.set(dataSource);
  }

  // Note this override: it supplies the key that is later used to look up the data source in the Map
  @Override
  protected Object determineCurrentLookupKey() {
    return dataSourceKey.get();
  }
}
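The AbstractRoutingDataSource source is shown below. AbstractRoutingDataSource implements InitializingBean, so after Spring has created the object and populated its properties it calls afterPropertiesSet(). This is similar to how the Druid data source is created. The method validates the configuration, assigns the remaining fields, and puts all data sources into a Map as key/value pairs.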
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {

  @Nullable
  private Map<Object, Object> targetDataSources;

  @Nullable
  private Object defaultTargetDataSource;

  @Nullable
  private Map<Object, DataSource> resolvedDataSources;

  @Nullable
  private DataSource resolvedDefaultDataSource;

  @Override
  public void afterPropertiesSet() {
    if (this.targetDataSources == null) {
      throw new IllegalArgumentException("Property 'targetDataSources' is required");
    }
    this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
    this.targetDataSources.forEach((key, value) -> {
      Object lookupKey = resolveSpecifiedLookupKey(key);
      DataSource dataSource = resolveSpecifiedDataSource(value);
      this.resolvedDataSources.put(lookupKey, dataSource);
    });
    if (this.defaultTargetDataSource != null) {
      this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
    }
  }

  @Override
  public Connection getConnection() throws SQLException {
    return determineTargetDataSource().getConnection();
  }

  protected DataSource determineTargetDataSource() {
    Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
    Object lookupKey = determineCurrentLookupKey();
    DataSource dataSource = this.resolvedDataSources.get(lookupKey);
    if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
      dataSource = this.resolvedDefaultDataSource;
    }
    if (dataSource == null) {
      throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
    }
    return dataSource;
  }

  @Nullable
  protected abstract Object determineCurrentLookupKey();
}
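Back to the question above: our wrapper class MultipleDataSource has no getConnection() method of its own, so the call goes to getConnection() of the parent class AbstractRoutingDataSource. That calls determineTargetDataSource(), which in turn calls determineCurrentLookupKey(), the method overridden in our wrapper class. Its return value is read from the ThreadLocal. Before a thread connects to MySQL, the aspect stores a data source key in that thread's ThreadLocal; when getConnection() is called, the key is used to look up the corresponding data source in the Map (populated in afterPropertiesSet()). This is how the target data source is chosen dynamically at runtime.
Original article: https://www.cnblogs.com/Houz/p/11957908.html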
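The routing described above can be condensed into a small pure-JDK sketch. It is an illustration only: RoutingSketch, Router, setKey and clearKey are hypothetical names, data sources are reduced to Strings, and the fallback to the default target is always on (in Spring it is controlled by lenientFallback).

```java
import java.util.HashMap;
import java.util.Map;

public class RoutingSketch {
    // Per-thread lookup key, set by the caller (an aspect in the project described above).
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    static void setKey(String key) { KEY.set(key); }
    static void clearKey() { KEY.remove(); }

    static final class Router {
        private final Map<String, String> targets; // key -> data source (a name here)
        private final String defaultTarget;

        Router(Map<String, String> targets, String defaultTarget) {
            this.targets = new HashMap<>(targets);
            this.defaultTarget = defaultTarget;
        }

        // Mirrors determineTargetDataSource(): look up by the current key, else fall back.
        String determineTarget() {
            String lookupKey = KEY.get();
            String target = targets.get(lookupKey);
            if (target == null) {
                target = defaultTarget;
            }
            if (target == null) {
                throw new IllegalStateException("Cannot determine target for lookup key [" + lookupKey + "]");
            }
            return target;
        }
    }

    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("master", "master-db");
        targets.put("slave", "slave-db");
        Router router = new Router(targets, "master-db");

        setKey("slave");
        try {
            System.out.println(router.determineTarget());
        } finally {
            clearKey(); // avoid leaking the key to the next task on a pooled thread
        }
    }
}
```

Clearing the key in a finally block is a design choice of this sketch, not something shown in the original wrapper class; it keeps a reused thread from carrying a stale key into unrelated work.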