分布式事务之消息补偿解决方案

一、数据库本地事务

先看看数据库事务的定义:单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全地不执行

这个比较容易理解,操作过数据库的一般都懂,既是业务需求涉及到多个数据表操作的时候,需要用到事务

要么一起更新,要么一起不更新,不会出现只更新了部分数据表的情况,下边看看数据库事务的使用

1 begin tran
2     begin try
3         update Table1 set Field = 1 where ID = 1
4         update Table2 set Field = 2 where ID = 1
5     end try
6     begin catch
7         rollback tran
8     end catch
9 commit tran

上实例在小型项目中一般是问题不大的,因为小型项目一般是单机系统,数据库、Web服务大都在一台服务器上,甚至可能只有一个数据库文件,

这种情况下使用本地事务没有一点问题;

但是本地事务有很大的缺陷,因为开启事务一般是锁表的,事务执行期间会一直锁着,其他的操作一般都要排队等待,对性能要求比较高的系统是不能忍受的。

特别是涉及改动不同数据库的操作,这会造成跨库事务,性能更加低

如果还涉及到不在同一台服务器、甚至不同网段部署的数据库,那本地事务简直是系统运行的灾难,是首先需要丢弃的解决方案。

那如果遇到上述情况,该怎么做呢,这就涉及到分布式事务了

二、分段式事务的补偿机制

如果有海量数据需要处理、或者要求高并发请求的话,同步的事务机制已经是不现实的了,这种情况下必须采用异步事务机制,既分段式的事务

分段式事务一般做法就是把需求任务分段式地完成,通过事务补偿机制来保证业务最终执行成功,补偿机制一般可以归类为2种:

1 )定时任务补偿:

  通过定时任务去跟进后续任务,根据不同的状态表确定下一步的操作,从而保证业务最终执行成功,

  这种办法可能会涉及到很多的后台服务,维护起来也会比较麻烦,这是应该是早期比较流行的做法

2) 消息补偿:

  通过消息中间件触发下一段任务,既通过实时消息通知下一段任务开始执行,执行完毕后的消息回发通知来保证业务最终完成;

  当然这也是异步进行的,但是能保证数据最终的完整性、一致性,也是近几年比较热门的做法

定时任务补偿就不说了,这篇文章我们来讨论一下通过消息补偿来完成分布式事务的一般做法

三、分布式事务之消息补偿

0)我们以简单的产品下单场景来说明,(不要较真哈)

1)先来看看分布式异步事务处理流程示意图,APP1与APP2需要互相订阅对方消息

2)首先看数据库,2个,一个库存库,一个已下单成功的库

 1 -- 下单通知,主要作用保留已下单操作,消息发送失败可以根据此表重新发送
 2 CREATE TABLE [dbo].[ProductMessage](
 3     [ID] [int] IDENTITY(1,1) NOT NULL,
 4     [Product] [varchar](50) NULL,
 5     [Amount] [int] NULL,
 6     [UpdateTime] [datetime] NULL
 7 )
 8 -- 库存
 9 CREATE TABLE [dbo].[ProductStock](
10     [ID] [int] IDENTITY(1,1) NOT NULL,
11     [Product] [varchar](50) NULL,
12     [Amount] [int] NULL
13 )
14 -- 下单成功
15 CREATE TABLE [dbo].[ProductSell](
16     [ID] [int] IDENTITY(1,1) NOT NULL,
17     [Product] [varchar](50) NULL,
18     [Customer] [int] NULL,
19     [Amount] [int] NULL
20 )
21 -- 下单成功消息,主要作用防止重复消费
22 CREATE TABLE [dbo].[ProductMessageApply](
23     [ID] [int] IDENTITY(1,1) NOT NULL,
24     [MesageID] [int] NULL,
25     [CreateTime] [datetime] NULL
26 )

3)项目架构Demo

数据底层访问使用的是Dapper、使用redis作为消息中间件

4)实体层代码

 1     public class ProductMessage
 2     {
 3         [Key]
 4         [IgnoreProperty(true)]
 5         public int ID { get; set; }
 6         public string Product { get; set; }
 7         public int Amount { get; set; }
 8         public DateTime UpdateTime { get; set; }
 9     }
