在.NET 1.x中,基本上是通过ADO.NET实现对不同数据库访问的事务。.NET 2.0增加了System.Transactions名称空间,为.NET应用程序带来了一个新的事务变成模型。
所有的事务组件或者类型均定义在System.Transactions程序集中的System.Transactions命名空间下,我们直接称基于此的事务为System.Transactions事务。
System.Transactions事务变成模型使我们可以显式(通过System.Transactions.Transaction)或者隐式(System.Transactions.TransactionScope)的方式进行事务编程。
一、System.Transactions.Transacion
Transaction是所有事务处理类的基类,并定义了所有事务类都可以使用的属性、方法和事件。
CommittableTransaction是唯一一个支持提交的事务类。因为这个类中有一个Commit()方法,可以实现提交事务。其他的事务中都没有Commit()方法,因此只能执行回滚。
DependentTransaction类用于依赖于其他事务的事务,依赖的事务可以依赖从可提交的事务中创建的事务,不管事务处理是否成功,都要把依赖的事务添加到可提交的事务的结果中。
SubordinateTransaction类和分布式事务协调器(DTC)一起使用,这个类表示非根事务,但可以由DTC管理。
1: [Serializable] 2: public class Transaction : IDisposable, ISerializable 3: { 4: public event TransactionCompletedEventHandler TransactionCompleted; 5: 6: public Transaction Clone(); 7: public DependentTransaction DependentClone(DependentCloneOption cloneOption); 8: 9: public Enlistment EnlistDurable(Guid resourceManagerIdentifier, IEnlistmentNotification enlistmentNotification, EnlistmentOptions enlistmentOptions); 10: public Enlistment EnlistDurable(Guid resourceManagerIdentifier, ISinglePhaseNotification singlePhaseNotification, EnlistmentOptions enlistmentOptions); 11: public bool EnlistPromotableSinglePhase(IPromotableSinglePhaseNotification promotableSinglePhaseNotification); 12: public Enlistment EnlistVolatile(IEnlistmentNotification enlistmentNotification, EnlistmentOptions enlistmentOptions); 13: public Enlistment EnlistVolatile(ISinglePhaseNotification singlePhaseNotification, EnlistmentOptions enlistmentOptions); 14: 15: public void Rollback(); 16: public void Rollback(Exception e); 17: 18: void ISerializable.GetObjectData(SerializationInfo serializationInfo, StreamingContext context); 19: 20: public static Transaction Current { get; set; } 21: 22: public IsolationLevel IsolationLevel { get; } 23: public TransactionInformation TransactionInformation { get; } 24: }
1、Transaction是可序列化的
从上面的定义我们可以看到,Transaction类型上面应用的SerializableAttribute特性,并且实现了ISerializable接口,意味着一个Transaction对象时可以被序列化的,Transaction的这一特性在WCF整个分布式事务的意义重大,原因很简单:要让事务能够控制整个服务操作,必须实现事务的传播,而传播的前提就是事务可被序列化。
2、Transaction类的属性和方法
Transaction类的成员 | 说明 |
Current | Current属性是一个静态属性,不需要有实例,Transaction.Current返回一个环境事务处理(如果存在),环境事务处理在后面讨论 |
IsolationLevel |
IsolationLevel属性返回一个IsolationLevel类型的对象,IsolationLevel是一个枚举,它定义了其他事务必须有什么访问权限才能访问事务的临时结果。 它会影响ACID中的“I”;并不是所有的事务处理都是隔离的。 |
TransactionInformation | TransactionInformation属性返回一个TransactionInformation对象,该对象提供了事务的当前状态信息、事务的创建时间和事务标示符。 |
EnlistVolatile() EnlistDurable() EnlistPromotableSinglePhase() |
使用登记方法EnlistVolatile()、EnlistDurable()和EnlistPromotableSinglePhase(),可以登记参与事务处理的自定义资源管理器。 |
RollBack() | 使用RoolBack()方法,可以终止一个事务,撤销所有的改变,把所有的结果设置为事务处理之前的状态。 |
DependentClone() | 使用DependentClone()方法,可以创建一个依赖当前事务的事务。 |
TransactionCompleted |
TransactionCompleted是一个事件,在事务完成时触发---事务可能成功,也可能失败,在TransactionCompletedEventHandler类型的时间处理程序 中,可以访问Transaction对象,并读取其状态。 |
public static class Utilities { public static bool AbortTx() { Console.Write("Abort the Transaction (y/n)?"); return Console.ReadLine().ToLower().Equals("y"); } public static void DisplayTransactionInformation(string title, TransactionInformation ti) { Contract.Requires<ArgumentNullException>(ti != null); Console.WriteLine(title); Console.WriteLine("Creation Time: {0:T}", ti.CreationTime); Console.WriteLine("Status: {0}", ti.Status); Console.WriteLine("Local ID: {0}", ti.LocalIdentifier); Console.WriteLine("Distributed ID: {0}", ti.DistributedIdentifier); Console.WriteLine(); } }
AbortTx()方法---根据用户的输入返回true或false,如果输入y则返回true,如果输入n,则返回false。
DisplayTransactionInformation()方法----讲一个TransactionInformation对象ti作为参数,显示事务中的所有信息:创建时间、状态、本地标识符、分布式标识符。
2、如何登记事务参与者
在Transaction中,定义了五个EnlistXxx方法用于将涉及到的资源管理器登记到当前事务中。其中EnlistDurable和EnlistVolatile分别实现了对持久化资源管理器和可变资源管管理器的事务登记,而EnlistPromotableSinglePhase则针对的是可被提升的资源管理器(比如基于SQL Server 2005和SQL Server 2008)。
事务登记的目的是建立事务提交树,使得处于根节点的事务管理器能够在事务提交的时候能够沿着这棵树将相应的通知发送给所有的事务参与者。这种至上而下的通知机制依赖于具体采用事务提交协议,或者说某个资源要求参与到当前事务之中,必须满足基于协议需要的接收和处理相应通知的能力。System.Transactions将不同事务提交协议对参与者的要求定义在相应的接口中。其中IEnlistmentNotification和ISinglePhaseNotification分别是基于2PC和SPC(关于2PC和SPC,在上篇中有详细的介绍)。
如果我们需要为相应的资源开发能够参与到System.Transactions事务的资源管理器,需要事先实现IEnlistmentNotification接口,对基本的2PC协议提供支持。当满足SPC要求的时候,如果希望采用SPC优化协议,则需要实现ISinglePhaseNotification接口。如果希望像SQL Server 2005或者SQL Server 2008支持事务提升机制,则需要实现IPromotableSinglePhaseNotification接口。
该方法使用System.Transactions.Transaction对象tx作为第二个参数,调用SqlConnection类的EnlistTransaction方法,以便用连接登记tx对象,这样ADO.NET连接就关联到事务上了。
如果使用MySQL,则SqlConnection改为MySqlConnection,其他无需改动。
public async Task AddStudentAsync(Student student, Transaction tx) { // Contract.Requires<ArgumentNullException>(student != null); SqlConnection connection = new SqlConnection( Properties.Settings.Default.CourseManagementConnectionString); await connection.OpenAsync(); try { if (tx != null) connection.EnlistTransaction(tx); SqlCommand command = connection.CreateCommand(); command.CommandText = "INSERT INTO Students (FirstName, LastName, Company) " + "VALUES (@FirstName, @LastName, @Company)"; command.Parameters.AddWithValue("@FirstName", student.FirstName); command.Parameters.AddWithValue("@LastName", student.LastName); command.Parameters.AddWithValue("@Company", student.Company); await command.ExecuteNonQueryAsync(); } finally { connection.Close(); } }
3、环境事务
Transaction定义了一个类型为Transaction的Current静态属性(可读可写),表示当前的事务。作为当前事务的Transaction存储于当前线程的TLS(Thread Local Storage)中(实际上是定义在一个应用了ThreadStaticAttribute特性的静态字段上),所以仅对当前线程有效。如果进行异步调用,当前事务并不能自动事先跨线程传播,将异步操作纳入到当前事务,需要使用到另外一个事务:依赖事务。
这种基于当前线程的当前事务又称环境事务(Ambient Transaction),很多资源管理器都具有对环境事务的感知能力。也就是说,如果我们通过Current属性设置了环境事务,当对某个具有环境事务感知能力的资源管理器进行访问的时候,相应的资源管理器会自动登记到当前事务中来。我们将具有这种感知能力的资源管理器称为System.Transactions资源管理器。
4、事务标识
Transaction具有一个只读的TransactionInformation属性,表示事务一些基本的信息。属性的类型为TransactionInformation,定义如下:
1: public class TransactionInformation 2: { 3: public DateTime CreationTime { get; } 4: public TransactionStatus Status { get; } 5: 6: public string LocalIdentifier { get; } 7: public Guid DistributedIdentifier { get; } 8: }
TransactionInformation的CreationTime和Status表示创建事务的时间和事务的当前状态。事务具有活动(Active)、提交(Committed)、中止(Aborted)和未决(In-Doubt)四种状态,通过TransactionStatus枚举表示。
1: public enum TransactionStatus 2: { 3: Active, 4: Committed, 5: Aborted, 6: InDoubt 7: }
事务具有两个标识符,一个是本地标识,另一个是分布式标识,分别通过TransactionInformation的只读属性LocalIdentifier和DistributedIdentifier表示。本地标识由两部分组成:标识为本地应用程序域分配的轻量级事务管理器(LTM)的GUID和一个递增的整数(表示当前LMT管理的事务序号)。在下面的代码中,我们分别打印出三个新创建的可提交事务(CommittableTransaction,为Transaction的子类,我们后面会详细介绍)的本地标识。
1: using System; 2: using System.Transactions; 3: class Proggram 4: { 5: static void Main() 6: { 7: Console.WriteLine(new CommittableTransaction().TransactionInformation.LocalIdentifier); 8: Console.WriteLine(new CommittableTransaction().TransactionInformation.LocalIdentifier); 9: Console.WriteLine(new CommittableTransaction().TransactionInformation.LocalIdentifier); 10: } 11: }
输出结果:
AC48F192-4410-45fe-AFDC-8A890A3F5634:1 AC48F192-4410-45fe-AFDC-8A890A3F5634:2 AC48F192-4410-45fe-AFDC-8A890A3F5634:3
一旦本地事务提升到基于DTC的分布式事务,系统会为之生成一个GUID作为其唯一标识。当事务跨边界执行的时候,分布式事务标识会随着事务一并被传播,所以在不同的执行上下文中,你会得到相同的GUID。分布式事务标识通过TransactionInformation的只读属性DistributedIdentifier表示,我经常在审核(Audit)中使用该标识。
对于上面Transaction的介绍,细心的读者可能会发现两个问题:Transaction并没有提供公有的构造函数,意味着我们不能直接通过new操作符创建Transaction对象;Transaction只有两个重载的Rollback方法,并没有Commit方法,意味着我们直接通过Transaction进行事务提交。
在一个分布式事务中,事务初始化和提交只能有相同的参与者担当。也就是说只有被最初开始的事务才能被提交,我们将这种能被初始化和提交的事务称作可提交事务(Committable Transaction)。随着分布式事务参与者逐个登记到事务之中,它们本地的事务实际上依赖着这个最初开始的事务,所以我们称这种事务为依赖事务(Dependent Transaction)。
二、可提交事务
Transaction类不能以编程方式提交,它没有提交事务的方法, 基类Transaction只支持事务处理的终止,唯一支持事务提交的类是CommittableTransaction类。
我们来看看CommittableTransaction类的定义:
[Serializable] public sealed class CommittableTransaction : Transaction, IAsyncResult { public CommittableTransaction(); public CommittableTransaction(TimeSpan timeout); public CommittableTransaction(TransactionOptions options); public IAsyncResult BeginCommit(AsyncCallback asyncCallback, object asyncState); public void Commit(); public void EndCommit(IAsyncResult asyncResult); }
1、可提交事务的构造函数
CommittableTransaction直接继承自Transaction,提供了三个共有的构造函数。
(1)默认无参数的构造函数(public CommittableTransaction())
调用默认无参数的构造函数来创建CommittableTransaction对象,意味着采用一个默认的超时时限。这个默认的时间是1分钟,不过它可以通过配置的方式进行指定。事务超时相关的参数定义在<system.tranactions>配置节中(app.config文件),下面的XML体现的是默认的配置。从该段配置哦我们可以看到,我们不但可以通过<defaultSettings>配置事务默认的超时时限,还可以通过<machineSettings>设置最高可被允许的事务超时时限,默认为10分钟,在对这两项进行配置的时候,前者的时间必须小于后者,否则将用后者作为事务默认的超时时限。
1: <?xml version="1.0" encoding="utf-8" ?> 2: <configuration> 3: <system.transactions> 4: <defaultSettings timeout="00:01:00"/> 5: <machineSettings maxTimeout="00:10:00"/> 6: </system.transactions> 7: </configuration>
(2)带timeout参数的构造函数(public CommittableTransaction(TimeSpan timeout))
通过TimeSpan类型的timeout参数指定事务的超时实现,自被初始化那一刻开始算起,一旦超过了该时限,事务会被终止。
(3)带options参数的构造函数
通过TransactionOptions类型的Options可以同时指定事务的超时时间和隔离级别。TransactionOptions是一个定义在System.Transactions命名空间下的结构,定义如下:
public struct TransactionOptions { public static bool operator !=(TransactionOptions x, TransactionOptions y); public static bool operator ==(TransactionOptions x, TransactionOptions y); public IsolationLevel IsolationLevel { get; set; } public TimeSpan Timeout { get; set; } public override bool Equals(object obj); public override int GetHashCode(); }
作为事务ACID四大属性之一的隔离性(Isolation),确保事务操作的中间状态的可见性仅限于事务内部,隔离机制通过对访问的数据进行加锁,防止数据被事务的外部程序操作,从而确保了数据的一致性。但是隔离机制在另一方面又约束了对数据的并发操作,降低数据操作的整体性能,为了权衡两个互相矛盾的两个方面,我们可以根据具体的情况选择相应的隔离级别。
接下来我们先来探讨一下隔离级别:
如果不完全隔离事务外部的作用域,就可能引发以下问题:
- 脏读:另一个事务可以读取一个事务中改变的记录,因为在一个事务中改变的记录可能回滚到最初的状态,所以从另一个事务中读取这个临时状态就称为“脏读”,----数据并没有提交,通过锁定要改变的记录就可以避免这个问题。(读取了别的事务没提交的记录B,而这个记录可能无效,回滚到以前A,但是已经读取的却是B,不是A)
- 不可重复读:当事务在数据中读取,而该事务运行的同时,另一个事务修改了相同的记录,此时就会出现不可重复读操作,如果该记录在十五中读取多次,结果就会不同,----不可重复,锁定读取的记录,即可避免这个问题。(事务1第一次读取了记录A,事务2修改了记录成为记录2,事务1再次读的时候就编程了记录2,两次读取的记录不一样,所以是不重复)
- 幻读:读取一个范围的数据时,另一个事务在添加记录,再次读取的时候,就会多读出一条记录,幻读一般出现在更新一个范围内的记录,一个事务在更新记录,另一个在添加,更新后,事务再读,会发现有没被更新的记录。---可以通过范围锁来避免。
在System.Transactions事务体系中,为事务提供了其中不同的隔离级别,这7种隔离级别分别通过System.Transactions.IsolationLevel的7个枚举项表示。
public enum IsolationLevel { Serializable = 0, RepeatableRead = 1, ReadCommitted = 2, ReadUncommitted = 3, Snapshot = 4, Chaos = 5, Unspecified = 6, }
这七个隔离级别中,Serilizable具有最高隔离级别,代表的是一种完全基于序列化(同步)的数据存取方式,这也是System.Transactions事务默认采用的隔离级别,按照隔离级别至高向低,7个不同的隔离级别代表的含义如下:
- Serializable:可以在事务期间读取可变数据,但是不可以修改,也不可以添加任何新数据。
- RepeatableRead:可以在事务期间读取可变数据,但是不可以修改,可以在事务期间添加新数据。
- ReadCommitted:不可以在事务期间读取可变数据,但是可以修改它;
- ReadUncommitted:可以在事务期间读取和修改可变数据;
- Snapshot:可以读取可变数据,在事务修改数据之前,它验证在它最初读取数据之后另一个事务是否更改过这些数据,如果已经被更新,则会引发错误,这样使事务科获取先前提交的数据值;
- chaos:无法覆盖隔离级别更高的事务中的挂起的更改;
- Unspecified:正在使用与制定隔离级别不同的隔离级别,但是无法确定该级别,如果设置了此值,则会引发异常。
以上事务隔离级别可能引发的问题:
隔离级别 | 脏读 | 不可重复读 | 幻读 |
ReadUnCommitted | Y | Y | Y |
ReadCommitted | N | Y | Y |
RepeatableRead | N | N | Y |
Serializable | N | N | N |
以下代码说明了如何使用TransactionScope类设置隔离级别
var options = new Transactions { IsolationLevel = IsolationLevel.ReadUncommitted, Timeout = TimeSpan.FromSeconds(90); }; using(var scope = new TransactionScope( TransactionScopeOption.Required,options)) { //Read data without waiting for lcoks from other transactions, //dirty reads are possible }
2、事务的提交
(1)异步提交事务
通过Begin Commit|EndCommit方法组合实现对事务的异步提交。
public IAsyncResult BeginCommit(AsyncCallback asyncCallback, object asyncState);
public void EndCommit(IAsyncResult asyncResult);
CommittableTransaction还是实现了IAsyncResult这么一个接口,如果采用异步的方式调用BeginCommit方式提交事务,方法返回的IAsyncResult对象的各属性值会反映在CommittableTransaction同名属性上面。
(2)同步提交事务
通过Commit方法实现对事务的同步提交
public void Commit();
前面我们提到的环境事务已经System.Transactions资源管理器对环境事务的自动感知能力,当创建了CommittableTransaction对象的时候,被创建的事务并不会自动作为环境事务,你需要手动将其指定到Transaction的静态Current属性中,接下来,我们将通过一个简单的例子演示如何通过CommittableTransaction实现一个分布式事务。
3、实例演示:通过CommittableTransaction实现分布式事务
在这个实例演示中,我们沿用介绍事务显式控制时使用到的银行转帐的场景,并且直接使用第一篇中创建的帐户表(T_ACCOUNT)。一个完整的转帐操作本质上有两个子操作完成,提取和存储,即从一个帐户中提取相应的金额存入另一个帐户。为了完成这两个操作,我写了如下两个存储过程:P_WITHDRAW和P_DEPOSIT。
P_WITHDRAW:
1: CREATE Procedure P_WITHDRAW 2: ( 3: @id VARCHAR(50), 4: @amount FLOAT 5: ) 6: AS 7: IF NOT EXISTS(SELECT * FROM [dbo].[T_ACCOUNT] WHERE ID = @id) 8: BEGIN 9: RAISERROR (‘帐户ID不存在‘,16,1) 10: RETURN 11: END 12: IF NOT EXISTS(SELECT * FROM [dbo].[T_ACCOUNT] WHERE ID = @id AND BALANCE > @amount) 13: BEGIN 14: RAISERROR (‘余额不足‘,16,1) 15: RETURN 16: END 17: 18: UPDATE [dbo].[T_ACCOUNT] SET Balance = Balance - @amount WHERE Id = @id 19: GO
P_DEPOSIT
1: CREATE Procedure P_DEPOSIT 2: ( 3: @id VARCHAR(50), 4: @amount FLOAT 5: ) 6: AS 7: IF NOT EXISTS(SELECT * FROM [dbo].[T_ACCOUNT] WHERE Id = @id) 8: BEGIN 9: RAISERROR (‘帐户ID不存在‘,16,1) 10: END 11: UPDATE [dbo].[T_ACCOUNT] SET Balance = Balance + @amount WHERE Id = @id 12: GO
为了确定是否成功转帐,我们需要提取相应帐户的当前余额,我们相应操作实现在下面一个存储过程中。
1: CREATE Procedure P_GET_BALANCE_BY_ID 2: ( 3: @id VARCHAR(50) 4: ) 5: AS 6: IF NOT EXISTS(SELECT * FROM [dbo].[T_ACCOUNT] WHERE Id = @id) 7: BEGIN 8: RAISERROR (‘帐户ID不存在‘,16,1) 9: END 10: SELECT BALANCE FROM [dbo].[T_ACCOUNT] WHERE Id = @id 11: GO
为了执行存储过程的方便,我写了一个简单的工具类DbAccessUtil。ExecuteNonQuery和ExecuteScalar的作用于DbCommand同名方法相同。使用DbAccessUtil的这两个方法,只需要以字符串和字典的方式传入存储过程名称和参数即可。由于篇幅所限,关于具有实现不再多做介绍了,又兴趣的读者,可以参考《WCF技术剖析(卷1)》的最后一章,里面的DbHelper提供了相似的实现。
1: public static class DbAccessUtil 2: { 3: public static int ExecuteNonQuery(string procedureName, IDictionary<string, object> parameters); 4: public static T ExecuteScalar<T>(string procedureName, IDictionary<string, object> parameters); 5: }
借助于DbAccessUtil提供的辅助方法,我们定义两个方法Withdraw和Deposit分别实现提取和存储的操作,已近获取某个帐户当前余额。
1: static void Withdraw(string accountId, double amount) 2: { 3: Dictionary<string, object> parameters = new Dictionary<string, object>(); 4: parameters.Add("id", accountId); 5: parameters.Add("amount", amount); 6: DbAccessUtil.ExecuteNonQuery("P_DEPOSIT", parameters); 7: } 8: static void Deposite(string accountId, double amount) 9: { 10: Dictionary<string, object> parameters = new Dictionary<string, object>(); 11: parameters.Add("id", accountId); 12: parameters.Add("amount", amount); 13: DbAccessUtil.ExecuteNonQuery("P_DEPOSIT", parameters); 14: } 15: private static double GetBalance(string accountId) 16: { 17: Dictionary<string, object> parameters = new Dictionary<string, object>(); 18: parameters.Add("id", accountId); 19: return DbAccessUtil.ExecuteScalar<double>("P_GET_BALANCE_BY_ID", parameters); 20: }
现在假设帐户表中有一个帐号,它们的ID分别为Foo,余额为5000。下面是没有采用事务机制的转帐实现(注意:需要转入的帐户不存在)。
1: using System; 2: using System.Collections.Generic; 3: namespace Artech.TransactionDemo 4: { 5: class Program 6: { 7: static void Main(string[] args) 8: { 9: string accountFoo = "Foo"; 10: string nonExistentAccount = Guid.NewGuid().ToString(); 11: //输出转帐之前的余额 12: Console.WriteLine("帐户\"{0}\"的当前余额为:¥{1}", accountFoo, GetBalance(accountFoo)); 13: //开始转帐 14: try 15: { 16: Transfer(accountFoo, nonExistentAccount, 1000); 17: } 18: catch (Exception ex) 19: { 20: Console.WriteLine("转帐失败,错误信息:{0}", ex.Message); 21: } 22: //输出转帐后的余额 23: Console.WriteLine("帐户\"{0}\"的当前余额为:¥{1}", accountFoo, GetBalance(accountFoo)); 24: } 25: 26: private static void Transfer(string accountFrom, string accountTo, double amount) 27: { 28: Withdraw(accountFrom, amount); 29: Deposite(accountTo, amount); 30: } 31: } 32: }
输出结果:
帐户"Foo"的当前余额为:¥5000 转帐失败,错误信息:帐户ID不存在 帐户"Foo"的当前余额为:¥4000