Spring事务源码分析

首先看例子,这例子摘抄自开涛的跟我学spring3。


@Test

public void testPlatformTransactionManager() {

DefaultTransactionDefinition def = new DefaultTransactionDefinition();

def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);

def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

TransactionStatus status = txManager.getTransaction(def);

try {

jdbcTemplate.update(INSERT_SQL, "test");

txManager.commit(status);

} catch (RuntimeException e) {

txManager.rollback(status);

}

}

重要的代码在上面高亮处。

在执行jdbcTemplate.update的时候使用的是datasource.getConection获取连接。

实际上,

  • 在执行txManager.getTransaction(def);的时候,应该会设置:conection.setAutoConmmit(false)。
  • 在执行txManager.commit(status);的时候,应该是执行conection.commit();
  • 在执行txManager. rollback (status);的时候,应该是执行conection. rollback ();

但是,Spring是如何保证,txManager中的conn就是jdbcTemplate中的conn的呢。从这点出发,开始看源代码。

因为是执行的jdbc操作,这里的txManager是DataSourceTransactionManager。我们来看代码:

getTransaction方法:

getTransaction方法在DataSourceTransactionManager的超类中,也就是AbstractPlatformTransactionManager,我们来看方法:


public
final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {

Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.

boolean
debugEnabled = logger.isDebugEnabled();

if (definition == null) {

// Use defaults if no transaction definition given.

definition = new DefaultTransactionDefinition();

}

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

// Check definition settings for new transaction.

if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {

throw
new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

}

// No existing transaction found -> check propagation behavior to find out how to proceed.

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {

throw
new IllegalTransactionStateException(

"No existing transaction found for transaction marked with propagation ‘mandatory‘");

}

else
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean
newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return
status;

}

catch (RuntimeException ex) {

resume(null, suspendedResources);

throw
ex;

}

catch (Error err) {

resume(null, suspendedResources);

throw
err;

}

}

else {

// Create "empty" transaction: no actual transaction, but potentially synchronization.

boolean
newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);

}

}

先看第一句,

Object transaction = doGetTransaction();

方法在AbstractPlatformTransactionManager中,方法为:

protected
abstract Object doGetTransaction() throws TransactionException;

这是典型的模板方法设计模式,AbstractPlatformTransactionManager作为抽象类,定义了getTransaction方法,并且设置为final,然后方法内部调用的部分方法是protected
abstract的,交给子类去实现。

我们来看在DataSourceTransactionManager类中的doGetTransaction方法的定义:


@Override

protected Object doGetTransaction() {

DataSourceTransactionObject txObject = new DataSourceTransactionObject();

txObject.setSavepointAllowed(isNestedTransactionAllowed());

ConnectionHolder conHolder =

(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);

txObject.setConnectionHolder(conHolder, false);

return
txObject;

}

注意这里,是new了一个DataSourceTransactionObject对象,重要的是高亮的两句。txObject中有一个ConnectionHolder对象,这么说来,在这一步的时候有可能已经在事务对象(DataSourceTransactionObject)中,保存了一个ConnectionHolder对象,顾名思义,ConnectionHolder中必然有Connection。如果是这样,我们只要确定,在执行jdbc操作的时候使用的Connection和这个ConnectionHolder中的是同一个就可以了。我们先看ConnectionHolder的结构。

确实如我们所想。

我们再看TransactionSynchronizationManager.getResource(this.dataSource);代码如何获取ConnectionHolder的。

TransactionSynchronizationManager这个名字,应该是支持多线程并发读取的。我们看代码。


public
static Object getResource(Object key) {

Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);

Object value = doGetResource(actualKey);

if (value != null && logger.isTraceEnabled()) {

logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +

Thread.currentThread().getName() + "]");

}

return
value;

}

看Object value = doGetResource(actualKey);代码:


private
static Object doGetResource(Object actualKey) {

Map<Object, Object> map = resources.get();

if (map == null) {

return
null;

}

Object value = map.get(actualKey);

// Transparently remove ResourceHolder that was marked as void...

if (value
instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {

map.remove(actualKey);

// Remove entire ThreadLocal if empty...

if (map.isEmpty()) {

resources.remove();

}

value = null;

}

return
value;

}

高亮代码,看起来就是从一个map中获取了返回的结果,获取的时候使用的key是上一个方法传入的datasource。

看看这个map是什么。


private
static
final ThreadLocal<Map<Object, Object>> resources =

new
NamedThreadLocal<Map<Object, Object>>("Transactional resources");

看来是ThreadLocal对象。

那么这个对象是在什么时候初始化的呢。

经过查看是在这个方法:


public
static
void bindResource(Object key, Object value) throws IllegalStateException {

那么那个地方调了这个方法呢?

经过查看,又回到了DataSourceTransactionManager类:


@Override

protected
void doResume(Object transaction, Object suspendedResources) {

ConnectionHolder
conHolder = (ConnectionHolder) suspendedResources;

TransactionSynchronizationManager.bindResource(this.dataSource, conHolder);

}

但是这个是在事务执行完毕的时候执行的,所以如果我们是第一次在当前线程执行事务,那么回到最初的代码:


public
final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {

Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.

boolean
debugEnabled = logger.isDebugEnabled();

if (definition == null) {

// Use defaults if no transaction definition given.

definition = new DefaultTransactionDefinition();

}

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

// Check definition settings for new transaction.

if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {

throw
new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

}

// No existing transaction found -> check propagation behavior to find out how to proceed.

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {

throw
new IllegalTransactionStateException(

"No existing transaction found for transaction marked with propagation ‘mandatory‘");

}

else
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean
newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return
status;

}

catch (RuntimeException ex) {

resume(null, suspendedResources);

throw
ex;

}

catch (Error err) {

resume(null, suspendedResources);

throw
err;

}

}

else {

// Create "empty" transaction: no actual transaction, but potentially synchronization.

boolean
newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);

}

}

Object transaction = doGetTransaction();

这里的transaction中应该是没有connection的。

继续往下看:


if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

其中,isExistingTransaction:


@Override

protected
boolean
isExistingTransaction(Object transaction) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());

}

这是是判断txObject种有没有ConnectionHolder,也就是当前线程是否已经执行过事务。

我们忽略有的情况,主要看没有的情况,也就是说当前线程第一次处理事务的情况。

继续看最初的代码,主要看这段:


else
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean
newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

DefaultTransactionStatus status = newTransactionStatus(

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return
status;

}

catch (RuntimeException ex) {

resume(null, suspendedResources);

throw
ex;

}

catch (Error err) {

resume(null, suspendedResources);

throw
err;

}

}

看doBegin(transaction, definition);


@Override

protected
void
doBegin(Object transaction, TransactionDefinition definition) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

Connection con = null;

try {

if (txObject.getConnectionHolder() == null ||

txObject.getConnectionHolder().isSynchronizedWithTransaction()) {

Connection newCon = this.dataSource.getConnection();

if (logger.isDebugEnabled()) {

logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");

}

txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);

con = txObject.getConnectionHolder().getConnection();

Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);

txObject.setPreviousIsolationLevel(previousIsolationLevel);

// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,

// so we don‘t want to do it unnecessarily (for example if we‘ve explicitly

// configured the connection pool to set it already).

if (con.getAutoCommit()) {

txObject.setMustRestoreAutoCommit(true);

if (logger.isDebugEnabled()) {

logger.debug("Switching JDBC Connection [" + con + "] to manual commit");

}

con.setAutoCommit(false);

}

txObject.getConnectionHolder().setTransactionActive(true);

int
timeout = determineTimeout(definition);

if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {

txObject.getConnectionHolder().setTimeoutInSeconds(timeout);

}

