Spring 事务源码分析——Hibernate篇

在Spring与Hibernate整合的时候,可以利用Spring的事务管理机制,为我们管理事务的开启、提交、回滚等操作。这样的方式极大的减少了我们的代码量,让我们只专注于业务代码的编写。在使用Hibernate的时候,每一个操作都要经历事务开启与提交这样的操作,他们在业务代码的周围,这样来看是不是就想到了AOP编程,把这部分代码抽取出来。没错,Spring正是这样做的,Spring的事务管理就是基于AOP的。

1 Spring的事务隔离与传播

Srping的事务定义了五个隔离等级(isolation)与7个传播行为(propagation)。他们与我们了解的数据库事务有什么区别与联系呢?先介绍一下事务隔离与传播的概念

  • 事务隔离:当前事务和其他事务的隔离成都。例如,这个事务能否看到其他事务未提交的数据等。(read-uncommitted)
  • 事务传播:通常,在事务中执行的代码都会在当前事务中运行。但是,如果一个事务上下文已经存在,有几个选项可以指定该事务性方法的执行行为。如挂起、创建一个新的事务、嵌套执行等等。
Spring事务传播行为
REQUIRED 要求在当前事务环境中执行该方法,如果已处于当前环境,直接调用,否则启动新的事务执行该方法
SUPPORTS 如果当前执行线程处于事务环境中,则使用当前事务,否则不使用事务
MANDATORY 要求调用该方法的线程必须处于事务环境中,否则抛出异常
REQUIRES_NEW 该方法要求在新的事务环境中执行,如果当前线程已处于事务中,则先挂起该事务,启动新的事务;如果不处于事务中,则启动熄灯呢事务
NOT_SUPPORTED 如果调用该方法的线程处于事务中,则先暂停该事务,然后执行该方法
NEVER 不允许调用该方法的线程处于事务中,如果该线程处于事务中,抛出异常
NESTED 如果已经处于事务中,启动新的事务并嵌套执行

这里面的描述视乎不太容易理解,在下面的源码分析中,就会看到他们是如何起作用的了。到时候再对照这些描述,就很容易理解了。

2 Spring事务的基本流程

Spring的事务过程与我们理解的AOP过程是很相似的,它大致分为两个阶段,图中蓝色区域表示的是事务的准备阶段,这个阶段主要完成了事务的准备工作,包括事务属性的读取,事务的创建等工作。这部分代码基本定义在AbstractPlatformTransactionManager类中,也就是说他的大部分代码是与平台无关的;第二个阶段是图中绿色区域,这里进入到事务的实施阶段,包括事务的开启、提交、回滚等操作。这部分代码对于不同的ORM框架来说是不相同的,所以他们都定义在各自的transactionManager中,这也是典型的策略模式,本文将以HibernateTransactionManager源码进行分析。

3 事务准备工作

3.1 事务从哪里开始?

先来看applicationContext中与事务有关的配置文档

<!-- 配置事务增强处理Bean,指定事务管理器 -->
	<tx:advice id="txAdvice" transaction-manager="transactionManager">
		<!-- 用于配置详细的事务语义 -->
		<tx:attributes>
			<!-- 所有以'get'开头的方法是read-only的 -->
			<tx:method name="get*" read-only="true"   />
			<!-- 其他方法使用默认的事务设置 -->
			<tx:method name="*" propagation="REQUIRED" isolation="DEFAULT"/>
		</tx:attributes>
	</tx:advice>
	<bean id="test" class="com.songxu.entity.Log"></bean>

	<aop:config expose-proxy="true">
		<!-- 只对业务逻辑层实施事务 -->
		<aop:pointcut id="txPointcut" expression="execution(* com.songxu.entity.*.*(..))" />
		<!-- Advisor定义,切入点和通知分别为txPointcut、txAdvice -->
		<aop:advisor pointcut-ref="txPointcut" advice-ref="txAdvice" />
	</aop:config>

