在Spring与Hibernate整合的时候,可以利用Spring的事务管理机制,为我们管理事务的开启、提交、回滚等操作。这样的方式极大的减少了我们的代码量,让我们只专注于业务代码的编写。在使用Hibernate的时候,每一个操作都要经历事务开启与提交这样的操作,他们在业务代码的周围,这样来看是不是就想到了AOP编程,把这部分代码抽取出来。没错,Spring正是这样做的,Spring的事务管理就是基于AOP的。
1 Spring的事务隔离与传播
Srping的事务定义了五个隔离等级(isolation)与7个传播行为(propagation)。他们与我们了解的数据库事务有什么区别与联系呢?先介绍一下事务隔离与传播的概念
- 事务隔离:当前事务和其他事务的隔离成都。例如,这个事务能否看到其他事务未提交的数据等。(read-uncommitted)
- 事务传播:通常,在事务中执行的代码都会在当前事务中运行。但是,如果一个事务上下文已经存在,有几个选项可以指定该事务性方法的执行行为。如挂起、创建一个新的事务、嵌套执行等等。
REQUIRED | 要求在当前事务环境中执行该方法,如果已处于当前环境,直接调用,否则启动新的事务执行该方法 |
SUPPORTS | 如果当前执行线程处于事务环境中,则使用当前事务,否则不使用事务 |
MANDATORY | 要求调用该方法的线程必须处于事务环境中,否则抛出异常 |
REQUIRES_NEW | 该方法要求在新的事务环境中执行,如果当前线程已处于事务中,则先挂起该事务,启动新的事务;如果不处于事务中,则启动熄灯呢事务 |
NOT_SUPPORTED | 如果调用该方法的线程处于事务中,则先暂停该事务,然后执行该方法 |
NEVER | 不允许调用该方法的线程处于事务中,如果该线程处于事务中,抛出异常 |
NESTED | 如果已经处于事务中,启动新的事务并嵌套执行 |
这里面的描述视乎不太容易理解,在下面的源码分析中,就会看到他们是如何起作用的了。到时候再对照这些描述,就很容易理解了。
2 Spring事务的基本流程
Spring的事务过程与我们理解的AOP过程是很相似的,它大致分为两个阶段,图中蓝色区域表示的是事务的准备阶段,这个阶段主要完成了事务的准备工作,包括事务属性的读取,事务的创建等工作。这部分代码基本定义在AbstractPlatformTransactionManager类中,也就是说他的大部分代码是与平台无关的;第二个阶段是图中绿色区域,这里进入到事务的实施阶段,包括事务的开启、提交、回滚等操作。这部分代码对于不同的ORM框架来说是不相同的,所以他们都定义在各自的transactionManager中,这也是典型的策略模式,本文将以HibernateTransactionManager源码进行分析。
3 事务准备工作
3.1 事务从哪里开始?
先来看applicationContext中与事务有关的配置文档
<!-- 配置事务增强处理Bean,指定事务管理器 --> <tx:advice id="txAdvice" transaction-manager="transactionManager"> <!-- 用于配置详细的事务语义 --> <tx:attributes> <!-- 所有以'get'开头的方法是read-only的 --> <tx:method name="get*" read-only="true" /> <!-- 其他方法使用默认的事务设置 --> <tx:method name="*" propagation="REQUIRED" isolation="DEFAULT"/> </tx:attributes> </tx:advice> <bean id="test" class="com.songxu.entity.Log"></bean> <aop:config expose-proxy="true"> <!-- 只对业务逻辑层实施事务 --> <aop:pointcut id="txPointcut" expression="execution(* com.songxu.entity.*.*(..))" /> <!-- Advisor定义,切入点和通知分别为txPointcut、txAdvice --> <aop:advisor pointcut-ref="txPointcut" advice-ref="txAdvice" /> </aop:config>
笔者在分析源码之前,这个<tx:advice>标签是如何被包装成advisor的,这必须要看一看它的schema文件了,这里截取了一小段来看
看到这里是不是有点感觉了,这个<tx:advice>实际对应的是TransactionInterceptor这个类,它配置的属性最后都作为事务的属性注入到这个类中。找到这个类,也就找到了我们熟悉的invoke方法。对于XXXInterceptor这样的类,它一定是为代理对象服务的。但是我们似乎没有明确的指定出那个类或接口作为目标对象。笔者又翻看了log输出,在其中找到了答案。其实在我们指定<aop:pointcut>的时候,对应的表达式指定的就是那些类被作为代理类。我们通常在这里会指定一个包的范围,他们或是service层或是dao层。因为数据库事务通常需要在这里开启。当我们调用这些类里面的方法时候,通常也是在访问数据库的过程,这时就会执行invoke方法,进入了AOP事务增强的方法链,也就完成了事务的工作。
3.2 invoke
invoke方法是输入aop的入口,对于事务也不例外。从这里开始,就进入到了Spring 事务阶段。它实际调用的是TransactionAspectSupport类的invokeWithinTransaction方法
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... //这里实际调用TransactionAspectSupport类的方法 return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override // 注册一个回调方法 回调实际业务方法 public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
invokeWithinTransaction 方法
它实际上是整个事务过程的纲领性方法。所有的过程都在这里完成,这个过程也很清晰。首先读取了事务的配置属于,然后得到事务的处理器,获得一个事务存放在TransactionInfo里面(这里实际暗含了开启事务的操作,后面会介绍到),然后调用目标方法,然后根据实际情况提交或是回滚,最后释放掉TranscationInfo对象。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 读取事务的属性配置 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); //获得具体的事务处理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass); /** *这里区分不同类型的PlatformTransactionManager 因为他们的调用方式不同 对于CallbackPreferringPlatformTransactionManager来说,它需要以回调函数的方式实现事务的创建与提交 */ if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //获取事务,并把他们放到TransactionInfo中。 //这个TransactionInfo就是一个小的容器,里面包含了与事务有关的属性信息。在事务关闭的时候需要释放掉 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { //这里非常熟悉,通过方法链调用来执行目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 如果发生了异常,需要根据实际情况回滚或提交 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //释放掉TransactionInfo cleanupTransactionInfo(txInfo); } //进行事务提交 commitTransactionAfterReturning(txInfo); return retVal; } //采用回调方法使用事务处理器 else { // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); // Check result: It might indicate a Throwable to rethrow. if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } }
3.3 创建事务(createTransactionIfNecessary && getTransaction)
在事务开启之前,首先要有事务,这就是第一步创建或获取事务的过程。这个过程的入口是createTransactionIfNecessary 方法。
3.3.1 createTransactionIfNecessary
在这个方法中,可以看到两个重要的数据对象TransactionSatus和TransactionInfo的创建,这两个对象持有的数据是事务处理器对事务进行处理的重要依据,这两耳光对象的使用贯穿整个事务处理的过程。在这个方法的最后一行是对另外一个准备方法的调用,它构造了一个TransactionInfo对象,并把这个对象绑定到了当前线程中,同时在TransactionInfo对象中由一个变量来保存以前的TransactionInfo。这样就有了一连串的TransactionInfo,虽然不一定总是创建新的事务,但是一定会创建这样一个TransactionInfo对象。
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // 如果没有指定名字,使用方法特征作为事务名 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } // 这个TransactionStatus封装了事务执行的状态信息 TransactionStatus status = null; if (txAttr != null) { if (tm != null) { /* 这里使用定义好的属性信息创建事务 事务创建通过事务管理器来完成,同时返回状态信息 */ status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 构造一个TransactionInfo对象封装事务的信息,并把这个对象与线程绑定 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
3.3.2 getTransaction 获取事务
这个方法定义了获取事务的基本方法,在里面根据我们配置的不同,来创建或挂起事务,这些配置就是我们最开始提到的事务传播行为。
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); // 缓存log的debug开关,避免总是去检验 boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // 如果没有设置事务属性,那么使用默认的事务 definition = new DefaultTransactionDefinition(); } if (isExistingTransaction(transaction)) { // 如果当前线程已经存在一个事务,那么就按照存在的方式去处理 return handleExistingTransaction(definition, transaction, debugEnabled); } //当前线程不存在事务,就会创建一个新的事务 // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 如果指定了事务传播为PROPAGATION_MANDATORY 那么又没有当前事务,就会抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //如果指定了传播为PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 就创建一个新的事务 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //不需要挂起任何事务 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //创建与开始事务的实际调用,这是由具体的事务管理器来完成的,例如HibernateTransactionManager doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
对于创建事务,当然还存在一种情况就是当前线程已经存在了一个事务,那么就需要handleExistingTransaction这个方法去解决这一问题。
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { //如果设置为PROPAGATION_NEVER,又存在一个事务,就要抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } //如果设置为PROPAGATION_NOT_SUPPORTED,又存在一个事务,就将这个事务挂起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //这里的前两个参数为null和false,说明事务不需要放在事务环境中,同时挂起的事务也被保存在这里 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } //如果设置为PROPAGATION_REQUIRES_NEW ,则创建新的事务,把当前事务挂起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 这里保存了新的事务信息,同时也保存了挂起的事务信息 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } //创建嵌套的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
4 基于Hibernate的事务管理器
4.1事务的开启
在前面的getTransaction方法中有一个doBegin方法,它是事务开启的实际调用方法。这个方法也是由不同的平台去实现的,来看一看HibernateTransactionManager是如何实现这个方法的。
这个过程大致分为两个阶段,第一构造一个sessionHolder对象,这里面封装了HibernateSession 以及HibernateTransaction ,如果session或Transaction不存在,需要通过sessionFactory获得;第二个阶段就是开启transaction,即transaction.begin();同时把sessionHolder绑定到线程。
在这里有一个步骤是在设置flushmode,FlushMode是session的刷新模式,它指定了session在查询、flush或commit方法时的动作,设置为auto的时候,以前三个动作都会清理session缓存,如果设置为NEVER(MANUAL)时,只有在flush时清理缓存。
protected void doBegin(Object transaction, TransactionDefinition definition) { //将事务转换为Hibernage事务组件 HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) { throw new IllegalTransactionStateException( "Pre-bound JDBC Connection found! HibernateTransactionManager does not support " + "running within DataSourceTransactionManager if told to manage the DataSource itself. " + "It is recommended to use a single HibernateTransactionManager for all transactions " + "on a single DataSource, no matter whether Hibernate or JDBC access."); } Session session = null; //如果SessionHolder还没有被创建,就创建在一个新的Hibernate session,并放入到SessionHolder中 //这个SessionHolder稍后会绑定到线程中 try { if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) { Interceptor entityInterceptor = getEntityInterceptor(); Session newSession = (entityInterceptor != null ? getSessionFactory().withOptions().interceptor(entityInterceptor).openSession() : getSessionFactory().openSession()); if (logger.isDebugEnabled()) { logger.debug("Opened new Session [" + newSession + "] for Hibernate transaction"); } txObject.setSession(newSession); } //从SessionHolder中得到Session session = txObject.getSessionHolder().getSession(); if (this.prepareConnection && isSameConnectionForEntireSession(session)) { // We're allowed to change the transaction settings of the JDBC Connection. if (logger.isDebugEnabled()) { logger.debug("Preparing JDBC Connection of Hibernate Session [" + session + "]"); } Connection con = ((SessionImplementor) session).connection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); if (this.allowResultAccessAfterCompletion && !txObject.isNewSession()) { int currentHoldability = con.getHoldability(); if (currentHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT) { txObject.setPreviousHoldability(currentHoldability); con.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); } } } else { // Not allowed to change the transaction settings of the JDBC Connection. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { // We should set a specific isolation level but are not allowed to... throw new InvalidIsolationLevelException( "HibernateTransactionManager is not allowed to support custom isolation levels: " + "make sure that its 'prepareConnection' flag is on (the default) and that the " + "Hibernate connection release mode is set to 'on_close' (the default for JDBC)."); } if (logger.isDebugEnabled()) { logger.debug("Not preparing JDBC Connection of Hibernate Session [" + session + "]"); } } //如果事务方法定义为只读并且这是一个新的Session 那么设置session的FlushMode 为Never/MANUAL if (definition.isReadOnly() && txObject.isNewSession()) { // Just set to MANUAL in case of a new Session for this transaction. session.setFlushMode(FlushMode.MANUAL); } //如果事务方法不是只读方法并且这也不是一个新的Session,设置FlushMode 为Auto if (!definition.isReadOnly() && !txObject.isNewSession()) { // We need AUTO or COMMIT for a non-read-only transaction. FlushMode flushMode = session.getFlushMode(); if (session.getFlushMode().equals(FlushMode.MANUAL)) { session.setFlushMode(FlushMode.AUTO); txObject.getSessionHolder().setPreviousFlushMode(flushMode); } } Transaction hibTx; // 如果指定了timeout,需要设置timeout 然后开始事务 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { // Use Hibernate's own transaction timeout mechanism on Hibernate 3.1+ // Applies to all statements, also to inserts, updates and deletes! hibTx = session.getTransaction(); hibTx.setTimeout(timeout); hibTx.begin();//开启事务 } //如果没有指定timeout,则直接开启事务 else { // Open a plain Hibernate transaction without specified timeout. hibTx = session.beginTransaction(); } // 将这个HibernateTransaction 放入到sessionHolder中 txObject.getSessionHolder().setTransaction(hibTx); // Register the Hibernate Session's JDBC Connection for the DataSource, if set. if (getDataSource() != null) { Connection con = ((SessionImplementor) session).connection(); ConnectionHolder conHolder = new ConnectionHolder(con); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { conHolder.setTimeoutInSeconds(timeout); } if (logger.isDebugEnabled()) { logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]"); } TransactionSynchronizationManager.bindResource(getDataSource(), conHolder); txObject.setConnectionHolder(conHolder); } // 将SessionHolder绑定到内存 if (txObject.isNewSessionHolder()) { TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder()); } txObject.getSessionHolder().setSynchronizedWithTransaction(true); } catch (Throwable ex) { if (txObject.isNewSession()) { try { if (session.getTransaction().isActive()) { session.getTransaction().rollback(); } } catch (Throwable ex2) { logger.debug("Could not rollback Session after failed transaction begin", ex); } finally { SessionFactoryUtils.closeSession(session); txObject.setSessionHolder(null); } } throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex); } }
4.2 事务的挂起
在前面getTransaction的时候,会存在一些情况需要挂起当前事务。调用了suspend方法,而在这个方法中,实际调用的依然是底层的doSuspend方法。HibernateTransactionManager实现了这个方法,也就是说Hibernate事务处理器支持事务挂起。
而这个事务挂起方法实际上就是把与这个事务相关的资源与当前的sessionHolder解除关系,也可以说是与当前线程解除关系,并把它保存在另外一个容器SuspendResourcesHolder中。这样可以方便事务的恢复。
protected Object doSuspend(Object transaction) { HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; //把当前的sessionHolder从线程中和TransactionObject中释放 txObject.setSessionHolder(null); SessionHolder sessionHolder = (SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory()); //把当前的connectionHolder从线程是和TransactionObject中释放 txObject.setConnectionHolder(null); ConnectionHolder connectionHolder = null; if (getDataSource() != null) { connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource()); } return new SuspendedResourcesHolder(sessionHolder, connectionHolder); }
4.3 事务的提交
在前面的分析中,可以看到事务提交的入口是commitTransactionAfterReturning方法。
if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); }
它实际调用的依然是AbstractPlatformTransactionManager中的commit方法。这个方法与getTransaction十分相似,它是提交过程的纲领,具体的工作交由processCommit或processRollBack来处理,对应的也就是提交过程中可能出现的两种情况。
4.3.1 commit方法
在这个方法的第一行有一个状态的判断,如果事务已经完成,就会抛出异常。笔者就曾经遇到这种情况,在利用Spring开启事务的情况下,依然选择了提交事务,这个时候就抛出了异常。
public final void commit(TransactionStatus status) throws TransactionException { //如果事务已经提交完成 抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } //如果事务过程中发生异常,就要回滚 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus);//处理回滚 return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); // Throw UnexpectedRollbackException only at outermost transaction boundary // or if explicitly asked to. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } //处理提交 processCommit(defStatus); }
4.3.2 processCommit方法
这个方法是处理提交的方法,在这里完成了事务提交的准备工作。在这个提交的过程中,会对当前事务进行一次判断,如果是一个全新的事务,就交给具体的事务管理器提交,如果不是一个新的事务,则不会发起提交操作,而是把这个提交任务交给已经存在的事务进行。实际的提交动作依然由具体的事务管理器去处理。、
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { //事务的提交准备工作 prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } //如果存在嵌套事务,会首先释放掉这个嵌套事务 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } // 仅对全新的事务进行提交工作 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } //交给具体的事务管理器去处理 doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
4.3.3 doCommit方法
对于这个方法已经很简单了,就是从sessionHolder中获得transaction,然后执行提交动作。
protected void doCommit(DefaultTransactionStatus status) { HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Committing Hibernate transaction on Session [" + txObject.getSessionHolder().getSession() + "]"); } try { //获得transaction并提交 txObject.getSessionHolder().getTransaction().commit(); } catch (org.hibernate.TransactionException ex) { // assumably from commit call to the underlying JDBC connection throw new TransactionSystemException("Could not commit Hibernate transaction", ex); } catch (HibernateException ex) { // assumably failed to flush changes to database throw convertHibernateAccessException(ex); } }
4.4 事务的回滚
在前面的invokeWithinTransaction方法中,24行执行了一个completeTransactionAfterThrowing方法,在这里面处理了目标方法执行过程中发生异常的情况。异常发生后,会根据异常的类型决定是否发生回滚操作。这就是第一个回滚的入口,另外一个回滚的入口就是在4.3中提到的回滚情况。回滚的执行与commit很类似,AbstractPlatformTransactionManager中定义了一个processRollBack方法
4.4.1
processRollback方法
这个方法与processCommit是极为相似的。它首先去事务进行了一些判断,处理嵌套事务的回滚。依然只处理当新建事务中回滚,如果不是新建事务,交给前一个事务去处理回滚。实际的回滚动作依然由具体的事务管理器去处理。
private void processRollback(DefaultTransactionStatus status) { try { try { triggerBeforeCompletion(status); //处理嵌套事务的回滚 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } //只处理新建事务的回滚 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } //具体动作由具体的事务管理器完成 doRollback(status); } //非新建事务的回滚由前一个事务去处理 else if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { cleanupAfterCompletion(status); } }
4.4.2 doRollback方法
Hibernate事务管理器对事务回滚的处理也是很简单的,从SessionHolder中获得transaction并调用回滚方法。
protected void doRollback(DefaultTransactionStatus status) { //获得事务 HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Rolling back Hibernate transaction on Session [" + txObject.getSessionHolder().getSession() + "]"); } try { //Hibernate事务回滚 txObject.getSessionHolder().getTransaction().rollback(); } catch (org.hibernate.TransactionException ex) { throw new TransactionSystemException("Could not roll back Hibernate transaction", ex); } catch (HibernateException ex) { // Shouldn't really happen, as a rollback doesn't cause a flush. throw convertHibernateAccessException(ex); } finally { if (!txObject.isNewSession() && !this.hibernateManagedSession) { // Clear all pending inserts/updates/deletes in the Session. // Necessary for pre-bound Sessions, to avoid inconsistent state. txObject.getSessionHolder().getSession().clear(); } } }
5 小结
从声明式事务的整个实现可以看到,这个过程就是一个完整的Spring AOP的实现。它根据我们的配置将dao层或service层封装成代理对象,以此所有方法的调用都会触发invoke方法进入AOP环节。在实际业务方法之前,进行了事务的开启动作以及事务的挂起操作,在业务方法之后,进行了事务的提交或回滚操作。而这些实际的操作则是由底层的事务管理器去实现的。
版权声明:本文为博主原创文章,未经博主允许不得转载。