Spring框架之事务处理源码分析

1、事务就是以可控的方式对数据资源(数据库,文件系统)进行访问的一组操作。为了保证事务执行前后,数据资源所承载的系统状态处于“正确”状态,事务本身有4个限定属性(ACID):原子性,一致性,隔离性,持久性。

原子性:事务包含的全部操作是一个不可分割的整体,要么全部提交成功,要么全部失败。

一致性:一致性要求事务所包含的操作不能违反数据资源的一致性检查。

隔离性:各个事务之间相互影响的程度,不同的隔离级别决定了各个事物对该数据资源访问的不同行为。隔离性是面向数据资源的并发访问。

        四种隔离级别,由弱到强分别为Read Uncommitted,Read Commited,Repeatable Read,Serializable

           Read Uncommitted: 一个事务可以读取另一个事务没有提交的更新结果,以低的隔离度来寻求较高的性能。事务回滚后,另一个事务看到的就是脏数据。

      Read Committed:默认级别。

      Repeatable Read: 保证在整个事务过程中,对同一笔数据的读取结果是相同的。不管其他事务是否同时在对同一笔数据进行更新,也不管其他事务对同一笔数

                据更新与否。

      Serializable:最严格的隔离级别。所有事务都必须按顺序执行,可以避免所有问题,但是是性能最差的隔离级别,很少场景会使用。通常会使用其他隔离级别

              加上相应的并发锁的机制来控制对数据的访问,这样既保证了系统性能不会损失太大,也能够在一定程度上保证数据的一致性。

      Oracle只支持:Read Committed、Serializable。

      EJB、Spring、JDBC等数据库访问方式,都提供4种隔离级别,但是最终是否以指定的隔离执行,由底层的数据资源来决定。

持久性:一旦事务操作成功提交,对数据所做的变更将被记载并不可逆转。

2、事务管理代码

        Connection connection = null;
        boolean roolback = false;
        DataSource dataSource=null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            //jdbc 操作。。。。。。
            connection.commit();
        } catch (SQLException e) {
            roolback = true;
            e.printStackTrace();
        }finally {
            if(connection!=null){
                if (roolback){
                    connection.rollback();
                }
                connection.close();
            }
        }

 Hibernate API的事务管理代码

        try {
            session = sessionFactory.openSession();
            transaction = session.beginTransaction();
            //数据库访问
            session.flush();
            transaction.commit();
        }catch (HibernateException e){
            transaction.rollback();
        }finally {
            session.close();
        }

     

3、JDBC的局部事务控制(事务只涉及到一个数据源)是由同一个java.sql.Connection来完成的,所以要保证两个DAO的数据访问方法会处于一个事务中,我们就得保证它们使用的是同一个java.sql.Connection。

Spring 的事务狂阶设计理念的基本原则是:让事务的关注点和数据访问的关注点相分离。

  当在业务层使用事务的抽象API进行事务界定时,不需要关心事务将要加诸于上的事务资源是什么,对不同的事务资源的管理将由响应的框架实现类来操心。
  当在数据访问层对可能参与事务的数据资源进行访问的时候,只需要使用响应的数据访问API进行数据访问,而不需要关心当前事务资源如何参与事务或是是否需要参与事务。这同样将由事务框架类来打理。

import org.springframework.dao.DataAccessException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
public class Service {
    private PlatformTransactionManager transactionManager;
    public  void serviceMethod(){
        TransactionDefinition definition=null;
        TransactionStatus txStatus = getTransactionManager().getTransaction(definition);
        try {
            //数据库操作  dao1.doDataAccess(); dao2.doDataAccess();
        }catch (DataAccessException e){
            getTransactionManager().rollback(txStatus);
            throw e;
        }catch (Exception e){
            getTransactionManager().rollback(txStatus);
            throw e;
        }
        getTransactionManager().commit(txStatus);
    }
    private PlatformTransactionManager getTransactionManager() {
        return  transactionManager;
    }
}

  

public interface PlatformTransactionManager {
    TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException;

    void commit(TransactionStatus var1) throws TransactionException;

    void rollback(TransactionStatus var1) throws TransactionException;
}

  

如何传递Connection?图19-2虽然也可以保证Dao使用同一个connection,但是导致事务管理代码和数据访问代码之间通过Connection直接耦合。现在是JDBC使用Connection,如果是Hibernate的话就要声明对Session的依赖了。

正确方法:要传递Connection,可以将整个事务对应的java.sql.Connection实例放到统一的地方去,无论是谁,要使用该资源,都从这一个地方获取。具体:我们在事务开始之前取得一个java.sql.Connection,然后将该Connection绑定到当前的调用线程,之后,数据访问对象在使用Connection时,就可以从当前线程上获取这个事务开始的时候绑定的Connection实例。当所有的数据访问对象全部使用这个绑定当前线程的Connection完成了数据访问工作时,我们就使用这个Connection实例提交或者回滚事务,然后解除它到当前线程的绑定。

原型代码(不用于生产环境):

public class TransactionResourceManager {
    private static ThreadLocal resources = new ThreadLocal();
    public static Object getResource(){
        return resources.get();
    }
    public static void bindResource(Object resource){
        resources.set(resource);
    }
    public static Object unbindResource(){
        Object res = getResource();
        resources.set(null);
        return res;
    }
}
public class MyPlatformTransactionManager implements PlatformTransactionManager {
    private DataSource dataSource;
    public MyPlatformTransactionManager(DataSource dataSource){
        this.dataSource = dataSource;
    }
    @Override
    public TransactionStatus getTransaction(TransactionDefinition transactionDefinition) throws TransactionException {
        Connection connection;
        try {
            connection=dataSource.getConnection();
            TransactionResourceManager.bindResource(connection);
            return new DefaultTransactionStatus(connection,true,true,false,true,null);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void commit(TransactionStatus transactionStatus) throws TransactionException {
        Connection connection = (Connection) TransactionResourceManager.unbindResource();
        try {
            connection.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void rollback(TransactionStatus transactionStatus) throws TransactionException {
        Connection connection = (Connection) TransactionResourceManager.unbindResource();

        try {
            connection.rollback();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

 Spring 框架中的DataSourceUtils工具类的最主要的工作是对connection管理。DataSourceUtils会从TransactionResourceManager(类似TransactionResourceManager类)那里获取Connection资源。如果当前线程没有绑定任何connection,那么就通过数据库访问对象的DataSource引用获取新的connection,否则就必须使用绑定的connection,这就是为什么强调,当我们使用Spring提供的事务支持的时候,必须通过DataSourceUtils来获取连接。而jdbcTemplate等类内部已经使用DataSourceUtils来管理连接了,所以我们不用操心细节。

4、Spring的事务抽象包括3个主要接口:即PlatformTransactionManager、TransactionDefinition、TransactionStatus:

PlatformTransactionManager 负责界定事务边界

TransactionDefinition 负责定义事务相关属性,包括隔离级别、传播行为等。

PlatformTransactionManager 将参照TransactionDefinition 的属性定义来开启相关事务。事务开启之后到事务结束期间的事务状态由TranscationStatus负责,我们也可以通过TransactionStatus对事务进行有限的控制。

5、DataSourceTransactionManager 简单介绍:

  

AbstractPlatformTransactionManager:

  • 确定如果有现有的事务;
  • 应用适当的传播行为;
  • 如果有必要暂停和恢复事务;
  • 提交时检查rollback-only标记;
  • 应用适当的修改当回滚(实际回滚或设置rollback-only);
  • 触发同步回调注册(如果事务同步是激活的)

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {     //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同        Object transaction = this.doGetTransaction();

1      public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     Object transaction = this.doGetTransaction();  //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同  
     boolean debugEnabled = this.logger.isDebugEnabled();//获取Log类的debug信息,避免之后的代码重复
     //如果definition参数为空,则创建一个默认的事务定义数据
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }
     //根据先前获取的transaction object 判断是否存在当前事务,根据判定结果采取不同的处理方式
        if (this.isExistingTransaction(transaction)) {
            return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);(2)
        } else if (((TransactionDefinition)definition).getTimeout() < -1) {
            throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
        } else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {
            throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation ‘mandatory‘");
        } else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
            if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
                this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
            }

            boolean newSynchronization = this.getTransactionSynchronization() == 0;
            return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
            if (debugEnabled) {
                this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
            }

            try {
                boolean newSynchronization = this.getTransactionSynchronization() != 2;
                DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                this.doBegin(transaction, (TransactionDefinition)definition);
                this.prepareSynchronization(status, (TransactionDefinition)definition);
                return status;
            } catch (RuntimeException var7) {
                this.resume((Object)null, suspendedResources);
                throw var7;
            } catch (Error var8) {
                this.resume((Object)null, suspendedResources);
                throw var8;
            }
        }
    }

  return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);//  如果已经存在事务,则做的处理:

2    private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
        if (definition.getPropagationBehavior() == 5) {//如果definition定义的传播行为是propagation_never,则抛出异常并退出
            throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation ‘never‘");
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
            boolean newSynchronization;
            if (definition.getPropagationBehavior() == 4) {//如果definition定义的传播行为是PROPAGATION_NOT_SUPPORTED,则挂起当前事务然后返回
                if (debugEnabled) {
                    this.logger.debug("Suspending current transaction");
                }

                suspendedResources = this.suspend(transaction); (3)
                newSynchronization = this.getTransactionSynchronization() == 0;
                return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
            } else if (definition.getPropagationBehavior() == 3) {
                if (debugEnabled) {
                    this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
                }

                suspendedResources = this.suspend(transaction);

                try {
                    newSynchronization = this.getTransactionSynchronization() != 2;
                    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    this.doBegin(transaction, definition);
                    this.prepareSynchronization(status, definition);
                    return status;
                } catch (RuntimeException var7) {
                    this.resumeAfterBeginException(transaction, suspendedResources, var7);
                    throw var7;
                } catch (Error var8) {
                    this.resumeAfterBeginException(transaction, suspendedResources, var8);
                    throw var8;
                }
            } else {
                boolean newSynchronization;
                if (definition.getPropagationBehavior() == 6) {
                    if (!this.isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify ‘nestedTransactionAllowed‘ property with value ‘true‘");
                    } else {
                        if (debugEnabled) {
                            this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                        }

                        if (this.useSavepointForNestedTransaction()) {
                            DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
                            status.createAndHoldSavepoint();
                            return status;
                        } else {
                            newSynchronization = this.getTransactionSynchronization() != 2;
                            DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
                            this.doBegin(transaction, definition);
                            this.prepareSynchronization(status, definition);
                            return status;
                        }
                    }
                } else {
                    if (debugEnabled) {
                        this.logger.debug("Participating in existing transaction");
                    }

                    if (this.isValidateExistingTransaction()) {
                        if (definition.getIsolationLevel() != -1) {
                            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                                Constants isoConstants = DefaultTransactionDefinition.constants;
                                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)"));
                            }
                        }

                        if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                            throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
                        }
                    }

                    newSynchronization = this.getTransactionSynchronization() != 2;
                    return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
                }
            }
        }
    }

  

