asp.net core microservices 架构之Task 事务一致性 事件源 详解

一 aspnetcore之task的任务状态-CancellationToken

我有一篇文章讲解了asp.net的线程方面的知识。我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作也比较简化,人为控制比较弱。那这一节为什么要说线程尼?大家有没有遇到过,部署或者人为的去重启,往往会造成一些不必要的业务中断,web api有这样的情况,后台程序也有这样的情况。异常和系统硬件的故障已经让我们防不胜防了,那么就尽量的人为的情况少那么一点点,系统的健壮性也就高那么一点点。

   目前有两个技巧可以处理这一类事情,第一是让主机graceful方式关闭,并且超时时间设置长一点,这样就有足够的时间,让运行的请求执行完毕,看代码:

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .Build();

    await host.RunAsync();
}

这是官方上的一段话:IHostedService 是执行代码的入口点。 每个 IHostedService 实现都按照 ConfigureServices 中服务注册的顺序执行。 主机启动时,每个 IHostedService 上都会调用 StartAsync,主机正常关闭时,以反向注册顺序调用 StopAsync

//关闭超时值

ShutdownTimeout 设置 StopAsync 的超时值。 默认值为 5 秒。
Program.Main 中的以下选项配置将默认值为 5 秒的关闭超时值增加至 20 秒:
C#

//复制
var host = new HostBuilder()
    .ConfigureServices((hostContext, services) =>
    {
        services.Configure<HostOptions>(option =>
        {
            option.ShutdownTimeout = System.TimeSpan.FromSeconds(20);
        });
    })
    .Build();

而我们看看源码中StopAsync方法:

/// <summary>
        /// Attempts to gracefully stop the host with the given timeout.
        /// </summary>
        /// <param name="host"></param>
        /// <param name="timeout">The timeout for stopping gracefully. Once expired the
        /// server may terminate any remaining active connections.</param>
        /// <returns></returns>
        public static Task StopAsync(this IHost host, TimeSpan timeout)
        {
            return host.StopAsync(new CancellationTokenSource(timeout).Token);
        }

系统接受到Ctrl+c和sign,就会调用这个方法,以比较礼貌的方式关闭。

那么看源码,这两个都是具有阻塞功能的异步方法,对应的非异步方法,都是同步调用的这两个方法:

/// <summary>
        /// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered.
        /// </summary>
        /// <param name="host">The <see cref="IHost"/> to run.</param>
        /// <param name="token">The token to trigger shutdown.</param>
        public static async Task RunAsync(this IHost host, CancellationToken token = default)
        {
            using (host)
            {
                await host.StartAsync(token);

                await host.WaitForShutdownAsync(token);
            }
        }

        /// <summary>
        /// Returns a Task that completes when shutdown is triggered via the given token.
        /// </summary>
        /// <param name="host">The running <see cref="IHost"/>.</param>
        /// <param name="token">The token to trigger shutdown.</param>
        public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default)
        {
            var applicationLifetime = host.Services.GetService<IApplicationLifetime>();
        //当前token执行取消的时候,激发这个委托。
            token.Register(state =>
            {
                ((IApplicationLifetime)state).StopApplication(); //当进程取消的时候,通知注册IApplicationLifetime的进程也取消。
            },
            applicationLifetime);

            var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
            //应用程序生命周期中的停止应用token激发的时候,执行这个委托,去释放阻塞,执行host的停止方法。
            applicationLifetime.ApplicationStopping.Register(obj =>
            {
                var tcs = (TaskCompletionSource<object>)obj;
                tcs.TrySetResult(null);
            }, waitForStop);

            await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);执行完毕。
// Host will use its default ShutdownTimeout if none is specified.

           await host.StopAsync(); //调用关闭 }

具体原理就是Host使用这个applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource这个类,使用这个类是可以控制task的取消执行的。

所以,两个解决方案,如果是webapi,就将将超时时间设置大一点。

第二,如果在非webapi中,使用了超长执行的Task,就使用CancellationTokenSource吧,将它的Token传进去,在外边判断是否执行中,如果不在执行中,就执行Cancel方法,当然在task内部,也可以

判断token,是否自己主动取消掉。

这是官方的一个例子,了解CancellationTokenSource这个类,那么就会明白如何去处理Task

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Example
{
   public static void Main()
   {
      // Define the cancellation token.
      CancellationTokenSource source = new CancellationTokenSource();
      CancellationToken token = source.Token;

      Random rnd = new Random();
      Object lockObj = new Object();

      List<Task<int[]>> tasks = new List<Task<int[]>>();
      TaskFactory factory = new TaskFactory(token);
      for (int taskCtr = 0; taskCtr <= 10; taskCtr++) {
         int iteration = taskCtr + 1;
         tasks.Add(factory.StartNew( () => {
                                       int value;
                                       int[] values = new int[10];
                                       for (int ctr = 1; ctr <= 10; ctr++) {
                                          lock (lockObj) {
                                             value = rnd.Next(0,101);
                                          }
                                          if (value == 0) {
                                             source.Cancel();
                                             Console.WriteLine("Cancelling at task {0}", iteration);
                                             break;
                                          }
                                          values[ctr-1] = value;
                                       }
                                       return values;
                                    }, token));   

      }
      try {
         Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(),
                                                      (results) => {
                                                         Console.WriteLine("Calculating overall mean...");
                                                         long sum = 0;
                                                         int n = 0;
                                                         foreach (var t in results) {
                                                            foreach (var r in t.Result) {
                                                                  sum += r;
                                                                  n++;
                                                               }
                                                         }
                                                         return sum/(double) n;
                                                      } , token);
         Console.WriteLine("The mean is {0}.", fTask.Result);
      }
      catch (AggregateException ae) {
         foreach (Exception e in ae.InnerExceptions) {
            if (e is TaskCanceledException)
               Console.WriteLine("Unable to compute mean: {0}",
                                 ((TaskCanceledException) e).Message);
            else
               Console.WriteLine("Exception: " + e.GetType().Name);
         }
      }
      finally {
         source.Dispose();
      }
   }
}
// Repeated execution of the example produces output like the following:
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//
//       Cancelling at task 10
//       Unable to compute mean: A task was canceled.
//
//       Calculating overall mean...
//       The mean is 5.29545454545455.
//
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//
//       Cancelling at task 6
//       Unable to compute mean: A task was canceled.
//
//       Calculating overall mean...
//       The mean is 4.97363636363636.
//
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//
//       Calculating overall mean...
//       The mean is 4.86545454545455.

二   业务的事务一致性

因为微服务的理念中是牺牲了系统业务的一致性,我们知道事务的一致性都是靠的数据库的本地事务,或者分布式事务来实现的,但是微服务是严禁使用分布式事务。那么如何保证整个系统的事务完整性尼?举个例子:比如订单服务中,新接受一个订单,这个订单需要同步到库房的订单子系统,那么在订单服务中的这个订单在最后更新自己订单状态的时候,是需要同时发送异步消息给库房消息服务器的,如果这时候网络断了,本地订单更新成功了,但是异步消息没有发送过去,这样就会引起业务的缺失,目前有两个方法可以实现:

第一:为本地数据库创建事件源表,记录下消息和本地数据更新的全部状态,比如订单在更新前就可以添加事件,事件状态可以有,准备更新订单,订单已更新,发送消息队列,消息发送成功等。

这样的好处就是最后跟踪这个事务处理的时候,每个步骤都可以找到,而且完全不用事务。最后job去跟踪失败情况,然后根据情况处理。

第二:只是用本地事务,就是在订单更新的时候,同时给事件源表添加消息内容,然后让后台job去发送消息,这样是最简单和最稳定的方式。

当然,最合适的还是第一种方法,虽然代码能复杂点,但是最后的效果是一样的,而且效率是比第二种方法更高效,但是考虑打事件源表并不是并发频繁操作的表,所以这个看自己的喜好了。

针对一个系统,业务的一致性,也并不是全部,针对于一些关键业务做好一致性,但是很多其实可以设计成为在用户ui层面去补偿操作,唯一的坏处就是一部分数据需要重新填写。

三     事件源

这个事件源并不是为了解决业务的一致性,而是为了应对大数据量的请求,比如,客户管理,一个分类下有上万条记录需要处理,那么往往我们需要对性能和实时反馈上有个折衷。

系统设计如下:

  这样看来,会增加1个api服务和一个后台服务,但是对于系统的问题,却得到了一个缓冲,或许这个设计不是最好的,但是却可以做一个抛砖引玉的案例,现实中案例非常多变,所以设计也会有很多方案。

  因为目前我们看到的大部分app,请求的时候,某些功能确实会有少许等待事件,这个都是为了折衷,当然这一篇内容并不是讨论云或者分布式计算,但是在后台这块处理越快,反馈也越快。

  这套方案的设计理念其实就是异步处理,可以有自己的优化空间,而并不会消耗api这个轻量级服务,后台分布式计算越快,app反应也越快,到一定程度,就并不会感觉到有延迟,这就是大师比喻的鼻子与眼睛的关系。

原文地址:https://www.cnblogs.com/ck0074451665/p/10332044.html

时间: 2024-10-06 13:31:48

asp.net core microservices 架构之Task 事务一致性 事件源 详解的相关文章

asp.net core microservices 架构之eureka服务发现

一 简介 微服务将需多的功能拆分为许多的轻量级的子应用,这些子应用相互调度.好处就是轻量级,完全符合了敏捷开发的精神.我们知道ut(单元测试),不仅仅提高我们的程序的健壮性,而且可以强制将类和方法的设计尽量的单一化.那么微服务也是这样,敏捷对于软件工程的意义就是快速开发,验证市场需求,然后快速改进,从而适应市场节奏.什么东西要快速,就必须轻量级.大家知道一个应用的复杂程度,完全是和这个项目的功能和代码数量挂钩的,这是软件自诞生就存在的问题,一个设计不好的软件,最后会让这个软件更新和改进变的非常复

asp.net core microservices 架构之 分布式自动计算(一)

   一:简介   自动计算都是常驻内存的,没有人机交互.我们经常用到的就是console job和sql job了.sqljob有自己的宿主,与数据库产品有很关联,暂时不提.console job使用quartz.net框架,目前3.x已经支持netcore. 如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现.但是分布式就不可同日而语了.如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如

Spring事务管理(详解+实例)

写这篇博客之前我首先读了<Spring in action>,之后在网上看了一些关于Spring事务管理的文章,感觉都没有讲全,这里就将书上的和网上关于事务的知识总结一下,参考的文章如下: Spring事务机制详解 Spring事务配置的五种方式 Spring中的事务管理实例详解 1 初步理解 理解事务之前,先讲一个你日常生活中最常干的事:取钱. 比如你去ATM机取1000块钱,大体有两个步骤:首先输入密码金额,银行卡扣掉1000元钱:然后ATM出1000元钱.这两个步骤必须是要么都执行要么都

Mysql主从数据库架构的复制原理及配置详解

1 复制概述 Mysql内建的复制功能是构建大型,高性能应用程序的基础.将Mysql的数据分布到多个系统上去,这种分布的机制,是通过将Mysql的某一台主机的数据复制到其它主机(slaves)上,并重新执行一遍来实现的.复制过程中一个服务器充当主服务器,而一个或多个其它服务器充当从服务器.主服务器将更新写入二进制日志文件,并维护文件的一个索引以跟踪日志循环.这些日志可以记录发送到从服务器的更新.当一个从服务器连接主服务器时,它通知主服务器从服务器在日志中读取的最后一次成功更新的位置.从服务器接收

(转)Spring事务管理(详解+实例)

文章转自:http://blog.csdn.net/trigl/article/details/50968079 写这篇博客之前我首先读了<Spring in action>,之后在网上看了一些关于Spring事务管理的文章,感觉都没有讲全,这里就将书上的和网上关于事务的知识总结一下,参考的文章如下: Spring事务机制详解 Spring事务配置的五种方式 Spring中的事务管理实例详解 一.初步理解 理解事务之前,先讲一个你日常生活中最常干的事:取钱.  比如你去ATM机取1000块钱,

SQL Server 事务隔离级别详解

原文:SQL Server 事务隔离级别详解 标签: SQL SEERVER/MSSQL SERVER/SQL/事务隔离级别选项/设计数据库事务级别 SQL 事务隔离级别 概述 隔离级别用于决定如果控制并发用户如何读写数据的操作,同时对性能也有一定的影响作用. 步骤 事务隔离级别通过影响读操作来间接地影响写操作:可以在回话级别上设置事务隔离级别也可以在查询(表级别)级别上设置事务隔离级别.事务隔离级别总共有6个隔离级别:READ UNCOMMITTED(未提交读,读脏),相当于(NOLOCK)R

Asp.Net MVC3 简单入门第一季(三)详解Controller之Filter

前言 前面两篇写的比较简单,刚开始写这个系列的时候我面向的对象是刚开始接触Asp.Net MVC的朋友,所以写的尽量简单.所以写的没多少技术含量.把这些技术总结出来,然后一简单的方式让更多的人很好的接受这是我一直努力的方向.后面会有稍微复杂点的项目!让我们一起期待吧! 此文我将跟大家介绍一下Asp.Net MVC3 Filter的一些用法.你会了解和学习到全局Fileter,Action Filter等常用用法. 第一节:Filter知识储备 项目大一点总会有相关的AOP面向切面的组件,而MVC

Android task和back stack详解(官方文档翻译)

一个应用往往包含很多activities.每个activity都应围绕着用户可执行的特定动作来设计,并且可以启动其它activitie.例如,一个email应用可能可能有一个显示新邮件列表的activity.当用户选择一个邮件,一个新的activity被打开以显示邮件内容. 一个activity也可以打开同一设备上存在于其它应用的activitie,例如,如果你的应用想要发送一个邮件,你可以定义一个intent来执行一个"send"动作并包含一些数据,比如一个地址和一条信息.另一个应用

SpringMVC 事务配置完全详解

1.本设计采用springMVC+Hibernate ,事务控制在Service层,直接上代码 2.web.xml <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.5" xsi:schemaLocation="http://java.sun.com/