10     public class ProductMessageApply
11     {
12         [Key]
13         [IgnoreProperty(true)]
14         public int ID { get; set; }
15         public int MesageID { get; set; }
16         public DateTime CreateTime { get; set; }
17     }
18     public class ProductSell
19     {
20         [Key]
21         [IgnoreProperty(true)]
22         public int ID { get; set; }
23         public string Product { get; set; }
24         public int Customer { get; set; }
25         public int Amount { get; set; }
26     }
27     public class ProductStock
28     {
29         [Key]
30         [IgnoreProperty(true)]
31         public int ID { get; set; }
32         public string Product { get; set; }
33         public int Amount { get; set; }
34     }

5)服务接口层代码

 1     public interface IProductMessageApplyService
 2     {
 3         void Add(ProductMessageApply entity);
 4         ProductMessageApply Get(int id);
 5     }
 6     public interface IProductMessageService
 7     {
 8         void Add(ProductMessage entity);
 9         IEnumerable<ProductMessage> Gets(object paramPairs = null);
10         void Delete(int id);
11     }
12     public interface IProductSellService
13     {
14         void Add(ProductSell entity);
15     }
16     public interface IProductStockService
17     {
18         void ReduceReserve(int id, int amount);
19     }

6)库存、消息通知

 1     public class ProductMessageService : IProductMessageService
 2     {
 3         private IRepository<ProductMessage> repository;
 4
 5         public ProductMessageService(IRepository<ProductMessage> repository)
 6         {
 7             this.repository = repository;
 8         }
 9
10         public void Add(ProductMessage entity)
11         {
12             this.repository.Add(entity);
13         }
14
15         public IEnumerable<ProductMessage> Gets(object paramPairs = null)
16         {
17             return this.repository.Gets(paramPairs);
18         }
19
20         public void Delete(int id)
21         {
22             this.repository.Delete(id);
23         }
24     }
25
26     public class ProductStockService : IProductStockService
27     {
28         private IRepository<ProductStock> repository;
29
30         public ProductStockService(IRepository<ProductStock> repository)
31         {
32             this.repository = repository;
33         }
34
35         public void ReduceReserve(int id, int amount)
36         {
37             var entity = this.repository.Get(id);
38             if (entity == null) return;
39
40             entity.Amount = entity.Amount - amount;
41             this.repository.Update(entity);
42         }
43     }

7)下单、下单成功消息

 1     public class ProductMessageApplyService : IProductMessageApplyService
 2     {
 3         private IRepository<ProductMessageApply> repository;
 4
 5         public ProductMessageApplyService(IRepository<ProductMessageApply> repository)
 6         {
 7             this.repository = repository;
 8         }
 9
10         public void Add(ProductMessageApply entity)
11         {
12             this.repository.Add(entity);
13         }
14
15         public ProductMessageApply Get(int id)
16         {
17             return this.repository.Get(id);
18         }
19     }
20
21     public class ProductSellService : IProductSellService
22     {
23         private IRepository<ProductSell> repository;
24
25         public ProductSellService(IRepository<ProductSell> repository)
26         {
27             this.repository = repository;
28         }
29
30         public void Add(ProductSell entity)
31         {
32             this.repository.Add(entity);
33         }
34     }

8)下单减库存测试

 1 namespace Demo.Reserve.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Console.WriteLine(string.Format("{0} 程序已启动", DateTime.Now.ToString()));
 8
 9             Send();
