MongoDB 操作手册CRUD 事务 两步提交

执行两步提交

概述

这部分提供了多记录更新或者多记录事务,使用两步提交来完成多记录写入的模板。另外,可以扩展此方法来提供rollback-like功能。

背景

MongoDB对于单条记录的操作是原子性的;但是涉及多条记录的操作却不是原子性的。由于记录可能是相当复杂,并且有内嵌记录,单记录原子性操作提供了实际中常用的必要支持。

除了单记录的原子性操作,还有许多情况需要多记录操作事务,当执行一个包含一些列操作的事务时,就有以下要求:

原子性:如果一个操作失败,事务中之前的操作需要回滚到之前的状态

一致性:如果一个重大失误,比如网络故障,硬件故障,中断了事务,数据库必须能够恢复到之前的状态

对于需要多记录操作的事务,可以在应用中实现两步提交的方法,来提供多记录更新支持。使用这种方法保证了一致性,并且万一出现错误,事务的执行状态是可恢复的。然而在这个过程中,记录处于未定的数据和状态。

注意:因为MongoDB只有单记录操作是原子性的,两步提交只能提供语义上的“类事务”功能。对于应用来说,使其能够回到在两步提交中的某个状态的中间数据或者回滚数据。

模板

考虑以下情景:

要将资金从账户A转移到账户B,在关系型数据库中,可以在一个事务中从A中减去资金,同时在B中加上。在MongoDB中,可以模拟两步提交来获得相同结果。

这个例子使用两个集合

1.accounts,用于存储账户信息

2.transactions,用于存储资金转移事务的信息

初始化账户信息

db.accounts.insert(

[

{ _id: "A", balance: 1000, pendingTransactions: [] },

{ _id: "B", balance: 1000, pendingTransactions: [] }

]

);

初始化转账记录

对于每次资金转移操作,将转账信息添加到transactions集合中,插入的记录包含以下信息:

source和destination字段,引用自ccounts集合中的_id字段

value字段,声明转移数值

state字段,表明当前转移状态,值可以是initial,pending, applied, done, canceling, 或者 canceled.

lastModified字段,反应最后修改日期

从A转账100到B,初始化transactions记录:

db.transactions.insert({ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() });

使用两步提交进行转账

1.从transactions集合中,找到state为initial的记录。此时transactions集合中只有一条记录,即刚插入的那条。在包含其他记录的集合中,除非你声明了其他查询条件,否则这个查询将返回任何state为initial的记录。

var t = db.transactions.findOne( { state: "initial" } );

在MongoDB的shell中输入t,查看t的内容,类似于:

{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified":??}

2.更新事务状态为pending

设置state为pending,lastModified为当前时间

db.transactions.update(

{ _id: t._id, state: "initial" },

{

$set: { state: "pending" },

$currentDate: { lastModified: true }

}

)

在更新操作中,state:‘initial‘确保没有其他进程已经更新了这条记录。如果nMatched和nModified是0,回到第一步,获取一个新的事务,重新开始这个过程。

3.在两个账户中应用该事务

使用update方法将事务t应用到两个账户中。在更新条件中,包含条件pendingTransactions:{$ne:t._id},以避免重复应用该事务。

db.accounts.update(

{ _id: t.source, pendingTransactions: { $ne: t._id } },

{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }

)

db.accounts.update(

{ _id: t.destination, pendingTransactions: { $ne: t._id } },

{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }

)

从A账号减去t.value,给B账户加上t.value,同时给每个账户的pendingTransactions数组添加事务id

4.更新事务状态为applied

db.transactions.update(

{ _id: t._id, state: "pending" },

{

$set: { state: "applied" },

$currentDate: { lastModified: true }

}

)

5.更新账户pendingTransactions数组

db.accounts.update(

{ _id: t.source, pendingTransactions: t._id },

{ $pull: { pendingTransactions: t._id } }

)

db.accounts.update(

{ _id: t.destination, pendingTransactions: t._id },

{ $pull: { pendingTransactions: t._id } }

)

从两个账户中移除已应用的事务。

6.更新事务状态为done

db.transactions.update(

{ _id: t._id, state: "applied" },

{

$set: { state: "done" },

$currentDate: { lastModified: true }

}

)

从失败场景中恢复数据

事务最重要的不是以上这个例子提供的原型,而是当事务没有完全执行成功的时候,从各种失败场景中恢复数据的可能性。

恢复操作

两步提交模型允许应用程序重新执行事务操作序列,以保证数据一致性。在程序启动时,或者定时执行恢复操作,来抓取任何未完成的事务。

恢复到数据一致的状态的时间取决于应用程序多久需要恢复每个事务。

以下恢复操作使用lastModified日期作为pending状态的事务是否需要回滚的标识符。如果pending或者applied状态的事务在最近的30分钟内没有被更新,说明这些事务需要被恢复。可以使用不同的条件来决定是否需要恢复。

Pending状态的事务

恢复事务状态在pending之后,applied之前

例:

获取三十分钟内未成功的事务记录

var dateThreshold = new Date();

dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );

然后回到“3.在两个账户中应用该事务”这一步

Applied状态的事务

例:

获取三十分钟内未成功的事务记录

var dateThreshold = new Date();

dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );

然后回到“5.更新账户pendingTransactions数组”这一步

回滚操作

有些情况下,可能需要回滚,或者撤销操作,比如,应用程序需要取消事务,或者其中一个账户不存在或者被冻结。

Applied状态的事务

在“4.更新事务状态为applied”这一步之后,不应该再回滚事务,而是应该完成当前事务,然后创建一个新的事务来把数据修改回来。

Pending状态的事务

在“2.更新事务状态为pending”这一步之后,“4.更新事务状态为applied”这一步之前,可以通过以下步骤回滚事务:

1.更新事务状态为取消中

db.transactions.update(

{ _id: t._id, state: "pending" },

{

$set: { state: "canceling" },

$currentDate: { lastModified: true }

}

)

2.在两个账户中取消操作

如果事务已经应用,需要回退这个事务以取消在两个账户上的操作。在更新的条件中,包含pendingTransactions:t._id,以便在pending transaction已经被应用的时候更新账户。

更新目标账户,减去事务中给其增加的值,cong pendingTransactions数组中移除事务_id

db.accounts.update(

{ _id: t.destination, pendingTransactions: t._id },

{

$inc: { balance: -t.value },

$pull: { pendingTransactions: t._id }

}

)

如果pending transaction还没有被应用到这个账户中,将不会有记录匹配查询条件。

3.更新事务状态为已取消

db.transactions.update(

{ _id: t._id, state: "canceling" },

{

$set: { state: "cancelled" },

$currentDate: { lastModified: true }

}

)

更新事务状态为cancelled来标志事务已取消。

多应用情景

由于事务的存在,多个应用可以同时创建和执行操作,而不会产生数据不一致或者冲突。在之前的例子中,更新或者回滚记录,包含state字段的更新条件防止不同应用重复提交事务

例如,app1和app2同时获取了一个在initial状态的事务。app1在app2开始前提交了整个事务。当app2试图更新事务状态为pending的时候,包含state:‘initial‘的更新条件将不会匹配任何记录,

同时nMatched和nModified将为0.这就表明app2需要回到第一步,重启一个不同的事务过程。

当多个应用运行的时候,关键在有只有一个应用可以及时处理指定的事务。这样的话,即使在有符合更新条件的记录,也可以在事务记录中创建一个标记来标志应用正在处理这个事务。使用findAndModify()方法来修改事务,并且回退。

t = db.transactions.findAndModify(

{

query: { state: "initial", application: { $exists: false } },

update:

{

$set: { state: "pending", application: "App1" },

$currentDate: { lastModified: true }

},

new: true

}

)

修正后的事务操作确保只有标识符匹配的应用可以提交该事务。

如果app1在事务执行中失败,可以使用之前讲的恢复操作,但是应用程序需要在应用事务之前确保“拥有”该事务。

例如,查找并恢复pending状态的job

var dateThreshold = new Date();

dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);

db.transactions.find(

{

application: "App1",

state: "pending",

lastModified: { $lt: dateThreshold }

}

)

在生产环境中使用两步提交

以上的例子是有意写的很简单。例如,它假设一个账户的回滚操作总是可能的,并且账户可以保存负值。

生产环境可能会更加负值,通常来说,账户需要当前账户值,信用,欠款等多种信息。

对于所有事务,要确保使用的是write concern权限等级。

时间: 2024-08-11 01:36:23

MongoDB 操作手册CRUD 事务 两步提交的相关文章

MongoDB 操作手册CRUD查询

查询操作 基本查询 查询指定集合中的所有记录 db.testData.find()或者db.testData.find({}); 相等条件查询 db.testData.find({num:5});//查询num=5的记录 使用查询操作符声明多个条件 db.testData.find({num:{$in:[2,3,4]}});查询num为2,3,4的记录. 尽管可以使用$or操作符来执行同样的查询,但是当多个or条件都是在同一字段上的相等判断时,请使用$in而不是$or. and条件查询 db.t