笔者在分析源码之前,这个<tx:advice>标签是如何被包装成advisor的,这必须要看一看它的schema文件了,这里截取了一小段来看

看到这里是不是有点感觉了,这个<tx:advice>实际对应的是TransactionInterceptor这个类,它配置的属性最后都作为事务的属性注入到这个类中。找到这个类,也就找到了我们熟悉的invoke方法。对于XXXInterceptor这样的类,它一定是为代理对象服务的。但是我们似乎没有明确的指定出那个类或接口作为目标对象。笔者又翻看了log输出,在其中找到了答案。其实在我们指定<aop:pointcut>的时候,对应的表达式指定的就是那些类被作为代理类。我们通常在这里会指定一个包的范围,他们或是service层或是dao层。因为数据库事务通常需要在这里开启。当我们调用这些类里面的方法时候,通常也是在访问数据库的过程,这时就会执行invoke方法,进入了AOP事务增强的方法链,也就完成了事务的工作。

3.2 invoke

invoke方法是输入aop的入口,对于事务也不例外。从这里开始,就进入到了Spring 事务阶段。它实际调用的是TransactionAspectSupport类的invokeWithinTransaction方法

@Override
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
		//这里实际调用TransactionAspectSupport类的方法
		return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
			@Override
			// 注册一个回调方法  回调实际业务方法
			public Object proceedWithInvocation() throws Throwable {
				return invocation.proceed();
			}
		});
	}

invokeWithinTransaction 方法

它实际上是整个事务过程的纲领性方法。所有的过程都在这里完成,这个过程也很清晰。首先读取了事务的配置属于,然后得到事务的处理器,获得一个事务存放在TransactionInfo里面(这里实际暗含了开启事务的操作,后面会介绍到),然后调用目标方法,然后根据实际情况提交或是回滚,最后释放掉TranscationInfo对象。

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
			throws Throwable {

		// 读取事务的属性配置
		final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
		//获得具体的事务处理器
		final PlatformTransactionManager tm = determineTransactionManager(txAttr);
		final String joinpointIdentification = methodIdentification(method, targetClass);
		/**
		*这里区分不同类型的PlatformTransactionManager 因为他们的调用方式不同
		对于CallbackPreferringPlatformTransactionManager来说,它需要以回调函数的方式实现事务的创建与提交
		*/
		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
			//获取事务,并把他们放到TransactionInfo中。
			//这个TransactionInfo就是一个小的容器,里面包含了与事务有关的属性信息。在事务关闭的时候需要释放掉
			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
			Object retVal = null;
			try {
				//这里非常熟悉,通过方法链调用来执行目标方法
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// 如果发生了异常,需要根据实际情况回滚或提交
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
				//释放掉TransactionInfo
				cleanupTransactionInfo(txInfo);
			}
			//进行事务提交
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}
		//采用回调方法使用事务处理器
		else {
			// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
			try {
				Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
						new TransactionCallback<Object>() {
							@Override
							public Object doInTransaction(TransactionStatus status) {
								TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
								try {
									return invocation.proceedWithInvocation();
								}
								catch (Throwable ex) {
									if (txAttr.rollbackOn(ex)) {
										// A RuntimeException: will lead to a rollback.
										if (ex instanceof RuntimeException) {
											throw (RuntimeException) ex;
										}
										else {
											throw new ThrowableHolderException(ex);
										}
									}
									else {
										// A normal return value: will lead to a commit.
										return new ThrowableHolder(ex);
									}
								}
								finally {
									cleanupTransactionInfo(txInfo);
								}
							}
						});

				// Check result: It might indicate a Throwable to rethrow.
				if (result instanceof ThrowableHolder) {
					throw ((ThrowableHolder) result).getThrowable();
				}
				else {
					return result;
				}
			}
			catch (ThrowableHolderException ex) {
				throw ex.getCause();
			}
		}
	}

3.3 创建事务(createTransactionIfNecessary && getTransaction)