10             Subscribe();
11
12             Console.ReadKey();
13         }
14
15         private static void Send()
16         {
17             var unitOfWork = new UnitOfWork(Enums.Reserve);
18
19             try
20             {
21                 var productStockRepository = new BaseRepository<ProductStock>(unitOfWork);
22                 var productStockServic = new ProductStockService(productStockRepository);
23                 var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
24                 var productMessageService = new ProductMessageService(productMessageRepository);
25
26                 var id = 1;
27                 var amount = 2;
28                 var productMessage = new ProductMessage()
29                 {
30                     Product = "ProductCode",
31                     Amount = amount,
32                     UpdateTime = DateTime.Now
33                 };
34
35                 productStockServic.ReduceReserve(id, amount);
36                 productMessageService.Add(productMessage);
37                 unitOfWork.Commit();
38                 Console.WriteLine(string.Format("{0} 减库存完成", DateTime.Now.ToString()));
39                 Thread.Sleep(1000);
40
41                 var message = JsonConvert.SerializeObject(productMessage);
42                 RedisConfig.Instrace.Publish("channel.Send", message);
43                 Console.WriteLine(string.Format("{0} 发送减库存消息: {1}", DateTime.Now.ToString(), message));
44             }
45             catch (Exception ex)
46             {
47                 //Logger.Error(ex);
48                 unitOfWork.Rollback();
49             }
50         }
51
52         private static void Subscribe()
53         {
54             var client = RedisConfig.Instrace.NewClient();
55             var subscriber = client.GetSubscriber();
56
57             subscriber.Subscribe("channel.Success", (chl, message) =>
58             {
59                 try
60                 {
61                     var unitOfWork = new UnitOfWork(Enums.Reserve);
62                     var productMessageRepository = new BaseRepository<ProductMessage>(unitOfWork);
63                     var productMessageService = new ProductMessageService(productMessageRepository);
64
65                     var messageID = message.ToString().ToInt();
66                     if (messageID > 0)
67                     {
68                         productMessageService.Delete(messageID);
69                         Console.WriteLine(string.Format("{0} 收到消费成功消息:{1}", DateTime.Now.ToString(), message));
70                     }
71                 }
72                 catch (Exception ex)
73                 {
74                     //Logger.Error(ex);
75                 }
76             });
77         }
78     }
79 }

9)下单成功及消息回发测试

 1 namespace Demo.Sell.App
 2 {
 3     class Program
 4     {
 5         static void Main(string[] args)
 6         {
 7             Subscribe();
 8
 9             Console.WriteLine(string.Format("{0} 程序已启动", DateTime.Now.ToString()));
10             Console.ReadKey();
11         }
12
13         private static void Subscribe()
14         {
15             var client = RedisConfig.Instrace.NewClient();
16             var subscriber = client.GetSubscriber();
17
18             subscriber.Subscribe("channel.Send", (chl, message) =>
19             {
20                 Consume(message);
21             });
22         }
23
24         private static void Consume(string message)
25         {
26             var unitOfWork = new UnitOfWork(Enums.Sell);
27
28             try
29             {
30                 Console.WriteLine(string.Format("{0} 收到减库存消息: {1}", DateTime.Now.ToString(), message));
31
32                 var productMessage = JsonConvert.DeserializeObject<ProductMessage>(message);
33
34                 var productSellRepository = new BaseRepository<ProductSell>(unitOfWork);
35                 var productSellService = new ProductSellService(productSellRepository);
36
37                 var productMessageApplyRepository = new BaseRepository<ProductMessageApply>(unitOfWork);
38                 var productMessageApplyService = new ProductMessageApplyService(productMessageApplyRepository);
39
40                 var noExists = productMessageApplyService.Get(productMessage.ID) == null;
41                 if (noExists)
42                 {
43                     productSellService.Add(new ProductSell()
44                     {
45                         Product = productMessage.Product,
46                         Amount = productMessage.Amount,
47                         Customer = 123
48                     });
49
50                     productMessageApplyService.Add(new ProductMessageApply()
51                     {
52                         MesageID = productMessage.ID,
53                         CreateTime = DateTime.Now
54                     });
55
56                     unitOfWork.Commit();
57                     Console.WriteLine(string.Format("{0} 消息消费完成", DateTime.Now.ToString()));
58                     Thread.Sleep(1000);
59                 }
60
61                 RedisConfig.Instrace.Publish("channel.Success", productMessage.ID.ToString());
62                 Console.WriteLine(string.Format("{0} 发送消费完成通知:{1}", DateTime.Now.ToString(), productMessage.ID.ToString()));
63             }
64             catch (Exception ex)
65             {
66                 //Logger.Error(ex);
67                 unitOfWork.Rollback();
68             }
69         }
70     }
71 }

10)好了,到了最后检验成果的时候了

先打开Demo.Sell.App.exe、然后打开Demo.Reserve.App.exe

大功告成!

原文地址:https://www.cnblogs.com/lanxiaoke/p/8321657.html

时间: 2024-08-05 10:19:48

分布式事务之消息补偿解决方案的相关文章

分布式事务及分布式系统一致性解决方案

