.NET Core 事件总线,分布式事务解决方案:CAP

背景

相信前面几篇关于微服务的文章也介绍了那么多了,在构建微服务的过程中确实需要这么一个东西,即便不是在构建微服务,那么在构建分布式应用的过程中也会遇到分布式事务的问题,那么 CAP 就是在这样的背景下诞生的。

最初打算做这个东西是在去年(2016)年底,最初是为了解决分布式系统中的分布式事务的问题,然后当时有了一个大概的概念轮廓,当时我对于前面两篇文章中关于异步消息和微服务之间通讯还不是太了解,只是觉得这样能够解决这一系列的问题,然后就着手做了,最后发现和这些概念竟然不谋而合。

经过大半年的不断重构以及修改,最终 CAP 1.0 版本发布了。作为一个开源项目,最初项目是在我的个人Github下,然后于上个月已经贡献给了 .NET China Foundation 组织,目前该项目由我和 DotNetCore 项目组共同维护。

CAP 介绍

Github:https://github.com/dotnetcore/CAP

开源协议:MIT

CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。

CAP 具有 Event Bus 的所有功能,并且CAP提供了更加简化的方式来处理EventBus中的发布/订阅。

CAP 具有消息持久化的功能,也就是当你的服务进行重启或者宕机时,她可以保证消息的可靠性。

CAP 实现了分布式事务中的最终一致性,你不用再去处理这些琐碎的细节。

CAP 提供了基于 Microsoft DI 的 API 服务,她可以和你的 ASP.NET Core 系统进行无缝结合,并且能够和你的业务代码集成支持强一致性的事务处理。

CAP 是开源免费的。CAP基于MIT协议开源,你可以免费的在你的私人或者商业项目中使用,不会有人向你收取任何费用。

Getting Started

目前, CAP 同时支持使用 RabbitMQ,Kafka,Azure Service Bus 等进行底层之间的消息发送,你不需要具备这些消息队列的使用经验,仍然可以轻松的集成到项目中。

CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 数据库的项目。

CAP 同时支持使用 EntityFrameworkCore 和 ADO.NET 的项目,你可以根据需要选择不同的配置方式。

下面是CAP在系统中的一个不完全示意图:

图中实线部分代表用户代码,虚线部分代表CAP内部实现。

下面,我们看一下 CAP 怎么集成到项目中:

Step 1:

你可以运行下面的命令来安装CAP NuGet 包:

PM> Install-Package DotNetCore.CAP

根据底层消息队列,你可以选择引入不同的包:

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 目前支持使用 SQL Server, PostgreSql, MySql, MongoDB 的项目,你可以选择引入不同的包:

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB     //需要 MongoDB 4.0+ 集群

Step 2:

在 Startup.cs 文件中,添加如下配置:

public void ConfigureServices(IServiceCollection services)
{
    ......

    services.AddDbContext<AppDbContext>();

    services.AddCap(x =>
    {
        //如果你使用的 EF 进行数据操作,你需要添加如下配置:
        x.UseEntityFramework<AppDbContext>();  //可选项,你不需要再次配置 x.UseSqlServer 了

        //如果你使用的ADO.NET,根据数据库选择进行配置:
        x.UseSqlServer("数据库连接字符串");
        x.UseMySql("数据库连接字符串");
        x.UsePostgreSql("数据库连接字符串");

        //如果你使用的 MongoDB,你可以添加如下配置:
        x.UseMongoDB("ConnectionStrings");  //注意,仅支持MongoDB 4.0+集群

        //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
        x.UseRabbitMQ("ConnectionStrings");
        x.UseKafka("ConnectionStrings");
        x.UseAzureServiceBus("ConnectionStrings");
    });
}

发布事件/消息

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发布:

public class PublishController : Controller
{
    private readonly ICapPublisher _capBus;

    public PublishController(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }

    //不使用事务
    [Route("~/without/transaction")]
    public IActionResult WithoutTransaction()
    {
        _capBus.Publish("xxx.services.show.time", DateTime.Now);

        return Ok();
    }