在事务开启之前,首先要有事务,这就是第一步创建或获取事务的过程。这个过程的入口是createTransactionIfNecessary 方法。

3.3.1 createTransactionIfNecessary

在这个方法中,可以看到两个重要的数据对象TransactionSatus和TransactionInfo的创建,这两个对象持有的数据是事务处理器对事务进行处理的重要依据,这两耳光对象的使用贯穿整个事务处理的过程。在这个方法的最后一行是对另外一个准备方法的调用,它构造了一个TransactionInfo对象,并把这个对象绑定到了当前线程中,同时在TransactionInfo对象中由一个变量来保存以前的TransactionInfo。这样就有了一连串的TransactionInfo,虽然不一定总是创建新的事务,但是一定会创建这样一个TransactionInfo对象。

protected TransactionInfo createTransactionIfNecessary(
			PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

		// 如果没有指定名字,使用方法特征作为事务名
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {
					return joinpointIdentification;
				}
			};
		}
		// 这个TransactionStatus封装了事务执行的状态信息
		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
				/*
				这里使用定义好的属性信息创建事务
				事务创建通过事务管理器来完成,同时返回状态信息
				*/
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
		// 构造一个TransactionInfo对象封装事务的信息,并把这个对象与线程绑定
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}

3.3.2 getTransaction 获取事务

这个方法定义了获取事务的基本方法,在里面根据我们配置的不同,来创建或挂起事务,这些配置就是我们最开始提到的事务传播行为。

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
		Object transaction = doGetTransaction();

		// 缓存log的debug开关,避免总是去检验
		boolean debugEnabled = logger.isDebugEnabled();

		if (definition == null) {
			// 如果没有设置事务属性,那么使用默认的事务
			definition = new DefaultTransactionDefinition();
		}

		if (isExistingTransaction(transaction)) {
			// 如果当前线程已经存在一个事务,那么就按照存在的方式去处理
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}
		//当前线程不存在事务,就会创建一个新的事务
		// Check definition settings for new transaction.
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}

		// 如果指定了事务传播为PROPAGATION_MANDATORY 那么又没有当前事务,就会抛出异常
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		//如果指定了传播为PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 就创建一个新的事务
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			//不需要挂起任何事务
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
			}
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				//创建与开始事务的实际调用,这是由具体的事务管理器来完成的,例如HibernateTransactionManager
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException ex) {
				resume(null, suspendedResources);
				throw ex;
			}
			catch (Error err) {
				resume(null, suspendedResources);
				throw err;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}

对于创建事务,当然还存在一种情况就是当前线程已经存在了一个事务,那么就需要handleExistingTransaction这个方法去解决这一问题。

private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {
		//如果设置为PROPAGATION_NEVER,又存在一个事务,就要抛出异常
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}
		//如果设置为PROPAGATION_NOT_SUPPORTED,又存在一个事务,就将这个事务挂起
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			//这里的前两个参数为null和false,说明事务不需要放在事务环境中,同时挂起的事务也被保存在这里
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}
		//如果设置为PROPAGATION_REQUIRES_NEW ,则创建新的事务,把当前事务挂起
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				// 这里保存了新的事务信息,同时也保存了挂起的事务信息
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
			catch (Error beginErr) {
				resumeAfterBeginException(transaction, suspendedResources, beginErr);
				throw beginErr;
			}
		}
		//创建嵌套的事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
		}

		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}

4 基于Hibernate的事务管理器

4.1事务的开启

在前面的getTransaction方法中有一个doBegin方法,它是事务开启的实际调用方法。这个方法也是由不同的平台去实现的,来看一看HibernateTransactionManager是如何实现这个方法的。

这个过程大致分为两个阶段,第一构造一个sessionHolder对象,这里面封装了HibernateSession 以及HibernateTransaction ,如果session或Transaction不存在,需要通过sessionFactory获得;第二个阶段就是开启transaction,即transaction.begin();同时把sessionHolder绑定到线程。