在分布式系统中,同时满足"一致性"."可用性"和"分区容错性"三者是不可能的.分布式系统的事务一致性是一个技术难题,各种解决方案孰优孰劣? 在OLTP系统领域,我们在很多业务场景下都会面临事务一致性方面的需求,例如最经典的Bob给Smith转账的案例.传统的企业开发,系统往往是以单体应用形式存在的,也没有横跨多个数据库. 我们通常只需借助开发平台中特有数据访问技术和框架(例如Spring.JDBC.ADO.NET),结合关系型数据库自带的事务管理

[转帖]聊聊分布式事务,再说说解决方案

https://www.cnblogs.com/savorboard/p/distributed-system-transaction-consistency.html 需要多学习一下. 前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了. 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样. 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构

分布式事务的四种解决方案

转:https://www.cnblogs.com/mayundalao/p/11798502.html 分布式事务指事务的操作位于不同的节点上,需要保证事务的 AICD 特性. 例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务. 解决方案 在分布式系统中,要实现分布式事务,无外乎那几种解决方案. 一.两阶段提交(2PC) 两阶段提交(Two-phase Commit,2PC),XA协议的实现,通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否

分布式事务之消息队列一致性和幂等问题

原文链接:https://www.cnblogs.com/rjzheng/p/10115798.html 引言 这篇说说分布式事务的问题.企业现在的架构都由传统的架构转向了微服务架构,如下图所示:那么,都不可避免的会遇到跨数据库调用的,分布式事务问题!目前,业内解决分布式事务问题,都基本不用JTA这种强一致性的解决方案,基本是采用如下两套方案 基于TCC的事务框架 消息队列 OK,你们先记住两点(1)图中的服务A和服务B,如果是同步调用,要求一起成功,或者一起失败,那么此时应选用TCC的事务框架

对分布式事务、消息队列的重新认识

本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功.若两张表在同一个数据库实例中,则使用本地事务就好了.否则可以采用分布式事务,或者消息队列. 前阵子从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了. 上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品

spring多数据源分布式事务的分析与解决方案

一.概述 1.业务背景 对老系统进行重构合并,导致新系统需要同时对3个数据库进行管理.由于出现跨库业务,需要实现分布式事务. 2.开发环境 spring框架版本  4.3.10.RELEASE 持久层为结合mybatis写的领域模型,如 每一个entity对应数据库的一张表,@DataSource注解(自定义)了对应数据源的key值.所以一个业务中可能存在数据源的切换. 事务采用注解@Transaction驱动. 二.spring对多数据源的支持 spring框架通过抽象类AbstractRou

分布式事务解决方案(转载)

目前的应用系统,不管是企业级应用还是互联网应用,最终数据的一致性是每个应用系统都要面临的问题,随着分布式的逐渐普及,数据一致性更加艰难,但是也很难有银弹的解决方案,也并不是引入特定的中间件或者特定的开源框架能够解决的,更多的还是看业务场景,根据场景来给出解决方案.根据笔者最近几年的了解,总结了几个点,更多的应用系统在编码的时候,更加关注数据的一致性,这样系统才是健壮的. 一.基础理论 目前关于事务的几大理论包括:ACID事务特性,CAP分布式理论,以及BASE等.ACID在数据库事务中体现,CA

分布式事务,解决方案

聊聊分布式事务,再说说解决方案 分布式事务CAP理解论证-解决方案 分布式系统的2PC.3PC详细分析 github tcc示例 分布式事务.重复消费.顺序消费 一.理论 CAP相关: CAP与BASE相关:我的博客 而对于分布式中的问题的解决方案,CAP原则出现,描述如下: 一致性(Consistency): 像A节点写入一条信息之后,同一时刻,在其他节点都可以读到这条信息 可用性(Availability): 多布一些节点A,B,C-,任何时刻,用户访问,都应该以可预期的结果返回,而不是浏览

猪齿鱼_学习_01_事务(三)_分布式事务解决方案

一.前言 本文承接上一节:猪齿鱼_学习_01_事务(二)_分布式理论 第一节中,我们谈到了本地事务数据库断电时的故障恢复: 我们在执行事务的时候数据库首先会记录下这个事务的redo操作日志,然后才开始真正操作数据库,在操作之前首先会把日志文件写入磁盘,那么当突然断电的时候,即使操作没有完成,在重新启动数据库时候,数据库会根据当前数据的情况进行undo回滚或者是redo前滚,这样就保证了数据的强一致性. 分布式系统的核心就是处理各种异常情况,这也是分布式系统复杂的地方,因为分布式的网络环境很复杂,