    //Ado.Net 中使用事务,自动提交
    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
            {
                //业务代码

                _capBus.Publish("xxx.services.show.time", DateTime.Now);
            }
        }
        return Ok();
    }

    //EntityFramework 中使用事务,自动提交
    [Route("~/ef/transaction")]
    public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
    {
        using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //业务代码

            _capBus.Publish("xxx.services.show.time", DateTime.Now);
        }
        return Ok();
    }
}

订阅事件/消息

在 Controller 中:
如果是在Controller中,直接添加[CapSubscribe("")] 来订阅相关消息。

public class PublishController : Controller
{
    [NoAction]
    [CapSubscribe("xxx.services.show.time")]
    public async Task CheckReceivedMessage(DateTime time)
    {
        Console.WriteLine(time);
        return Task.CompletedTask;
    }
}

在 xxxService中:
如果你的方法没有位于Controller 中,那么你订阅的类需要继承 ICapSubscribe,然后添加[CapSubscribe("")]标记:

namespace xxx.Service
{
    public interface ISubscriberService
    {
        public void CheckReceivedMessage(DateTime time);
    }

    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
        [CapSubscribe("xxxx.services.show.time")]
        public void CheckReceivedMessage(DateTime time)
        {

        }
    }
}

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService 类

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();
}

结束了,怎么样,是不是很简单?

鸣谢

感谢 lan Ye 同学对本项目的英文翻译工作。

感谢 AlexLEWIS 同学对本项目的其他支持。

感谢 .NET China Foundation 团队成员对本项目的支持。

总结

如果你有任何问题,都可以去 Github 给我们提交 Issue,我们会在第一时间处理。

如果你觉得这个开源项目还不错,给个Github Star 支持一下那就太好了。

如果你觉得本篇文章对你有帮助的话,可以关注一下博主,顺手点个【推荐】哦。



本文地址:http://www.cnblogs.com/savorboard/p/cap.html
作者博客:Savorboard
欢迎转载,请在明显位置给出出处及链接

posted @ 2017-07-21 09:03 Savorboard 阅读(44774) 评论(103) 编辑 收藏

原文地址:https://www.cnblogs.com/lhxsoft/p/11880184.html

时间: 2024-10-11 15:43:56

.NET Core 事件总线,分布式事务解决方案:CAP的相关文章

分布式事务解决方案

分布式理论 当我们的单个数据库的性能产生瓶颈的时候,我们可能会对数据库进行分区,这里所说的分区指的是物理分区,分区之后可能不同的库就处于不同的服务器上了,这个时候单个数据库的ACID已经不能适应这种情况了,而在这种ACID的集群环境下,再想保证集群的ACID几乎是很难达到,或者即使能达到那么效率和性能会大幅下降,最为关键的是再很难扩展新的分区了,这个时候如果再追求集群的ACID会导致我们的系统变得很差,这时我们就需要引入一个新的理论原则来适应这种集群的情况,就是 CAP 原则或者叫CAP定理,那

分布式事务解决方案---阅读--篇1--关于分布式系统的数据一致性问题

self: 这篇文章逻辑不算很清晰,但讲到的点还算是比较好的.自己总结一下可以做不错的参考: 1. 这边文章主要讲了两个方面,一方面是MQ的消息可靠性问题,另一方面是MQ可以被利用来做补偿机制的最终一致性分布式事务解决方案. 2. 关于MQ消息的问题大致有下面三个 2.1 如何保证A->M的消息,M一定接收到了,同样,如何保证M->A的消息,M一定接收到了 2.2 如果数据需要一致性更新,比如A发送了三条消息给M,M要么全部保存,要么全部不保存,不能够只保存其中的几条记录.我们假设更新的数据是

常用的分布式事务解决方案

众所周知,数据库能实现本地事务,也就是在同一个数据库中,你可以允许一组操作要么全都正确执行,要么全都不执行.这里特别强调了本地事务,也就是目前的数据库只能支持同一个数据库中的事务.但现在的系统往往采用微服务架构,业务系统拥有独立的数据库,因此就出现了跨多个数据库的事务需求,这种事务即为"分布式事务".那么在目前数据库不支持跨库事务的情况下,我们应该如何实现分布式事务呢?本文首先会为大家梳理分布式事务的基本概念和理论基础,然后介绍几种目前常用的分布式事务解决方案.废话不多说,那就开始吧-

分布式事务 解决方案

事务的概念来源于业务过程.在许多情况下我们都希望能够确保在一个过程中执行的所有操作是完全成功的.在集中式系统中,事务被广泛用于服务器端和数据库系统,控制数据的操作.随着分布式计算的发展,事务在分布式计算领域中也得到了广泛的应用,但是分布式系统架构中,分布式事务问题是一个绕不过去的挑战.而微服务架构的流行,让分布式事问题日益突出! 下面我们以电商购物支付流程中,在各大参与者系统中可能会遇到分布式事务问题的场景进行详细的分析! 如上图所示,假设三大参与平台(电商平台.支付平台.银行)的系统都做了分布

基于金融系统的分布式事务解决方案

分布式系统架构中,分布式事务问题是一个绕不过去的挑战.而微服务架构的流行,让分布式事问题日益突出! 下面我们以电商购物支付流程中,在各大参与者系统中可能会遇到分布式事务问题的场景进行详细的分析! 如上图所示,假设三大参与平台(电商平台.支付平台.银行)的系统都做了分布式系统架构拆分,按上数中的流程步骤进行分析: 1.电商平台中创建订单:预留库存.预扣减积分.锁定优惠券,此时电商平台内各服务间会有分布式事务问题,因为此时已经要跨多个内部服务修改数据: 2.支付平台中创建支付订单(选银行卡支付):查

微服务架构的分布式事务解决方案

微服务架构的分布式事务解决方案 标签:分布式事务,微服务,消息最终一致性,分布式事务解决方案发布于 2016-07-16 18:39:05 分布式系统架构中,分布式事务问题是一个绕不过去的挑战.而微服务架构的流行,让分布式事问题日益突出! 下面我们以电商购物支付流程中,在各大参与者系统中可能会遇到分布式事务问题的场景进行详细的分析! 如上图所示,假设三大参与平台(电商平台.支付平台.银行)的系统都做了分布式系统架构拆分,按上数中的流程步骤进行分析: 1.电商平台中创建订单:预留库存.预扣减积分.

java微服务架构的分布式事务解决方案

java微服务架构的分布式事务解决方案 课程目录如下: 1.课程介绍20分钟2.解决方案的效果演示(结合支付系统真实应用场景)45分钟3.常用的分布式事务解决方案介绍47分钟4.消息发送一致性(可靠消息的前提保障)20分钟5.消息发送一致性的异常流程处理16分钟6.常规MQ队列消息的处理流程和特点12分钟7.消息重复发送问题及业务接口的幂等性设计18分钟8.可靠消息最终一致性方案1(本地消息服务)的设计19分钟9.可靠消息最终一致性方案2(独立消息服务)的设计24分钟10.可靠消息服务的设计与实

微服务架构下分布式事务解决方案——阿里云GTS

https://blog.csdn.net/jiangyu_gts/article/details/79470240 1 微服务的发展 微服务倡导将复杂的单体应用拆分为若干个功能简单.松耦合的服务,这样可以降低开发难度.增强扩展性.便于敏捷开发.当前被越来越多的开发者推崇,很多互联网行业巨头.开源社区等都开始了微服务的讨论和实践.Hailo有160个不同服务构成,NetFlix有大约600个服务.国内方面,阿里巴巴.腾讯.360.京东.58同城等很多互联网公司都进行了微服务化实践.当前微服务的开

分布式事务解决方案——柔性事务与服务模式

在分布式系统中,是无法使用本地事务保证数据的一致性的.一种标准的分布式事务就是全局事务(DTP模型).他是基于2PC来控制的.但是由于2PC自身就存在同步阻塞的问题,这也就导致全局事务效率很低.所以,这种全局事务并不适合解决大型网站的分布式事务问题. 柔性事务在业内,主要用来解决分布式事务的方案是使用柔性事务.所谓柔性事务,相比较与数据库事务中的ACID这种刚性事务来说,柔性事务保证的事"基本可用,最终一致."这其实就是基于BASE理论,保证数据的最终一致性. 虽然柔性事务并不像刚性事