MongoDB 操作手册CRUD 删除 remove

删除记录 概述 在MongoDB中,db.collection.remove()方法用于删除集合中的记录.可以删除所有记录,删除所有符合条件的记录,或者是仅删除一条记录. 删除所有记录 删除一个集合中的所有记录,只要将一个空的查询对象{}传给remove()方法即可.remove()方法不删除索引. 例:db.testData.remove({}); 使用remove()方法删除一个集合中的所有记录,可能比使用drop()方法删除包含索引的整个集合,再重建集合和索引更高效. 删除符合条件的记录

MongoDB 操作手册CRUD 更新 update

修改记录 概述 MongoDB提供了update()方法用于更新记录.这个方法接受以下参数: 一个更新条件的JSON对象用于匹配记录,一个更新操作JSON对象用于声明更新操作,和一个选项JSON对象 声明查询条件,使用和查询一样的结构和语法. 默认情况下,update()更新单条记录,若要更新多条记录, 请使用multi选项. 更新记录中的指定字段 用于更新某个字段的某个值,MongoDB提供了update操作符,比如$set. 在执行更新操作时,一些操作符回创建没有的字段,如$set. 测试数

MongoDB 操作手册CRUD查询指针

枚举遍历指针 概述 前面已经讲过,db.collection.find()如果没有指定给一个var声明的变量,将自动枚举前20条记录. 手动枚举指针 在mongo控制台中,将查询赋给一个var声明的变量,让其不自动枚举. var cur = db.testData.find(); 然后每次调用这个指针,将自动遍历20条 cur; 也可以使用指针的next()方法来获取下一条记录 var cur = db.testData.find(); while(cur.hasNext()) { print(

MongoDB 操作手册CRUD插入

插入操作 插入记录 1.插入一条记录 db.testData.insert({num:1,name:'a'}); 结果 WriteResult({ "nInserted" : 1 }) 2.查看插入的记录 db.testData.find(); 插入数组 1.定义数组 var arr = [{num:1,name:'a'},{num:2,name:'b'},{num:3,name:'c'}]; 2.插入记录 db.testData.insert(arr); 结果 nInserted显示

MongoDB 操作手册CRUD 查询性能分析

分析查询性能 explain() cursor方法允许观察查询系统执行的操作.这个方法对于分析高效查询和决定如何使用索引进行查询是十分有用的.这个方法检测的是查询的操作,而不是查询执行时间.因为这个方法尝试多个查询计划,它并不能准确的反映出查询执行时间. 评估一个查询的性能 使用explain()方法,调用find()返回的指针的该方法即可. 例: 在type字段创建索引 db.testData.ensureIndex({'type':1}); 评估一个在type字段上的查询. db.testD

分布式事务 - 两阶段提交与三阶段提交

在分布式系统中,著有CAP理论,该理论由加州大学伯克利分校的Eric Brewer教授提出,该理论阐述了在一个分布式系统中不可能同时满足一致性(Consistency).可用性(Availability),以及分区 容错性(Partition tolerance). 一致性在分布式系统中数据往往存在多个副本,一致性描述的是这些副本中的数据在内容和组织上的一致. 可用性可用性描述了系统对用户的服务能力,所谓可用是指在用户容忍的时间范围内返回用户期望的结果. 分区容错性分布式系统通常由多个节点构成,

聊一聊 MySQL 中的数据编辑过程中涉及的两阶段提交

MySQL 数据库中的两阶段提交,不知道您知道不?这篇文章就简单的聊一聊 MySQL 数据库中的两阶段提交,两阶段提交发生在数据变更期间(更新.删除.新增等),两阶段提交过程中涉及到了 MySQL 数据库中的两个日志系统:redo 日志和 binlog 文件. redo 日志前面已经介绍过了,就不再介绍了,简单的聊一聊 binlog 文件,binlog 是 MySQL server 层提供的二进制文件,因此所有的存储引擎都可以使用 binlog 功能,binlog 是追加写的逻辑日志,记录了执行

分布式事务:两段式提交(最终一致性)

[MySQL如何实现分布式事务?] http://www.linuxidc.com/Linux/2013-10/91925.htm Innodb存储引擎支持XA事务,通过XA事务可以支持分布式事务的实现.分布式事务指的是允许多个独立的事务资源(transac tional resources)参与一个全局的事务中.事务资源通常是关系型数据库系统,也可以是其它类型的资源. 全局事务要求在其中所有参与的事务要么全部提交,要么全部回滚,这对于事务原有的ACID要求又有了提高.另外,在使用分布式事务时候