在这里有一个步骤是在设置flushmode,FlushMode是session的刷新模式,它指定了session在查询、flush或commit方法时的动作,设置为auto的时候,以前三个动作都会清理session缓存,如果设置为NEVER(MANUAL)时,只有在flush时清理缓存。

protected void doBegin(Object transaction, TransactionDefinition definition) {
		//将事务转换为Hibernage事务组件
		HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;

		if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			throw new IllegalTransactionStateException(
					"Pre-bound JDBC Connection found! HibernateTransactionManager does not support " +
					"running within DataSourceTransactionManager if told to manage the DataSource itself. " +
					"It is recommended to use a single HibernateTransactionManager for all transactions " +
					"on a single DataSource, no matter whether Hibernate or JDBC access.");
		}

		Session session = null;
		//如果SessionHolder还没有被创建,就创建在一个新的Hibernate session,并放入到SessionHolder中
		//这个SessionHolder稍后会绑定到线程中
		try {
			if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) {
				Interceptor entityInterceptor = getEntityInterceptor();
				Session newSession = (entityInterceptor != null ?
						getSessionFactory().withOptions().interceptor(entityInterceptor).openSession() :
						getSessionFactory().openSession());
				if (logger.isDebugEnabled()) {
					logger.debug("Opened new Session [" + newSession + "] for Hibernate transaction");
				}
				txObject.setSession(newSession);
			}
			//从SessionHolder中得到Session
			session = txObject.getSessionHolder().getSession();

			if (this.prepareConnection && isSameConnectionForEntireSession(session)) {
				// We're allowed to change the transaction settings of the JDBC Connection.
				if (logger.isDebugEnabled()) {
					logger.debug("Preparing JDBC Connection of Hibernate Session [" + session + "]");
				}
				Connection con = ((SessionImplementor) session).connection();
				Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
				txObject.setPreviousIsolationLevel(previousIsolationLevel);
				if (this.allowResultAccessAfterCompletion && !txObject.isNewSession()) {
					int currentHoldability = con.getHoldability();
					if (currentHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
						txObject.setPreviousHoldability(currentHoldability);
						con.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
					}
				}
			}
			else {
				// Not allowed to change the transaction settings of the JDBC Connection.
				if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
					// We should set a specific isolation level but are not allowed to...
					throw new InvalidIsolationLevelException(
							"HibernateTransactionManager is not allowed to support custom isolation levels: " +
							"make sure that its 'prepareConnection' flag is on (the default) and that the " +
							"Hibernate connection release mode is set to 'on_close' (the default for JDBC).");
				}
				if (logger.isDebugEnabled()) {
					logger.debug("Not preparing JDBC Connection of Hibernate Session [" + session + "]");
				}
			}
			//如果事务方法定义为只读并且这是一个新的Session  那么设置session的FlushMode 为Never/MANUAL

			if (definition.isReadOnly() && txObject.isNewSession()) {
				// Just set to MANUAL in case of a new Session for this transaction.
				session.setFlushMode(FlushMode.MANUAL);
			}
			//如果事务方法不是只读方法并且这也不是一个新的Session,设置FlushMode 为Auto
			if (!definition.isReadOnly() && !txObject.isNewSession()) {
				// We need AUTO or COMMIT for a non-read-only transaction.
				FlushMode flushMode = session.getFlushMode();
				if (session.getFlushMode().equals(FlushMode.MANUAL)) {
					session.setFlushMode(FlushMode.AUTO);
					txObject.getSessionHolder().setPreviousFlushMode(flushMode);
				}
			}

			Transaction hibTx;

			// 如果指定了timeout,需要设置timeout 然后开始事务
			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				// Use Hibernate's own transaction timeout mechanism on Hibernate 3.1+
				// Applies to all statements, also to inserts, updates and deletes!
				hibTx = session.getTransaction();
				hibTx.setTimeout(timeout);
				hibTx.begin();//开启事务
			}
			//如果没有指定timeout,则直接开启事务
			else {
				// Open a plain Hibernate transaction without specified timeout.
				hibTx = session.beginTransaction();
			}

			// 将这个HibernateTransaction 放入到sessionHolder中
			txObject.getSessionHolder().setTransaction(hibTx);

			// Register the Hibernate Session's JDBC Connection for the DataSource, if set.
			if (getDataSource() != null) {
				Connection con = ((SessionImplementor) session).connection();
				ConnectionHolder conHolder = new ConnectionHolder(con);
				if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
					conHolder.setTimeoutInSeconds(timeout);
				}
				if (logger.isDebugEnabled()) {
					logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]");
				}
				TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
				txObject.setConnectionHolder(conHolder);
			}

			// 将SessionHolder绑定到内存
			if (txObject.isNewSessionHolder()) {
				TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder());
			}
			txObject.getSessionHolder().setSynchronizedWithTransaction(true);
		}

		catch (Throwable ex) {
			if (txObject.isNewSession()) {
				try {
					if (session.getTransaction().isActive()) {
						session.getTransaction().rollback();
					}
				}
				catch (Throwable ex2) {
					logger.debug("Could not rollback Session after failed transaction begin", ex);
				}
				finally {
					SessionFactoryUtils.closeSession(session);
					txObject.setSessionHolder(null);
				}
			}
			throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex);
		}
	}