// Bind the session holder to the thread.

if (txObject.isNewConnectionHolder()) {

TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());

}

}

catch (Throwable ex) {

if (txObject.isNewConnectionHolder()) {

DataSourceUtils.releaseConnection(con, this.dataSource);

txObject.setConnectionHolder(null, false);

}

throw
new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);

}

}

这里新建了一个Connection,并且将这个Connection绑定到了TransactionSynchronizationManager中,也就是上面的:


private
static
final ThreadLocal<Map<Object, Object>> resources =

new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

至此,我们只需要确定,我们使用datasource.getConction()的时候,也是从TransactionSynchronizationManager获取的就好。

时间: 2024-08-01 10:44:22

Spring事务源码分析的相关文章

spring事务源码分析结合mybatis源码(一)

最近想提升,苦逼程序猿,想了想还是拿最熟悉,之前也一直想看但没看的spring源码来看吧,正好最近在弄事务这部分的东西,就看了下,同时写下随笔记录下,以备后查. spring tx源码分析 这里只分析简单事务也就是DataSourceTransactionManager 首先肯定找入口了,看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一

Spring 事务源码分析——Hibernate篇

在Spring与Hibernate整合的时候,可以利用Spring的事务管理机制,为我们管理事务的开启.提交.回滚等操作.这样的方式极大的减少了我们的代码量,让我们只专注于业务代码的编写.在使用Hibernate的时候,每一个操作都要经历事务开启与提交这样的操作,他们在业务代码的周围,这样来看是不是就想到了AOP编程,把这部分代码抽取出来.没错,Spring正是这样做的,Spring的事务管理就是基于AOP的. 1 Spring的事务隔离与传播 Srping的事务定义了五个隔离等级(isolat

spring事务源码分析结合mybatis源码(三)

下面将结合mybatis源码来分析下,这种持久化框架是如何对connection使用,来达到spring事务的控制. 想要在把mybatis跟spring整合都需要这样一个jar包:mybatis-spring-x.x.x.jar,这里面定义了一些主要的整合信息. 在spring配置文件中需要配置如下两个bean: <!-- mybatis配置 --> <bean id="sqlSessionFactory" class="org.mybatis.sprin

Spring事务源码分析总结

Spring事务是我们日常工作中经常使用的一项技术,Spring提供了编程.注解.aop切面三种方式供我们使用Spring事务,其中编程式事务因为对代码入侵较大所以不被推荐使用,注解和aop切面的方式可以基于需求自行选择,我们以注解的方式为例来分析Spring事务的原理和源码实现. //配置事务管理器<tx:annotation-driven transaction-manager="transactionManager"/> <bean id="trans

spring事务源码分析结合mybatis源码(二)

让我们继续上篇,分析下如果有第二个调用进入的过程. 代码部分主要是下面这个: if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txOb

spring事务源码研读1

转载摘录自:Spring事务源码分析(一)Spring事务入门 有时为了保证一些操作要么都成功,要么都失败,这就需要事务来保证. 传统的jdbc事务如下: @Test public void testAdd(){ Connection con=null; try { con=DriverManager.getConnection(url , username , password ) con.setAutoCommit(false); //操作一 PreparedStatement ps = c

Spring AMQP 源码分析 05 - 异常处理

### 准备 ## 目标 了解 Spring AMQP Message Listener 如何处理异常 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#exception-handling> Sample code:<ht

Spring AMQP 源码分析 07 - MessageListenerAdapter

### 准备 ## 目标 了解 Spring AMQP 如何用 POJO 处理消息 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#message-listener-adapter> Sample code:<https:

Spring AMQP 源码分析 06 - 手动消息确认

### 准备 ## 目标 了解 Spring AMQP 如何手动确认消息已成功消费 ## 前置知识 <Spring AMQP 源码分析 04 - MessageListener> ## 相关资源 Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#message-listener-adapter> Sample code:<https: