spring事务源码分析结合mybatis源码(一)

最近想提升,苦逼程序猿,想了想还是拿最熟悉,之前也一直想看但没看的spring源码来看吧,正好最近在弄事务这部分的东西,就看了下,同时写下随笔记录下,以备后查。

spring tx源码分析

这里只分析简单事务也就是DataSourceTransactionManager

首先肯定找入口了,看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一个普通的切面类,只要符合AOP规则的调用都会进入此切面。

在invoke方法中最重要的一段代码:这里主要分析一个新的事务的开始过程

    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    final TransactionAttribute txAttr =
            getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);//获取配置的TransactionAttribute信息
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(invocation.getMethod(), targetClass);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//开启一个新的事务
        Object retVal = null;
        try {
            retVal = invocation.proceed();//原有逻辑执行
        }
        catch (Throwable ex) {
            completeTransactionAfterThrowing(txInfo, ex);//发生异常时候对异常的处理
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);//清理TransactionInfo信息
        }
        commitTransactionAfterReturning(txInfo);//提交事务
        return retVal;

首先开启事务,也就是调用createTransactionIfNecessary方法:

protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);
            }
            else {
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

这里其实主要就是调用PlatformTransactionManager的getTransactionf方法来获取TransactionStatus来开启一个事务:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        Object transaction = doGetTransaction();
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }
        if (isExistingTransaction(transaction)) {//这个判断很重要,是否已经存在的一个transaction
            return handleExistingTransaction(definition, transaction, debugEnabled);//如果是存在的将进行一些处理
        }
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation ‘mandatory‘");
        }
        //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED这三种类型将开启一个新的事务
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);//开启新事物
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

这段代码比较长也是比较核心的一段代码,让我们来慢慢分析,首先这里将执行doGetTransaction方法来获取一个transaction

protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        //这一行代码中TransactionSynchronizationManager很重要,是对connection的核心获取、持有、删除等
        txObject.setConnectionHolder(conHolder, false);
        //这里不论获取到或者获取不到都将此设置newConnectionHolder为false
        return txObject;
    }

这段代码中主要是根据this.dataSource来获取ConnectionHolder,这个ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次来获取,肯定得到是null。

接着代码往下将执行到isExistingTransaction(transaction),这里主要是依据下面代码判断:

txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()

如果是第一次开启事务这里必然是false,否则将返回true。

我们这里先讨论第一次进入的情况,也就是false的时候将继续往下执行到了判断事务Propagation的时候了,如果Propagation为:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一个将开启一个新事物,new一个新的DefaultTransactionStatus ,并且

newTransaction设置为true,这个状态很重要,因为后面的不论回滚、提交都是根据这个属性来判断是否在这个TransactionStatus上来进行。

接着将执行doBegin方法:

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
        try {
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();//从dataSource中获取一个Connection
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);//为当前Transaction设置ConnectionHolder,并且设置newConnectionHolder为true
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            //这里主要是根据definition对connection进行一些设置
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            if (con.getAutoCommit()) {//开启事务,设置autoCommit为false
                txObject.setMustRestoreAutoCommit(true);
                con.setAutoCommit(false);
            }
            //这里设置transactionActive为true,还记得签名判断是否存在的transaction吧?就是根据这个
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            if (txObject.isNewConnectionHolder()) {
                //这里将当前的connection放入TransactionSynchronizationManager中持有,如果下次调用可以判断为已有的事务
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }
    }

这里其实主要就是从dataSource中获取一个新的connection,形成一个ConnectionHolder,并且放入TransactionSynchronizationManager中持有,记得前面doGetTransaction方法吧,如果同一个线程,再此进入执行的话就会获取到同一个ConnectionHolder,在后面的isExistingTransaction方法也可以判定为是已有的transaction。

接下来将执行prepareSynchronization方法,主要是对TransactionSynchronizationManager的一系列设置。

然后将返回上层代码执行prepareTransactionInfo方法

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            txInfo.newTransactionStatus(status);
        }
        txInfo.bindToThread();
        return txInfo;
    }

这里其实比较简单主要生成一个TransactionInfo并绑定到当前线程的ThreadLocal

    private void bindToThread() {
            this.oldTransactionInfo = transactionInfoHolder.get();
            transactionInfoHolder.set(this);
        }

形成了一个链表,具体啥用我也暂时没看到,唯一看到的就是通过TransactionAspectSupport.currentTransactionStatus()可以获取当前的transaction状态。

然后再返回到上层代码,接着就是执行相应的逻辑代码了

retVal = invocation.proceed();

执行过程的finally代码块将执行cleanupTransactionInfo(txInfo);

    private void restoreThreadLocalStatus() {
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

这里就是将txInfo进行重置工作,让它恢复到前一个状态。

然后就是提交操作(commitTransactionAfterReturning)或者是回滚操作(completeTransactionAfterThrowing)了。

这里就拿提交操作来为例来说明,回滚操作类似:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

实际就是执行的processCommit方法

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    doCommit(status);
                }
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

首先将执行一些提交前的准备工作,这里将进行是否有savepoint判断status.hasSavepoint(),如果有的话将进行释放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);

接着就判断是否是新的transaction:status.isNewTransaction(),如果是的话将执行 doCommit(status);

protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

其实也就是调用了Connection的commit()方法。