4.2 事务的挂起

在前面getTransaction的时候,会存在一些情况需要挂起当前事务。调用了suspend方法,而在这个方法中,实际调用的依然是底层的doSuspend方法。HibernateTransactionManager实现了这个方法,也就是说Hibernate事务处理器支持事务挂起。

而这个事务挂起方法实际上就是把与这个事务相关的资源与当前的sessionHolder解除关系,也可以说是与当前线程解除关系,并把它保存在另外一个容器SuspendResourcesHolder中。这样可以方便事务的恢复。

protected Object doSuspend(Object transaction) {
		HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
		//把当前的sessionHolder从线程中和TransactionObject中释放
		txObject.setSessionHolder(null);
		SessionHolder sessionHolder =
				(SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory());
		//把当前的connectionHolder从线程是和TransactionObject中释放
		txObject.setConnectionHolder(null);
		ConnectionHolder connectionHolder = null;
		if (getDataSource() != null) {
			connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
		}
		return new SuspendedResourcesHolder(sessionHolder, connectionHolder);
	}

4.3 事务的提交

在前面的分析中,可以看到事务提交的入口是commitTransactionAfterReturning方法。

if (txInfo != null && txInfo.hasTransaction()) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
			}
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}

它实际调用的依然是AbstractPlatformTransactionManager中的commit方法。这个方法与getTransaction十分相似,它是提交过程的纲领,具体的工作交由processCommit或processRollBack来处理,对应的也就是提交过程中可能出现的两种情况。

4.3.1 commit方法

在这个方法的第一行有一个状态的判断,如果事务已经完成,就会抛出异常。笔者就曾经遇到这种情况,在利用Spring开启事务的情况下,依然选择了提交事务,这个时候就抛出了异常。

public final void commit(TransactionStatus status) throws TransactionException {
		//如果事务已经提交完成 抛出异常
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}
		//如果事务过程中发生异常,就要回滚
		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
			processRollback(defStatus);//处理回滚
			return;
		}
		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus);
			// Throw UnexpectedRollbackException only at outermost transaction boundary
			// or if explicitly asked to.
			if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
				throw new UnexpectedRollbackException(
						"Transaction rolled back because it has been marked as rollback-only");
			}
			return;
		}
		//处理提交
		processCommit(defStatus);
	}

4.3.2 processCommit方法

