1、事务就是以可控的方式对数据资源(数据库,文件系统)进行访问的一组操作。为了保证事务执行前后,数据资源所承载的系统状态处于“正确”状态,事务本身有4个限定属性(ACID):原子性,一致性,隔离性,持久性。
原子性:事务包含的全部操作是一个不可分割的整体,要么全部提交成功,要么全部失败。
一致性:一致性要求事务所包含的操作不能违反数据资源的一致性检查。
隔离性:各个事务之间相互影响的程度,不同的隔离级别决定了各个事物对该数据资源访问的不同行为。隔离性是面向数据资源的并发访问。
四种隔离级别,由弱到强分别为Read Uncommitted,Read Commited,Repeatable Read,Serializable
Read Uncommitted: 一个事务可以读取另一个事务没有提交的更新结果,以低的隔离度来寻求较高的性能。事务回滚后,另一个事务看到的就是脏数据。
Read Committed:默认级别。
Repeatable Read: 保证在整个事务过程中,对同一笔数据的读取结果是相同的。不管其他事务是否同时在对同一笔数据进行更新,也不管其他事务对同一笔数
据更新与否。
Serializable:最严格的隔离级别。所有事务都必须按顺序执行,可以避免所有问题,但是是性能最差的隔离级别,很少场景会使用。通常会使用其他隔离级别
加上相应的并发锁的机制来控制对数据的访问,这样既保证了系统性能不会损失太大,也能够在一定程度上保证数据的一致性。
Oracle只支持:Read Committed、Serializable。
EJB、Spring、JDBC等数据库访问方式,都提供4种隔离级别,但是最终是否以指定的隔离执行,由底层的数据资源来决定。
持久性:一旦事务操作成功提交,对数据所做的变更将被记载并不可逆转。
2、事务管理代码
Connection connection = null; boolean roolback = false; DataSource dataSource=null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); //jdbc 操作。。。。。。 connection.commit(); } catch (SQLException e) { roolback = true; e.printStackTrace(); }finally { if(connection!=null){ if (roolback){ connection.rollback(); } connection.close(); } }
Hibernate API的事务管理代码
try { session = sessionFactory.openSession(); transaction = session.beginTransaction(); //数据库访问 session.flush(); transaction.commit(); }catch (HibernateException e){ transaction.rollback(); }finally { session.close(); }
3、JDBC的局部事务控制(事务只涉及到一个数据源)是由同一个java.sql.Connection来完成的,所以要保证两个DAO的数据访问方法会处于一个事务中,我们就得保证它们使用的是同一个java.sql.Connection。
Spring 的事务狂阶设计理念的基本原则是:让事务的关注点和数据访问的关注点相分离。
当在业务层使用事务的抽象API进行事务界定时,不需要关心事务将要加诸于上的事务资源是什么,对不同的事务资源的管理将由响应的框架实现类来操心。
当在数据访问层对可能参与事务的数据资源进行访问的时候,只需要使用响应的数据访问API进行数据访问,而不需要关心当前事务资源如何参与事务或是是否需要参与事务。这同样将由事务框架类来打理。
import org.springframework.dao.DataAccessException; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; public class Service { private PlatformTransactionManager transactionManager; public void serviceMethod(){ TransactionDefinition definition=null; TransactionStatus txStatus = getTransactionManager().getTransaction(definition); try { //数据库操作 dao1.doDataAccess(); dao2.doDataAccess(); }catch (DataAccessException e){ getTransactionManager().rollback(txStatus); throw e; }catch (Exception e){ getTransactionManager().rollback(txStatus); throw e; } getTransactionManager().commit(txStatus); } private PlatformTransactionManager getTransactionManager() { return transactionManager; } }
public interface PlatformTransactionManager { TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException; void commit(TransactionStatus var1) throws TransactionException; void rollback(TransactionStatus var1) throws TransactionException; }
如何传递Connection?图19-2虽然也可以保证Dao使用同一个connection,但是导致事务管理代码和数据访问代码之间通过Connection直接耦合。现在是JDBC使用Connection,如果是Hibernate的话就要声明对Session的依赖了。
正确方法:要传递Connection,可以将整个事务对应的java.sql.Connection实例放到统一的地方去,无论是谁,要使用该资源,都从这一个地方获取。具体:我们在事务开始之前取得一个java.sql.Connection,然后将该Connection绑定到当前的调用线程,之后,数据访问对象在使用Connection时,就可以从当前线程上获取这个事务开始的时候绑定的Connection实例。当所有的数据访问对象全部使用这个绑定当前线程的Connection完成了数据访问工作时,我们就使用这个Connection实例提交或者回滚事务,然后解除它到当前线程的绑定。
原型代码(不用于生产环境):
public class TransactionResourceManager { private static ThreadLocal resources = new ThreadLocal(); public static Object getResource(){ return resources.get(); } public static void bindResource(Object resource){ resources.set(resource); } public static Object unbindResource(){ Object res = getResource(); resources.set(null); return res; } }
public class MyPlatformTransactionManager implements PlatformTransactionManager { private DataSource dataSource; public MyPlatformTransactionManager(DataSource dataSource){ this.dataSource = dataSource; } @Override public TransactionStatus getTransaction(TransactionDefinition transactionDefinition) throws TransactionException { Connection connection; try { connection=dataSource.getConnection(); TransactionResourceManager.bindResource(connection); return new DefaultTransactionStatus(connection,true,true,false,true,null); } catch (SQLException e) { e.printStackTrace(); } return null; } @Override public void commit(TransactionStatus transactionStatus) throws TransactionException { Connection connection = (Connection) TransactionResourceManager.unbindResource(); try { connection.commit(); } catch (SQLException e) { e.printStackTrace(); }finally { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } @Override public void rollback(TransactionStatus transactionStatus) throws TransactionException { Connection connection = (Connection) TransactionResourceManager.unbindResource(); try { connection.rollback(); } catch (SQLException e) { e.printStackTrace(); }finally { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
Spring 框架中的DataSourceUtils工具类的最主要的工作是对connection管理。DataSourceUtils会从TransactionResourceManager(类似TransactionResourceManager类)那里获取Connection资源。如果当前线程没有绑定任何connection,那么就通过数据库访问对象的DataSource引用获取新的connection,否则就必须使用绑定的connection,这就是为什么强调,当我们使用Spring提供的事务支持的时候,必须通过DataSourceUtils来获取连接。而jdbcTemplate等类内部已经使用DataSourceUtils来管理连接了,所以我们不用操心细节。
4、Spring的事务抽象包括3个主要接口:即PlatformTransactionManager、TransactionDefinition、TransactionStatus:
PlatformTransactionManager 负责界定事务边界
TransactionDefinition 负责定义事务相关属性,包括隔离级别、传播行为等。
PlatformTransactionManager 将参照TransactionDefinition 的属性定义来开启相关事务。事务开启之后到事务结束期间的事务状态由TranscationStatus负责,我们也可以通过TransactionStatus对事务进行有限的控制。
5、DataSourceTransactionManager 简单介绍:
AbstractPlatformTransactionManager:
- 确定如果有现有的事务;
- 应用适当的传播行为;
- 如果有必要暂停和恢复事务;
- 提交时检查rollback-only标记;
- 应用适当的修改当回滚(实际回滚或设置rollback-only);
- 触发同步回调注册(如果事务同步是激活的)
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同 Object transaction = this.doGetTransaction();
1 public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = this.doGetTransaction(); //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同
boolean debugEnabled = this.logger.isDebugEnabled();//获取Log类的debug信息,避免之后的代码重复
//如果definition参数为空,则创建一个默认的事务定义数据 if (definition == null) { definition = new DefaultTransactionDefinition(); } //根据先前获取的transaction object 判断是否存在当前事务,根据判定结果采取不同的处理方式 if (this.isExistingTransaction(transaction)) { return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);(2) } else if (((TransactionDefinition)definition).getTimeout() < -1) { throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout()); } else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) { throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation ‘mandatory‘"); } else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) { if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) { this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition); } boolean newSynchronization = this.getTransactionSynchronization() == 0; return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null); } else { AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null); if (debugEnabled) { this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition); } try { boolean newSynchronization = this.getTransactionSynchronization() != 2; DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); this.doBegin(transaction, (TransactionDefinition)definition); this.prepareSynchronization(status, (TransactionDefinition)definition); return status; } catch (RuntimeException var7) { this.resume((Object)null, suspendedResources); throw var7; } catch (Error var8) { this.resume((Object)null, suspendedResources); throw var8; } } }
return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);// 如果已经存在事务,则做的处理:
2 private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == 5) {//如果definition定义的传播行为是propagation_never,则抛出异常并退出 throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation ‘never‘"); } else { AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources; boolean newSynchronization; if (definition.getPropagationBehavior() == 4) {//如果definition定义的传播行为是PROPAGATION_NOT_SUPPORTED,则挂起当前事务然后返回 if (debugEnabled) { this.logger.debug("Suspending current transaction"); } suspendedResources = this.suspend(transaction); (3) newSynchronization = this.getTransactionSynchronization() == 0; return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources); } else if (definition.getPropagationBehavior() == 3) { if (debugEnabled) { this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } suspendedResources = this.suspend(transaction); try { newSynchronization = this.getTransactionSynchronization() != 2; DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); this.doBegin(transaction, definition); this.prepareSynchronization(status, definition); return status; } catch (RuntimeException var7) { this.resumeAfterBeginException(transaction, suspendedResources, var7); throw var7; } catch (Error var8) { this.resumeAfterBeginException(transaction, suspendedResources, var8); throw var8; } } else { boolean newSynchronization; if (definition.getPropagationBehavior() == 6) { if (!this.isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify ‘nestedTransactionAllowed‘ property with value ‘true‘"); } else { if (debugEnabled) { this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (this.useSavepointForNestedTransaction()) { DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null); status.createAndHoldSavepoint(); return status; } else { newSynchronization = this.getTransactionSynchronization() != 2; DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null); this.doBegin(transaction, definition); this.prepareSynchronization(status, definition); return status; } } } else { if (debugEnabled) { this.logger.debug("Participating in existing transaction"); } if (this.isValidateExistingTransaction()) { if (definition.getIsolationLevel() != -1) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)")); } } if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } newSynchronization = this.getTransactionSynchronization() != 2; return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null); } } } }
事务挂起:如果当前线程存在事务,但事务传播特性又要求开启新事务,需要将已有的事务进行挂起,事务的挂起及涉及线程与事务信息的保存,实现源码如下:
3、 protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException { //如果事务是激活的,且当前线程事务同步机制也是激活状态 if (TransactionSynchronizationManager.isSynchronizationActive()) { //挂起当前线程中所有同步的事务(4) List suspendedSynchronizations = this.doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { suspendedResources = this.doSuspend(transaction); } //在线程中保存与事务处理有关的信息,并将线程里有关的线程局部变量重置 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName((String)null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); //将当前线程中事务相关信息保存 return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException var8) { this.doResumeSynchronization(suspendedSynchronizations); throw var8; } catch (Error var9) { this.doResumeSynchronization(suspendedSynchronizations); throw var9; } //如果事务是激活的,但是事务同步机制不是激活的,则只需要保存事务状态,不需要重置事务相关的线程局部变量 } else if (transaction != null) { Object suspendedResources = this.doSuspend(transaction); return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources); } else {///事务和事务同步机制都不是激活的,则不要想处理 return null; } }
4 private List<TransactionSynchronization> doSuspendSynchronization() { List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations(); Iterator var2 = suspendedSynchronizations.iterator(); while(var2.hasNext()) { TransactionSynchronization synchronization = (TransactionSynchronization)var2.next(); synchronization.suspend();//if (this.holderActive) { TransactionSynchronizationManager.unbindResource(this.resourceKey); } } TransactionSynchronizationManager.clearSynchronization(); return suspendedSynchronizations; }
原文地址:https://www.cnblogs.com/yaohuiqin/p/9440870.html