EF并发详解

EntityFramework Core高并发深挖详解,一纸长文,你准备好看完了吗?

前言

之前有关EF并发探讨过几次,但是呢,博主感觉还是有问题,为什么会觉得有问题,其实就是理解不够透彻罢了,于是在项目中都是用的存储过程或者SQL语句来实现,利用放假时间好好补补EF Core并发的问题,本文比较长,请耐心点看。

EntityFramework Core并发初级版初探

关于并发无非就两种:乐观并发和悲观并发,悲观并发简言之则是当客户端对数据库中同一值进行修改时会造成阻塞,而乐观并发则任何客户端都可以对可以对数据进行查询或者读取,在EF Core中不支持悲观并发,结果则产生并发冲突,所以产生的冲突则需要我们去解决。

为了便于理解我们从基础内容开始讲起,稍安勿躁,我们循序渐进稍后会讲到并发冲突、并发解决、并发高级三个方面的内容。我们建立实体类如下:

    public class Blog
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public string Url { get; set; }
        public int Count { get; set; }
    }

接下来简单配置下映射:

    public class EFCoreContext : DbContext
    {
        public DbSet<Blog> Blogs { get; set; }
        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
            => optionsBuilder.UseSqlServer(@"Server=.;Database=EFCoreDb;Trusted_Connection=True;");

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<Blog>(pc =>
            {
                pc.ToTable("Blog").HasKey(k => k.Id);

                pc.Property(p => p.Name).IsRequired();
                pc.Property(p => p.Url).IsRequired();
                pc.Property(p => p.Count).IsRequired();
            });
        }
    }

接下来我们简单封装下进行查询和更新数据的类 DbQueryCommit

    public class DbQueryCommit : IDisposable
    {

        private readonly EFCoreContext context;

        public DbQueryCommit(EFCoreContext context) => this.context = context;

        public TEntity Query<TEntity>(params object[] keys) where TEntity : class =>
            this.context.Set<TEntity>().Find(keys);

        public int Commit(Action change)
        {
            change();
            return context.SaveChanges();
        }

        public DbSet<TEntity> Set<TEntity>() where TEntity : class => context.Set<TEntity>();

        public void Dispose() => context.Dispose();
    }

接下来我们来看看非并发的情况,进行如下查询和修改:

        public static void NoCheck(
            DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3)
        {
            int id = 1;
            Blog blog1 = readerWriter1.Query<Blog>(id);
            Blog blog2 = readerWriter2.Query<Blog>(id);

            readerWriter1.Commit(() => blog1.Name = nameof(readerWriter1));

            readerWriter2.Commit(() => blog2.Name = nameof(readerWriter2));

            Blog category3 = readerWriter3.Query<Blog>(id);
            Console.WriteLine(category3.Name);
        } 

当前博主VS版本为2017,演示该程序在控制台,之前我们有讲过若要进行迁移需要安装 Microsoft.EntityFrameworkCore.Tools.DotNet 程序包,此时我们会发现根本都安装不上,如下:

不知为何错误,此时我们需要在项目文件中手动添加如上程序包,(解决方案来源于:https://docs.microsoft.com/en-us/ef/core/miscellaneous/cli/dotnet)如下:

<ItemGroup>
    <DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
  </ItemGroup>

然后添加程序包 Microsoft.EntityFrameworkCore.Design ,此时我们再来 dotnet restore 则会看到如下执行EF的命令:

接下来我们实例化上下文进行修改数据。

            var efContext1 = new EFCoreContext();
            var d1 = new DbQueryCommit(efContext1);

            var efContext2 = new EFCoreContext();
            var d2 = new DbQueryCommit(efContext2);

            var efContext3 = new EFCoreContext();
            var d3 = new DbQueryCommit(efContext3);

            Concurrency.NoCheck(d1, d2, d3);       

此时我们在数据库中默认插入一条数据:

此时界面打印最后读取到的Name值如下:

数据库也对应进行了更新,这也充分说明EF Core对于并发为乐观并发:

接下来我们对Name属性定义为并发Token。

 pc.Property(p => p.Name).IsRequired().IsConcurrencyToken();

此时为了很好演示各个方法,我们同样再来定义并发方法,如下:

        public static void ConcurrencyCheck(DbQueryCommit readerWriter1, DbQueryCommit readerWriter2)
        {
            int id = 1;
            Blog blog1 = readerWriter1.Query<Blog>(id);
            Blog blog2 = readerWriter2.Query<Blog>(id);

            readerWriter1.Commit(() =>
            {
                blog1.Name = nameof(readerWriter1);
                blog1.Count = 2;
            });

            readerWriter2.Commit(() =>
            {
                blog2.Name = nameof(readerWriter2);
                blog2.Count = 2;
            });
        }

此时再来调用该方法:

            var efContext1 = new EFCoreContext();
            var d1 = new DbQueryCommit(efContext1);

            var efContext2 = new EFCoreContext();
            var d2 = new DbQueryCommit(efContext2);

            //var efContext3 = new EFCoreContext();
            //var d3 = new DbQueryCommit(efContext3);

            Concurrency.ConcurrencyCheck(d1, d2);

当我们利用两个上下文1和2去读取数据时此时Name = ‘Jeffcky‘,当上下文1更新时在快照中根据主键和Name去查找数据库,查找到Name后并令Name = ‘readerWriter1‘成功更新,但是上下2去更新Name = ‘readerWriter2‘时,此时在快照中根据主键和Name去查找数据库,发现不存在该条数据,同时我们设置了并发Token,最终导致出现 DbUpdateConcurrencyException 并发更新异常。解决并发个两点一个是上述设置并发Token,另外一个则是设置行版本,下面我们也来看下,首先我们在类中增加一个行版本的字节属性。

public byte[] RowVersion { get; set; }

同时对该行版本进行映射标识。

 pc.Property(p => p.RowVersion).IsRequired().IsRowVersion().ValueGeneratedOnAddOrUpdate();

为了很好演示行版本并发,我们增加一个属性来打印行版本字符串。

  public string RowVersionString =>
       $"0x{BitConverter.ToUInt64(RowVersion.Reverse().ToArray(), 0).ToString("X16")}";

同样我们定义一个调用行版本的方法:

        public static void RowVersion(DbQueryCommit readerWriter1, DbQueryCommit readerWriter2)
        {
            int id = 1;
            Blog blog1 = readerWriter1.Query<Blog>(id);
            Console.WriteLine(blog1.RowVersionString);

            Blog blog2 = readerWriter2.Query<Blog>(id);
            Console.WriteLine(blog2.RowVersionString);

            readerWriter1.Commit(() => blog1.Name = nameof(readerWriter1));
            Console.WriteLine(blog1.RowVersionString);

            readerWriter2.Commit(() => readerWriter2.Set<Blog>().Remove(blog2));
        }

接下来我们调用演示看看。

            var efContext1 = new EFCoreContext();
            var d1 = new DbQueryCommit(efContext1);

            var efContext2 = new EFCoreContext();
            var d2 = new DbQueryCommit(efContext2);

            //var efContext3 = new EFCoreContext();
            //var d3 = new DbQueryCommit(efContext3);

            Concurrency.RowVersion(d1, d2); 

我们从上可以明显看出当查出数据库中的行版本值为 0x000000000073 ,接着readerWriter1更新后其行版本增加为 0x000000000074 ,当我们利用readerWriter2去删除查询出id = 1的数据时,此时会根据当前主键和行版本为 0x000000000073 去查找数据库,但是此时没有找到数据,导致同样如上述并发Token一样出现并发异常。

EntityFramework Core并发中级版解析

并发异常我们可以通过 DbUpdateConcurrencyException  来获取,该类继承自 DbUpdateException ,该类中的参数 EntityEntry 为一个集合,利用它则可以获取到对应的数据库中的值以及当前更新值等,所以我们可以自定义并发异常解析,如下:

    public class DbUpdateException : Exception
    {
        public virtual IReadOnlyList<EntityEntry> Entries { get; }

    }

    public class DbUpdateConcurrencyException : DbUpdateException
    {
        //TODO
    }

这里我们需要弄明白存在EntityEntry中的值类型,比如DbUpdateConcurrencyException的参数为exception。我们通过如下则可以获取到被跟踪的实体状态。

var tracking = exception.Entries.Single();

此时存在数据库中的原始值则为如下:

var original = tracking.OriginalValues.ToObject();

而当前需要更新的值则为如下:

var current = tracking.CurrentValues.ToObject();

而数据库中的值则为已经提交更新的值:

var database = ‘第一次已经更新的对象‘;

上述既然出现并发异常,接下来我们则需要解析并发异常并解决异常,大部分情况下无论是提交事务失败也好还是对数据进行操作也好都会进行重试机制,所以这里我们解析到并发异常并采取重试机制。之前我们进行提交时定义如下:

        public int Commit(Action change)
        {
            change();
            return context.SaveChanges();
        }

此时我们对该方法进行重载,遇到并发异常后并采取重试机制重试三次,如下:

        public int Commit(Action change, Action<DbUpdateConcurrencyException> handleException, int retryCount = 3)
        {
            change();
            for (int retry = 0; retry < retryCount; retry++)
            {
                try
                {
                    return context.SaveChanges();
                }
                catch (DbUpdateConcurrencyException exception)
                {
                    handleException(exception);
                }
            }
            return context.SaveChanges();
        }

然后我们定义一个需要出现并发并调用上述重试机制的更新方法,如下:

        public static void UpdateBlog(
    DbQueryCommit readerWriter1, DbQueryCommit readerWriter2,
    DbQueryCommit readerWriter3,
    Action<EntityEntry> resolveConflict)
        {
            int id = 1;
            Blog blog1 = readerWriter1.Query<Blog>(id);
            Blog blog2 = readerWriter2.Query<Blog>(id);
            Console.WriteLine($"查询行版本:{blog1.RowVersionString}");
            Console.WriteLine("----------------------------------------------------------");
            Console.WriteLine($"查询行版本:{blog2.RowVersionString}");
            Console.WriteLine("----------------------------------------------------------");

            readerWriter1.Commit(() =>
            {
                blog1.Name = nameof(readerWriter1);
                blog1.Count = 2;
            });

            Console.WriteLine($"更新blog1后行版本:{blog1.RowVersionString}");
            Console.WriteLine("----------------------------------------------------------");

            readerWriter2.Commit(
            change: () =>
            {
                blog2.Name = nameof(readerWriter2);
                blog2.Count = 1;
            },
            handleException: exception =>
            {
                EntityEntry tracking = exception.Entries.Single();
                Blog original = (Blog)tracking.OriginalValues.ToObject();
                Blog current = (Blog)tracking.CurrentValues.ToObject();
                Blog database = blog1;

                var origin = $"原始值:({original.Name},{original.Count},{original.Id},{original.RowVersionString})";
                Console.WriteLine(original);
                Console.WriteLine("----------------------------------------------------------");

                var databaseValue = $"数据库中值:({database.Name},{database.Count},{database.Id},{database.RowVersionString})";
                Console.WriteLine(databaseValue);
                Console.WriteLine("----------------------------------------------------------");

                var update = $"更新的值:({current.Name},{current.Count},{current.Id},{current.RowVersionString})";
                Console.WriteLine(update);
                Console.WriteLine("----------------------------------------------------------");

                resolveConflict(tracking);
            });

            Blog resolved = readerWriter3.Query<Blog>(id);

            var resolvedValue = $"查询并发解析后中的值:  ({resolved.Name}, {resolved.Count}, {resolved.Id},{resolved.RowVersionString})";
            Console.WriteLine(resolvedValue);
        }

接下来我们实例化三个上下文,稍后我会一一进行解释,避免看文章的童鞋看晕了几个值。

            var efContext1 = new EFCoreContext();
            var d1 = new DbQueryCommit(efContext1);

            var efContext2 = new EFCoreContext();
            var d2 = new DbQueryCommit(efContext2);

            var efContext3 = new EFCoreContext();
            var d3 = new DbQueryCommit(efContext3);

【温馨提示】:有很多童鞋在用.net core时控制台时会遇见中文乱码的问题,主要是.net core都需要安装包来进行,以此来说明不再依赖本地程序集达到更好的跨平台,真正实现模块化,所以需要在控制台注册中文编码包,如下:

Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

同时需要安装如下包:

System.Text.Encoding.CodePages

在注册后还需要设置控制台输出行编码为GB2312

 Console.OutputEncoding = Encoding.GetEncoding("GB2312");

接下来进行方法调用:

            Concurrency.UpdateBlog(d1, d2, d3, (d) =>
            {
                PropertyValues databaseValues = d.GetDatabaseValues();
                if (databaseValues == null)
                {
                    d.State = EntityState.Detached;
                }
                else
                {
                    d.OriginalValues.SetValues(databaseValues);
                }
            });

此时控制台打印和数据库更新值如下:

我们从调用方法开始解释:

当执行到更新上下文一中的Name值时为readerWriter1都没有问题,此时行版本将变为 0x0000000000833 ,当执行到上下文2中时此时去更新Name值时,此时会根据主键和行版本 0x0000000000832 去数据库中查找值,此时却没找到,然后将执行并发异常,最终开始执行 resolveConflict(tracking); 来解析冲突,然后就来到了上述图片所示,此时databaseValues中所存的值就是readrWriter1,也就是说此时数据库中的原始值变为了Name = ‘readerWriter1‘,我们需要做的是将数据库中的Name = ‘readerWriter1‘设置为原始值,这样下次再去解析冲突会根据主键id = 1和行版本0x00000000833去查找,此时找到该行数据最终进行更新到数据库,所以结果如上图所给。到这里你是不是就觉得难道就这么结束了吗?NO,这个是最简单的一个场景,上述还只是两个并发客户端,如果是多个根本无法保证能够完全解析并发,同时中间还存在一个问题,我们到底是让客户端更新值获胜还是让数据库中原始值获胜呢,这又是一个问题,如果我们完全不借助SQL语句或者存储过程来执行事务的话,这个将是个很严重的问题,比如在秒杀场景中,产品只有1000个,那么每次都让客户端获胜,好吧,那就导致库存溢出的问题,那就呵呵了,还有一个很大的问题则是合并,如果有多个并发请求过来可能我们只需要对于产品中的数量进行并发控制,其他的数据更新完全可以进行合并,这又是一个问题,那么到底该如何解决呢,请继续往下看终极解决方案。

EntityFramweork Core高并发获胜者初级版解析

EntityFramework Core并发数据库获胜

既然是数据库中获胜那么对于客户端出现的并发异常我们就不需要进行解析,此时我们只需要终止异常直接返回值即可,如下定义方法:

        public int DatabaseWin(Action change, Action<DbUpdateConcurrencyException> handleException)
        {
            change();
            try
            {
                return context.SaveChanges();
            }
            catch (DbUpdateConcurrencyException exception)
            {
                return 0;
            }

        }

接着我们在UpdateBlog方法中在上下文2中提交数据时调用上述方法并无需再进行并发解析,如下:

            readerWriter2.DatabaseWin(
            change: () =>
            {
                blog2.Name = nameof(readerWriter2);
                blog2.Count = 1;
            },
            handleException: exception =>
            {
                EntityEntry tracking = exception.Entries.Single();
                Blog original = (Blog)tracking.OriginalValues.ToObject();
                Blog current = (Blog)tracking.CurrentValues.ToObject();
                Blog database = blog1;

                var origin = $"原始值:({original.Name},{original.Count},{original.Id},{original.RowVersionString})";
                Console.WriteLine(original);
                Console.WriteLine("----------------------------------------------------------");

                var databaseValue = $"数据库中值:({database.Name},{database.Count},{database.Id},{database.RowVersionString})";
                Console.WriteLine(databaseValue);
                Console.WriteLine("----------------------------------------------------------");

                var update = $"更新的值:({current.Name},{current.Count},{current.Id},{current.RowVersionString})";
                Console.WriteLine(update);
                Console.WriteLine("----------------------------------------------------------");

                //resolveConflict(tracking);
            });

此时打印和数据库中值如下:

上述就无需再多讲了,根本没有去解析异常。

EntityFramework Core并发客户端获胜

上一大节我们演示的则是客户端获胜,这里我们只需要设置异常解析的值即可解决问题,封装一个方法,如下:

        public static void ClientWins(
    DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3) =>
        UpdateBlog(readerWriter1, readerWriter2, readerWriter3, resolveConflict: tracking =>
        {
            PropertyValues databaseValues = tracking.GetDatabaseValues();
            tracking.OriginalValues.SetValues(databaseValues);

            Console.WriteLine(tracking.State);
            Console.WriteLine(tracking.Property(nameof(Blog.Count)).IsModified);
            Console.WriteLine(tracking.Property(nameof(Blog.Name)).IsModified);
            Console.WriteLine(tracking.Property(nameof(Blog.Id)).IsModified);
        });

结果就不再演示和之前演示结果等同。我们将重点放在客户端和数据库值合并的问题,请继续往下看。

EntityFramework Core并发数据库和客户端合并

当出现并发时我们对前者使其客户端获胜而后者对于前者未有的属性则进行更新,所以我们需要首先对数据库原始值克隆一份,然后将其客户端获胜,然后将原始值和客户端属性进行比较,若数据库中的属性在原始值中的属性中没有,我们则将数据库中的值不进行更新,此时将导致当前并发中的值进行更新则呈现出我们所说客户端和数据库值进行合并更新,如下首先克隆:

            PropertyValues originalValues = tracking.OriginalValues.Clone();
            PropertyValues databaseValues = tracking.GetDatabaseValues();

            tracking.OriginalValues.SetValues(databaseValues);

比较原始值和数据库中的属性进行比较判断,不存在则不更新。

                databaseValues.Properties
                .Where(property => !object.Equals(originalValues[property.Name], databaseValues[property.Name]))
                .ToList()
                .ForEach(property => tracking.Property(property.Name).IsModified = false);

最终我们定义如下合并方法:

        public static void MergeClientAndDatabase(
    DbQueryCommit readerWriter1, DbQueryCommit readerWriter2, DbQueryCommit readerWriter3) =>
        UpdateBlog(readerWriter1, readerWriter2, readerWriter3, resolveConflict: tracking =>
        {
            PropertyValues originalValues = tracking.OriginalValues.Clone();
            PropertyValues databaseValues = tracking.GetDatabaseValues();

            tracking.OriginalValues.SetValues(databaseValues);

#if selfDefine
            databaseValues.PropertyNames
                    .Where(property => !object.Equals(originalValues[property], databaseValues[property]))
                    .ForEach(property => tracking.Property(property).IsModified = false);
#else
            databaseValues.Properties
                .Where(property => !object.Equals(originalValues[property.Name], databaseValues[property.Name]))
                .ToList()
                .ForEach(property => tracking.Property(property.Name).IsModified = false);
#endif

            Console.WriteLine(tracking.State);
            Console.WriteLine(tracking.Property(nameof(Blog.Count)).IsModified);
            Console.WriteLine(tracking.Property(nameof(Blog.Name)).IsModified);
            Console.WriteLine(tracking.Property(nameof(Blog.Id)).IsModified);
        });

此时我们再在UpdateBlog方法添加二者不同的属性,如下:

            readerWriter1.Commit(() =>
            {
                blog1.Name = nameof(readerWriter1);
                blog1.Count = 3;
            });

            Console.WriteLine($"更新blog1后行版本:{blog1.RowVersionString}");
            Console.WriteLine("----------------------------------------------------------");

            readerWriter2.Commit(
            change: () =>
            {
                blog2.Name = nameof(readerWriter2);
                blog2.Count = 4;
                blog2.Url = "http://www.cnblogs.com/CreateMyself";
            }.......

为了便于阅读者观察和对比,我们给出数据库中默认的初始值,如下:

好了到了这里关于EF Core中并发内容算是全部结束,别着急,还剩下最后一点内容,那就是终极解决并发方案,请继续往下看。

EntityFramework Core并发高级终极版解决方案

我们定义一个名为 RefreshConflict 枚举,当提交时定义是否为数据库或者客户端或者数据库和客户端数据合并:

    public enum RefreshConflict
    {
        StoreWins,

        ClientWins,

        MergeClientAndStore
    }

根据上述不同的获胜模式来刷新数据库中的值,我们定义如下刷新状态扩展方法:

    public static class RefreshEFStateExtensions
    {
        public static EntityEntry Refresh(this EntityEntry tracking,
            RefreshConflict refreshMode)
        {
            switch (refreshMode)
            {
                case RefreshConflict.StoreWins:
                    {
                        //当实体被删除时,重新加载设置追踪状态为Detached
                        //当实体被更新时,重新加载设置追踪状态为Unchanged
                        tracking.Reload();
                        break;
                    }
                case RefreshConflict.ClientWins:
                    {
                        PropertyValues databaseValues = tracking.GetDatabaseValues();
                        if (databaseValues == null)
                        {
                            //当实体被删除时,设置追踪状态为Detached,当然此时客户端无所谓获胜
                            tracking.State = EntityState.Detached;
                        }
                        else
                        {
                            //当实体被更新时,刷新数据库原始值
                            tracking.OriginalValues.SetValues(databaseValues);
                        }
                        break;
                    }
                case RefreshConflict.MergeClientAndStore:
                    {
                        PropertyValues databaseValues = tracking.GetDatabaseValues();
                        if (databaseValues == null)
                        {
                            /*当实体被删除时,设置追踪状态为Detached,当然此时客户端没有合并的数据
                             并设置追踪状态为Detached
                             */
                            tracking.State = EntityState.Detached;
                        }
                        else
                        {
                            //当实体被更新时,刷新数据库原始值
                            PropertyValues originalValues = tracking.OriginalValues.Clone();
                            tracking.OriginalValues.SetValues(databaseValues);
                            //如果数据库中对于属性有不同的值保留数据库中的值
#if SelfDefine
                databaseValues.PropertyNames // Navigation properties are not included.
                    .Where(property => !object.Equals(originalValues[property], databaseValues[property]))
                    .ForEach(property => tracking.Property(property).IsModified = false);
#else
                            databaseValues.Properties
                                    .Where(property => !object.Equals(originalValues[property.Name],
                                    databaseValues[property.Name]))
                                    .ToList()
                                    .ForEach(property =>
                                    tracking.Property(property.Name).IsModified = false);
#endif
                        }
                        break;
                    }
            }
            return tracking;
        }
    }

默认重试机制采取自定义重试三次:

        public static int SaveChanges(
       this DbContext context, Action<IEnumerable<EntityEntry>> resolveConflicts, int retryCount = 3)
        {
            if (retryCount <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必须大于0.");
            }

            for (int retry = 1; retry < retryCount; retry++)
            {
                try
                {
                    return context.SaveChanges();
                }
                catch (DbUpdateConcurrencyException exception) when (retry < retryCount)
                {
                    resolveConflicts(exception.Entries);
                }
            }
            return context.SaveChanges();
        }

另外找到一种重试机制包,安装如下程序包。

Microsoft.Practices.EnterpriseLibrary.TransientFaultHandlin

我们来简单看一个例子。我们自定义实现需要继承自该程序包中重试策略类 RetryStrategy ,此时需要实现内置如下抽象方法:

public abstract ShouldRetry GetShouldRetry();

最终我们自定义如下实现方法:

    public class ConcurrentcyStrategy : RetryStrategy
    {
        public ConcurrentcyStrategy(string name, bool firstFastRetry) : base(name, firstFastRetry)
        {

        }

        private bool ConcurrentcyShouldRetry(int retryCount, Exception lastException, out TimeSpan delay)
        {
            if (retryCount <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必须大于0");
            }
            if (lastException is ArgumentNullException)
            {
                return true;
            }
            return true;
        }
        public override ShouldRetry GetShouldRetry()
        {
            var shouldRetry = new ShouldRetry(ConcurrentcyShouldRetry);
            return shouldRetry;
        }
    }

上述是定义策略类,接下来我们需要实现 ITransientErrorDetectionStrategy 接口来实现需要获取到的异常类,定义如下:

    public class TransientErrorDetection<TException> : ITransientErrorDetectionStrategy
where TException : Exception
    {
        public bool IsTransient(Exception ex) => ex is TException;
    }

最后则是检测到我们所定义的异常并解析重试解析异常,如下:

    public class TransientDetectionExample
    {
        public int TransientDetectionTest(Func<string, bool> str, RetryStrategy retryStrategy)
        {
            RetryPolicy retryPolicy = new RetryPolicy(
               errorDetectionStrategy: new TransientDetection<ArgumentException>(),
               retryStrategy: retryStrategy);
            retryPolicy.Retrying += (sender, e) =>
                str(((ArgumentNullException)e.LastException).StackTrace);
            return retryPolicy.ExecuteAction(RetryCalcu);
        }
        public int RetryCalcu()
        {
            return -1;
        }
    }

我们给出如下测试数据,并给出参数为空,观察是否结果会执行RetryCalcu并返回-1:

            var stratrgy = new ConcurrentcyStrategy("test", true);
            var isNull = string.Empty;
            var example = new TransientDetectionExample();
            var result = example.TransientDetectionTest(d => isNull.Contains("11"), stratrgy);

所以此时基于上述情况我们可以利用现有的轮子来实现重试机制重载一个SaveChanges方法,最终重试机制我们可以定义如下另一个重载方法:

    public class TransientDetection<TException> : ITransientErrorDetectionStrategy
    where TException : Exception
    {
        public bool IsTransient(Exception ex) => ex is TException;
    }

    public static partial class DbContextExtensions
    {
        public static int SaveChanges(
       this DbContext context, Action<IEnumerable<EntityEntry>> resolveConflicts, int retryCount = 3)
        {
            if (retryCount <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必须大于0");
            }

            for (int retry = 1; retry < retryCount; retry++)
            {
                try
                {
                    return context.SaveChanges();
                }
                catch (DbUpdateConcurrencyException exception) when (retry < retryCount)
                {
                    resolveConflicts(exception.Entries);
                }
            }
            return context.SaveChanges();
        }
        public static int SaveChanges(
            this DbContext context, Action<IEnumerable<EntityEntry>> resolveConflicts, RetryStrategy retryStrategy)
        {
            RetryPolicy retryPolicy = new RetryPolicy(
                errorDetectionStrategy: new TransientDetection<DbUpdateConcurrencyException>(),
                retryStrategy: retryStrategy);
            retryPolicy.Retrying += (sender, e) =>
                resolveConflicts(((DbUpdateConcurrencyException)e.LastException).Entries);
            return retryPolicy.ExecuteAction(context.SaveChanges);
        }
    }

同上我们最终提交数据也分别对应两个方法,一个是自定义重试三次,一个利用轮子重试机制,如下:

    public static partial class DbContextExtensions
    {
        public static int SaveChanges(this DbContext context, RefreshConflict refreshMode, int retryCount = 3)
        {
            if (retryCount <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必须大于0");
            }

            return context.SaveChanges(
            conflicts => conflicts.ToList().ForEach(tracking => tracking.Refresh(refreshMode)), retryCount);
        }

        public static int SaveChanges(
            this DbContext context, RefreshConflict refreshMode, RetryStrategy retryStrategy) =>
                context.SaveChanges(
                    conflicts => conflicts.ToList().ForEach(tracking => tracking.Refresh(refreshMode)), retryStrategy);

    }

接下来我们来分别演示客户端获胜、数据库获胜以及客户端和数据库合并情况。首先我们放一张数据库默认数据以便对比:

EntityFramework Core并发客户端获胜

            var efContext1 = new EFCoreContext();
            var efContext2 = new EFCoreContext();

            var b1 = efContext1.Blogs.Find(1);
            var b2 = efContext2.Blogs.Find(1);

            b1.Name = nameof(efContext1);
            efContext1.SaveChanges();

            b2.Name = nameof(efContext2);
            b2.Url = "http://www.cnblogs.com/CreateSelf";
            efContext2.SaveChanges(RefreshConflict.ClientWins);

上述我们看到数据库中的值完全更新为在上下文2中的数据。

EntityFramework Core并发数据库获胜

            var efContext1 = new EFCoreContext();
            var efContext2 = new EFCoreContext();

            var b1 = efContext1.Blogs.Find(1);
            var b2 = efContext2.Blogs.Find(1);

            b1.Name = nameof(efContext1);
            efContext1.SaveChanges();

            b2.Name = nameof(efContext2);
            b2.Url = "http://www.cnblogs.com/CreateSelf";
            efContext2.SaveChanges(RefreshConflict.StoreWins);

此时我们看到数据库中的值为上下文1中的数据。

EntityFramework Core并发客户端和数据库合并

            var efContext1 = new EFCoreContext();
            var efContext2 = new EFCoreContext();

            var b1 = efContext1.Blogs.Find(1);
            var b2 = efContext2.Blogs.Find(1);

            b1.Name = nameof(efContext1);
            b1.Count = 10;
            efContext1.SaveChanges();

            b2.Name = nameof(efContext2);
            b1.Count = 11;
            b2.Url = "http://www.cnblogs.com/CreateSelf";
            efContext2.SaveChanges(RefreshConflict.
MergeClientAndStore);

上述我们看到数据库中的Name和Count是上下文1中的值,而Url则为上下文2中的值。

关于重试机制找到一个比较强大的轮子:https://github.com/App-vNext/Polly 看到一直在更新目前已经支持.net core。看如下加星应该是不错。

没有深入研究该重试机制,就稍微了解了下进行如下重试操作:

        public int Commit(Action change, Action<DbUpdateConcurrencyException> handleException, int retryCount = 3)
        {
            change();

            Policy
            .Handle<DbUpdateConcurrencyException>(ex => ex.Entries.Count > 0)
            .Or<ArgumentException>(ex => ex.ParamName == "exception")
            .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(10))
            .Execute(() => context.SaveChanges());

            return context.SaveChanges();
        }

同样调用上述UpdateBlog方法,上下文2中数据如下:

            readerWriter2.Commit(
            change: () =>
            {
                blog2.Name = nameof(readerWriter2);
                blog2.Count = 4;
                blog2.Url = "http://www.cnblogs.com/CreateMyself";
            },
            handleException: exception =>
            ............

时间: 2024-10-29 19:09:46

EF并发详解的相关文章

EntityFramework Core解决并发详解

话题(EntityFramework Core并发) 对于并发问题这个话题相信大家并不陌生,当数据量比较大时这个时候我们就需要考虑并发,对于并发涉及到的内容也比较多,在EF Core中我们将并发分为几个小节来陈述,让大家看起来也不太累,也容易接受,我们由浅入深.首先我们看下给出的Blog实体类.     public class Blog : IEntityBase     {        public int Id { get; set; }        public string Nam

Miniprofiler 监控ef执行详解

首先NuGet添加 相对应ef版本的Miniprofiler.ef引用 web.config文件中添加 <system.webServer> <handlers>  <add name="MiniProfiler" path="mini-profiler-resources/*" verb="*" type="System.Web.Routing.UrlRoutingModule" resourc

Microsoft SQL Server中的事务与并发详解

本篇索引: 1.事务 2.锁定和阻塞 3.隔离级别 4.死锁 一.事务 1.1 事务的概念 事务是作为单个工作单元而执行的一系列操作,比如查询和修改数据等. 事务是数据库并发控制的基本单位,一条或者一组语句要么全部成功,对数据库中的某些数据成功修改; 要么全部不成功,数据库中的数据还原到这些语句执行之前的样子. 比如网上订火车票,要么你定票成功,余票显示就减一张; 要么你定票失败获取取消订票,余票的数量还是那么多.不允许出现你订票成功了,余票没有减少或者你取消订票了,余票显示却少了一张的这种情况

ps -ef|grep详解

ps命令将某个进程显示出来 grep命令是查找 中间的|是管道命令 是指ps命令与grep同时执行 PS是LINUX下最常用的也是非常强大的进程查看命令 grep命令是查找,是一种强大的文本搜索工具,它能使用正则表达式搜索文本,并把匹配的行打印出来. grep全称是Global Regular Expression Print,表示全局正则表达式版本,它的使用权限是所有用户. 以下这条命令是检查java 进程是否存在:ps -ef |grep java 字段含义如下:UID       PID

Linux下高并发socket最大连接数所受的各种限制(详解)

1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄).可使用ulimit命令查看系统允许当前用户进程打开的文件数限制: [[email protected] ~]$ ulimit -n1024 这表示当前用户的每个进程最多允许同时打开1024个文件,这1024个文件中

高并发高流量网站架构详解

(推荐)高并发高流量网站架构详解 Web2.0的兴起,掀起了互联网新一轮的网络创业大潮.以用户为导 向的新网站建设概念,细分了网站功能和用户群,不仅成功的造就了一大批新生的网站,也极大的方便了上网的人们.但Web2.0以用户为导向的理念,使得新 生的网站有了新的特点--高并发,高流量,数据量大,逻辑复杂等,对网站建设也提出了新的要求. 本文围绕高并发高流量的网站架构设计问题,主要研究讨论了以下内容: 首先在整个网络的高度讨论了使用镜像网站,CDN内容分发网络等技术对负载均衡带来的便利及各自的优缺

详解IOS开发应用之并发Dispatch Queues (2011)

详解IOS开发应用之并发Dispatch Queues是本文哟啊介绍的内容,我们几乎可以调度队列去完成所有用线程来完成的任务.调度队列相对于线程代码更简单,易于使用,更高效.下面讲主要简述调度队列,在应用中如何使用调度队列去执行任务. 1.关于调度队列 所有的调度队列都是先进先出队列,因此,队列中的任务的开始的顺序和添加到队列中的顺序相同.GCD自动的为我们提供了一些调度队列,我们也可以创建新的用于具体的目的. 下面列出几种可用的调度队列类型以及如何使用. (1)serial queues(串行

8天掌握EF的Code First开发系列之3 管理数据库创建,填充种子数据以及LINQ操作详解

本篇目录 管理数据库创建 管理数据库连接 管理数据库初始化 填充种子数据 LINQ to Entities详解 什么是LINQ to Entities 使用LINQ to Entities操作实体 LINQ操作 懒加载和预加载 插入数据 更新数据 删除数据 本章小结 本人的实验环境是VS 2013 Update 5,windows 10,MSSQL Server 2008. 上一篇<Code First开发系列之领域建模和管理实体关系>,我们主要介绍了EF中“约定大于配置”的概念,如何创建数据

Java并发编程之---Lock框架详解

Java 并发开发:Lock 框架详解 摘要: 我们已经知道,synchronized 是Java的关键字,是Java的内置特性,在JVM层面实现了对临界资源的同步互斥访问,但 synchronized 粒度有些大,在处理实际问题时存在诸多局限性,比如响应中断等.Lock 提供了比 synchronized更广泛的锁操作,它能以更优雅的方式处理线程同步问题.本文以synchronized与Lock的对比为切入点,对Java中的Lock框架的枝干部分进行了详细介绍,最后给出了锁的一些相关概念. 一