这个方法是处理提交的方法,在这里完成了事务提交的准备工作。在这个提交的过程中,会对当前事务进行一次判断,如果是一个全新的事务,就交给具体的事务管理器提交,如果不是一个新的事务,则不会发起提交操作,而是把这个提交任务交给已经存在的事务进行。实际的提交动作依然由具体的事务管理器去处理。、

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
				//事务的提交准备工作
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				}
				//如果存在嵌套事务,会首先释放掉这个嵌套事务
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					status.releaseHeldSavepoint();
				}
				// 仅对全新的事务进行提交工作
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					//交给具体的事务管理器去处理
					doCommit(status);
				}
				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (globalRollbackOnly) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}
			catch (Error err) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, err);
				throw err;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
			cleanupAfterCompletion(status);
		}
	}

4.3.3 doCommit方法

对于这个方法已经很简单了,就是从sessionHolder中获得transaction,然后执行提交动作。

protected void doCommit(DefaultTransactionStatus status) {
		HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
		if (status.isDebug()) {
			logger.debug("Committing Hibernate transaction on Session [" +
					txObject.getSessionHolder().getSession() + "]");
		}
		try {
			//获得transaction并提交
			txObject.getSessionHolder().getTransaction().commit();
		}
		catch (org.hibernate.TransactionException ex) {
			// assumably from commit call to the underlying JDBC connection
			throw new TransactionSystemException("Could not commit Hibernate transaction", ex);
		}
		catch (HibernateException ex) {
			// assumably failed to flush changes to database
			throw convertHibernateAccessException(ex);
		}
	}

4.4 事务的回滚

在前面的invokeWithinTransaction方法中,24行执行了一个completeTransactionAfterThrowing方法,在这里面处理了目标方法执行过程中发生异常的情况。异常发生后,会根据异常的类型决定是否发生回滚操作。这就是第一个回滚的入口,另外一个回滚的入口就是在4.3中提到的回滚情况。回滚的执行与commit很类似,AbstractPlatformTransactionManager中定义了一个processRollBack方法

4.4.1
processRollback方法

这个方法与processCommit是极为相似的。它首先去事务进行了一些判断,处理嵌套事务的回滚。依然只处理当新建事务中回滚,如果不是新建事务,交给前一个事务去处理回滚。实际的回滚动作依然由具体的事务管理器去处理。

private void processRollback(DefaultTransactionStatus status) {
		try {
			try {
				triggerBeforeCompletion(status);
				//处理嵌套事务的回滚
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Rolling back transaction to savepoint");
					}
					status.rollbackToHeldSavepoint();
				}
				//只处理新建事务的回滚
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction rollback");
					}
					//具体动作由具体的事务管理器完成
					doRollback(status);
				}
				//非新建事务的回滚由前一个事务去处理
				else if (status.hasTransaction()) {
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
			}
			catch (RuntimeException ex) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw ex;
			}
			catch (Error err) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw err;
			}
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
		}
		finally {
			cleanupAfterCompletion(status);
		}
	}

4.4.2 doRollback方法

Hibernate事务管理器对事务回滚的处理也是很简单的,从SessionHolder中获得transaction并调用回滚方法。

protected void doRollback(DefaultTransactionStatus status) {
	//获得事务
		HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
		if (status.isDebug()) {
			logger.debug("Rolling back Hibernate transaction on Session [" +
					txObject.getSessionHolder().getSession() + "]");
		}
		try {
			//Hibernate事务回滚
			txObject.getSessionHolder().getTransaction().rollback();
		}
		catch (org.hibernate.TransactionException ex) {
			throw new TransactionSystemException("Could not roll back Hibernate transaction", ex);
		}
		catch (HibernateException ex) {
			// Shouldn't really happen, as a rollback doesn't cause a flush.
			throw convertHibernateAccessException(ex);
		}
		finally {
			if (!txObject.isNewSession() && !this.hibernateManagedSession) {
				// Clear all pending inserts/updates/deletes in the Session.
				// Necessary for pre-bound Sessions, to avoid inconsistent state.
				txObject.getSessionHolder().getSession().clear();
			}
		}
	}