事务挂起:如果当前线程存在事务,但事务传播特性又要求开启新事务,需要将已有的事务进行挂起,事务的挂起及涉及线程与事务信息的保存,实现源码如下:

3、    protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {     //如果事务是激活的,且当前线程事务同步机制也是激活状态
        if (TransactionSynchronizationManager.isSynchronizationActive()) {         //挂起当前线程中所有同步的事务(4)
            List suspendedSynchronizations = this.doSuspendSynchronization();

            try {
                Object suspendedResources = null;
                if (transaction != null) {
                    suspendedResources = this.doSuspend(transaction);
                }
          //在线程中保存与事务处理有关的信息,并将线程里有关的线程局部变量重置
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                TransactionSynchronizationManager.setCurrentTransactionName((String)null);
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null);
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);                //将当前线程中事务相关信息保存
                return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            } catch (RuntimeException var8) {
                this.doResumeSynchronization(suspendedSynchronizations);
                throw var8;
            } catch (Error var9) {
                this.doResumeSynchronization(suspendedSynchronizations);
                throw var9;
            }      //如果事务是激活的,但是事务同步机制不是激活的,则只需要保存事务状态,不需要重置事务相关的线程局部变量
        } else if (transaction != null) {
            Object suspendedResources = this.doSuspend(transaction);
            return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources);
        } else {///事务和事务同步机制都不是激活的,则不要想处理
            return null;
        }
    }

  

4    private List<TransactionSynchronization> doSuspendSynchronization() {
        List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
        Iterator var2 = suspendedSynchronizations.iterator();

        while(var2.hasNext()) {
            TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();
            synchronization.suspend();//if (this.holderActive) { TransactionSynchronizationManager.unbindResource(this.resourceKey); }
        }

        TransactionSynchronizationManager.clearSynchronization();
        return suspendedSynchronizations;
    }

  

原文地址:https://www.cnblogs.com/yaohuiqin/p/9440870.html

时间: 2024-10-09 05:49:39

Spring框架之事务处理源码分析的相关文章

java 1.8 动态代理源码分析

JDK8动态代理源码分析 动态代理的基本使用就不详细介绍了: 例子: class proxyed implements pro{ @Override public void text() { System.err.println("本方法"); } } interface pro { void text(); } public class JavaProxy implements InvocationHandler { private Object source; public Jav

java集合框架10——TreeMap和源码分析(一)

前面讨论完了HashMap和HashTable的源码,这一节我们来讨论一下TreeMap.先从整体上把握TreeMap,然后分析其源码,深入剖析TreeMap的实现. 1. TreeMap简介 TreeMap是一个有序的key-value集合,它内部是通过红-黑树实现的,如果对红-黑树不太了解,请先参考下这篇博文:红-黑树.下面我们先来看看TreeMap的继承关系: java.lang.Object ? java.util.AbstractMap<K, V> ? java.util.TreeM

Spring中Bean命名源码分析

Spring中Bean命名源码分析 一.案例代码 首先是demo的整体结构 其次是各个部分的代码,代码本身比较简单,不是我们关注的重点 配置类 /** * @Author Helius * @Create 2019-10-25-20:16 */ @Configuration @ComponentScan(basePackages = {"service"}) public class SpringConfiguration { } 接口的实现类 public interface Use

(转)Spring对注解(Annotation)处理源码分析1——扫描和读取Bean定义

1.从Spring2.0以后的版本中,Spring也引入了基于注解(Annotation)方式的配置,注解(Annotation)是JDK1.5中引入的一个新特性,用于简化Bean的配置,某些场合可以取代XML配置文件.开发人员对注解(Annotation)的态度也是萝卜青菜各有所爱,个人认为注解可以大大简化配置,提高开发速度,同时也不能完全取代XML配置方式,XML 方式更加灵活,并且发展的相对成熟,这种配置方式为大多数 Spring 开发者熟悉:注解方式使用起来非常简洁,但是尚处于发展阶段,

缓存框架OSCache部分源码分析

在并发量比较大的场景,如果采用直接访问数据库的方式,将会对数据库带来巨大的压力,严重的情况下可能会导致数据库不可用状态,并且时间的消耗也是不能容忍的.在这种情况下,一般采用缓存的方式.将经常访问的热点数据提前加载到内存中,这样能够大大降低数据库的压力. OSCache是一个开源的缓存框架,虽然现在已经停止维护了,但是对于OSCache的实现还是值得学习和借鉴的.下面通过OSCache的部分源码分析OSCache的设计思想. 缓存数据结构 通常缓存都是通过<K,V>这种数据结构存储,但缓存都是应

android-----XUtils框架之HttpUtils源码分析

之前我们对Volley框架源码进行了分析,知道了他适用于频繁的网络请求,但是不太适合post较大数据以及文件的上传操作,在项目中为了弥补Volley的这个缺陷,使用了XUtils框架的HttpUtils实现了文件上传的操作,上一篇博客我们通过HttpUtils实现了照片上传的实例,见:android-----基于XUtils客户端以及服务器端实现,当然文件上传的方法类似于照片上传,有时间的话单独写一篇博客介绍,这篇博客我们从源码角度来分析HttpUtils的实现原理,希望对这几天的学习做个总结:

KopDB 框架学习2——源码分析

我的博客:http://mrfufufu.github.io/ 上次我们主要是对 KopDB 框架的使用进行了分析,它是非常简单有用的.这次主要是对它的源码进行分析,进一步的来了解它的真面目. 点击这里去往 "KopDB 框架学习1--使用" 因为 KopDB 采用的是对象关系映射(ORM)模式,即我们使用的编程语言是面向对象语言,而我们使用的数据库则是关系型数据库,那么将面向对象的语言和面向关系的数据库之间建立一种映射关系,这就是对象关系映射了. 使用 ORM 模式,主要是因为我们平

Spring MVC初始化部分源码分析

首先定位到org.springframework.context.support.AbstractApplicationContext中的refresh()方法: public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { // Prepare this context for refreshing. prepareRefresh

spring boot 2.0 源码分析(三)

通过上一章的源码分析,我们知道了spring boot里面的listeners到底是什么(META-INF/spring.factories定义的资源的实例),以及它是创建和启动的,今天我们继续深入分析一下SpringApplication实例变量中的run函数中的其他内容.还是先把run函数的代码贴出来: /** * Run the Spring application, creating and refreshing a new * {@link ApplicationContext}. *