最后无论成功与否都将调用finally块中的cleanupAfterCompletion(status)

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();//TransactionSynchronizationManager清理工作
        }
        if (status.isNewTransaction()) {
            doCleanupAfterCompletion(status.getTransaction());//这个比较重要
        }
        if (status.getSuspendedResources() != null) {
            resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }

首先对TransactionSynchronizationManager进行一系列清理工作,然后就将执行doCleanupAfterCompletion方法:

    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        if (txObject.isNewConnectionHolder()) {
            //从TransactionSynchronizationManager中解绑相应的connectionHolder
            TransactionSynchronizationManager.unbindResource(this.dataSource);
        }
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            //对获取到的Connection进行一些还原
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }//对获取到的Connection进行一些还原
            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
        }
        if (txObject.isNewConnectionHolder()) {
            //如果是newConnection将这个链接关闭,如果是连接池将还给连接池
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }
        //这里将这只transactionActive为false
        txObject.getConnectionHolder().clear();
    }

其实就是将TransactionSynchronizationManager中持有的connectionHolder释放,并且还原当前Connection 的状态,然后将对当前的transaction进行清理包括设置transactionActive为false等。

至此整个spring的事务过程也就结束了。

时间: 2024-11-03 21:57:35

spring事务源码分析结合mybatis源码(一)的相关文章

spring事务源码分析结合mybatis源码(三)

下面将结合mybatis源码来分析下,这种持久化框架是如何对connection使用,来达到spring事务的控制. 想要在把mybatis跟spring整合都需要这样一个jar包:mybatis-spring-x.x.x.jar,这里面定义了一些主要的整合信息. 在spring配置文件中需要配置如下两个bean: <!-- mybatis配置 --> <bean id="sqlSessionFactory" class="org.mybatis.sprin

spring事务源码分析结合mybatis源码(二)

让我们继续上篇,分析下如果有第二个调用进入的过程. 代码部分主要是下面这个: if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txOb

mybatis源码分析(四) mybatis与spring事务管理分析

mybatis源码分析(四) mybatis与spring事务管理分析 一丶从jdbc的角度理解什么是事务 从mysql获取一个连接之后, 默认是自动提交, 即执行完sql之后, 就会提交事务. 这种事务的范围是一条sql语句. 将该连接设置非自动提交, 可以执行多条sql语句, 然后由程序决定是提交事务, 还是回滚事务. 这也是我们常说的事务. Connection connection = dataSource.getConnection(); // connection.setTransa

【E2LSH源码分析】E2LSH源码综述及主要数据结构

上一小节,我们对p稳定分布LSH的基本原理进行了介绍(http://blog.csdn.net/jasonding1354/article/details/38237353),在接下来的博文中,我将以E2LSH开源代码为基础,对E2LSH的源码进行注解学习,从而为掌握LSH的基本原理以及未来对相似性搜索的扩展学习打下基础. 1.代码概况 E2LSH的核心代码可以分为3部分: LocalitySensitiveHashing.cpp--主要包含基于LSH的RNN(R-near neighbor)数

cocos2d-x 源码分析 : control 源码分析 ( 控制类组件 controlButton)

源码版本来自3.1rc 转载请注明 cocos2d-x源码分析总目录 http://blog.csdn.net/u011225840/article/details/31743129 1.继承结构 control的设计整体感觉挺美的,在父类control定义了整个控制事件的基础以及管理,虽然其继承了Layer,但其本身和UI组件的实现并没有关联.在子类(controlButton,controlSwitch,controlStepper等中实现不同的UI组件).下面通过源码来分析control与

cocos2d-x 源码分析 之 CCTableView源码分析(附使用方法讨论)

cocos2d-x源码总目录 http://blog.csdn.net/u011225840/article/details/31743129 源码来自2.x,转载请注明 1.继承结构 首先来看下CCTableView的继承结构 从继承结构上看,CCTableView是一种CCScrollView,所以为了研究CCTableView的源码,清先去了解CCScrollView的源码http://blog.csdn.net/u011225840/article/details/30033501. 其

【JDK源码分析】通过源码分析CyclicBarrier

前言 CyclicBarrier它是什么?一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点.类似于朋友之间联系要在中午聚个会,几个朋友全部到齐后才开始喝酒吃菜. 源码 CyclicBarrier属性和构造器 public class CyclicBarrier { // 互斥锁 private final ReentrantLock lock = new ReentrantLock(); // 条件等待 private final Condition trip = lock.new

Spring事务管理全面分析

Spring 事务属性分析什么是事物  事务管理对于企业应用而言至关重要.它保证了用户的每一次操作都是可靠的,即便出现了异常的访问情况,也不至于破坏后台数据的完整性.就像银行的自助取款机,通常都能正常为客户服务,但是也难免遇到操作过程中机器突然出故障的情况,此时,事务就必须确保出故障前对账户的操作不生效,就像用户刚才完全没有使用过取款机一样,以保证用户和银行的利益都不受损失.   简单来说:事物指的是逻辑上的一组操作,这组操作要么全部成功,要么全部失败.在 Spring 中,事务是通过 Tran

【MyBatis源码分析】select源码分析及小结

示例代码 之前的文章说过,对于MyBatis来说insert.update.delete是一组的,因为对于MyBatis来说它们都是update:select是一组的,因为对于MyBatis来说它就是select. 本文研究一下select的实现流程,示例代码为: 1 public void testSelectOne() { 2 System.out.println(mailDao.selectMailById(8)); 3 } selectMailById方法的实现为: 1 public M