5 小结

从声明式事务的整个实现可以看到,这个过程就是一个完整的Spring AOP的实现。它根据我们的配置将dao层或service层封装成代理对象,以此所有方法的调用都会触发invoke方法进入AOP环节。在实际业务方法之前,进行了事务的开启动作以及事务的挂起操作,在业务方法之后,进行了事务的提交或回滚操作。而这些实际的操作则是由底层的事务管理器去实现的。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-05 14:50:01

Spring 事务源码分析——Hibernate篇的相关文章

spring事务源码分析结合mybatis源码(一)

最近想提升,苦逼程序猿,想了想还是拿最熟悉,之前也一直想看但没看的spring源码来看吧,正好最近在弄事务这部分的东西,就看了下,同时写下随笔记录下,以备后查. spring tx源码分析 这里只分析简单事务也就是DataSourceTransactionManager 首先肯定找入口了,看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一

spring事务源码分析结合mybatis源码(三)

下面将结合mybatis源码来分析下,这种持久化框架是如何对connection使用,来达到spring事务的控制. 想要在把mybatis跟spring整合都需要这样一个jar包:mybatis-spring-x.x.x.jar,这里面定义了一些主要的整合信息. 在spring配置文件中需要配置如下两个bean: <!-- mybatis配置 --> <bean id="sqlSessionFactory" class="org.mybatis.sprin

Spring事务源码分析总结

Spring事务是我们日常工作中经常使用的一项技术,Spring提供了编程.注解.aop切面三种方式供我们使用Spring事务,其中编程式事务因为对代码入侵较大所以不被推荐使用,注解和aop切面的方式可以基于需求自行选择,我们以注解的方式为例来分析Spring事务的原理和源码实现. //配置事务管理器<tx:annotation-driven transaction-manager="transactionManager"/> <bean id="trans

Spring事务源码分析

首先看例子,这例子摘抄自开涛的跟我学spring3. @Test public void testPlatformTransactionManager() { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); def.setPropagationBehavior(T

spring事务源码分析结合mybatis源码(二)

让我们继续上篇,分析下如果有第二个调用进入的过程. 代码部分主要是下面这个: if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txOb

spring事务源码研读1

转载摘录自:Spring事务源码分析(一)Spring事务入门 有时为了保证一些操作要么都成功,要么都失败,这就需要事务来保证. 传统的jdbc事务如下: @Test public void testAdd(){ Connection con=null; try { con=DriverManager.getConnection(url , username , password ) con.setAutoCommit(false); //操作一 PreparedStatement ps = c

Spring AMQP 源码分析 06 - 手动消息确认

### 准备 ## 目标 了解 Spring AMQP 如何手动确认消息已成功消费 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#message-listener-adapter> Sample code:<https:

结合ThreadLocal来看spring事务源码,感受下清泉般的洗涤!

在我的博客spring事务源码解析中,提到了一个很关键的点:将connection绑定到当前线程来保证这个线程中的数据库操作用的是同一个connection.但是没有细致的讲到如何绑定,以及为什么这么绑定:另外也没有讲到连接池的相关问题:如何从连接池获取,如何归还连接到连接池等等.那么下面就请听我慢慢道来. ThreadLocal 讲spring事务之前,我们先来看看ThreadLocal,它在spring事务中是占据着比较重要的地位:不管你对ThreadLocal熟悉与否,且都静下心来听我唐僧

Spring IoC 源码分析 (基于注解) 之 包扫描

在上篇文章Spring IoC 源码分析 (基于注解) 一我们分析到,我们通过AnnotationConfigApplicationContext类传入一个包路径启动Spring之后,会首先初始化包扫描的过滤规则.那我们今天就来看下包扫描的具体过程. 还是先看下面的代码: AnnotationConfigApplicationContext类 //该构造函数会自动扫描以给定的包及其子包下的所有类,并自动识别所有的Spring Bean,将其注册到容器中 public AnnotationConf