在Asp.Net Core中集成Kafka(中)

  在上一篇中我们主要介绍如何在Asp.Net Core中同步Kafka消息,通过上一篇的操作我们发现上面一篇中介绍的只能够进行简单的首发kafka消息并不能够消息重发、重复消费、乐观锁冲突等问题,这些问题在实际的生产环境中是非常要命的,如果在消息的消费方没有做好必须的幂等性操作,那么消费者重复消费的问题会比较严重的,另外对于消息的生产者来说,记录日志的方式也不是足够友好,很多时候在后台监控程序中我们需要知道记录更多的关于消息的分区、偏移等更多的消息。而在消费者这边我们更多的需要去解决发送方发送重复消息,以及面对乐观锁冲突的时候该怎么解决这些问题,当然代码中的这些方案都是我们在实际生产中摸索出来的一些方案,当然这些都是需要后续进行进一步优化的,这里我们将分别就生产者和消费者中出现的问题来进行分析和说明。

 图一 消费者方几乎同一时刻接收到两条同样的Kafka消息(Grafana监控)

  一 生产者方  

using System;
using System.ComponentModel.DataAnnotations;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Abp.Dependency;
using Confluent.Kafka;
using JetBrains.Annotations;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Sunlight.Kafka.Abstractions;

[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]

namespace Sunlight.Kafka {
    /// <summary>
    /// Kafka 生产者的 Domain Service
    /// </summary>
    public class KafkaProducer : ISingletonDependency, IDisposableDependencyObjectWrapper<IProducer<string, string>>, IMessageProducer {
        private readonly IConfiguration _config;
        private readonly ILogger<KafkaProducer> _logger;
        private readonly IProducer<string, string> _producer;

        /// <summary>
        /// 构造 <see cref="KafkaProducer"/>
        /// </summary>
        /// <param name="config"></param>
        /// <param name="logger"></param>
        public KafkaProducer(IConfiguration config,
            ILogger<KafkaProducer> logger) {
            _config = config;
            _logger = logger;

            var producerConfig = new ProducerConfig {
                BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
                MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
            };

            var builder = new ProducerBuilder<string, string>(producerConfig);
            _producer = builder.Build();
            Object = _producer;
        }

        /// <summary>
        /// 发送事件
        /// </summary>
        /// <param name="event"></param>
        public void Produce(IIntegrationEvent @event) {
            ProduceAsync(@event).GetAwaiter().GetResult();
        }

        /// <summary>
        /// 发送事件
        /// </summary>
        /// <param name="event"></param>
        public async Task ProduceAsync(IIntegrationEvent @event) {
            await ProduceAsync(@event, @event.GetType().Name);
        }

        /// <inheritdoc />
        public async Task ProduceAsync(IIntegrationEvent @event, [NotNull] string eventName) {
            if (string.IsNullOrEmpty(eventName)) {
                throw new ArgumentNullException(nameof(eventName));
            }

            var topic = _config.GetValue<string>($"Kafka:Topics:{eventName}");
            if (string.IsNullOrEmpty(topic)) {
                throw new NullReferenceException("topic不能为空");
            }
            var key = Guid.NewGuid().ToString();
            try {
                var json = JsonConvert.SerializeObject(@event);
                var dr = await _producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = json });
                _logger.LogInformation($"成功发送消息 {dr.Key}.{ @event.Key}, offSet: {dr.TopicPartitionOffset}");
            } catch (ProduceException<string, string> ex) {
                _logger.LogError(ex, $"发送失败 {topic}.{key}.{ @event.Key}, 原因 {ex.Error.Reason} ");
                throw new ValidationException("当前服务器繁忙,请稍后再尝试");
            }
        }

        /// <summary>
        /// 释放方法
        /// </summary>
        public void Dispose() {
            _producer?.Dispose();
        }

        /// <summary>
        /// 要释放的对象
        /// </summary>
        public IProducer<string, string> Object { get; }
    }
}

  在这里我们来看看IMessageProducer接口定义

using System.Threading.Tasks;
using Sunlight.Kafka.Abstractions;

namespace Sunlight.Kafka
{
    /// <summary>
    /// 消息的生产者
    /// </summary>
    public interface IMessageProducer
    {
        /// <summary>
        /// 发送事件
        /// </summary>
        /// <param name="event"></param>
        void Produce(IIntegrationEvent @event);

        /// <summary>
        /// 发送事件
        /// </summary>
        /// <param name="event"></param>
        Task ProduceAsync(IIntegrationEvent @event);

        /// <summary>
        /// 发送事件
        /// </summary>
        /// <param name="event"></param>
        /// <param name="eventName">指定事件的名称</param>
        /// <returns></returns>
        Task ProduceAsync(IIntegrationEvent @event, string eventName);
    }
}

  在接口中我们分别定义了消息发送的同步和异步及重载方法,另外我们还继承了ABP中的IDisposableDependencyObjectWrapper接口,关于这个接口我们来看一下接口的声明和定义(想了解更多的关于ABP的知识,也可点击这里关注本人之前的博客)。

using System;

namespace Abp.Dependency
{
    /// <summary>
    /// This interface is used to wrap an object that is resolved from IOC container.
    /// It inherits <see cref="IDisposable"/>, so resolved object can be easily released.
    /// In <see cref="IDisposable.Dispose"/> method, <see cref="IIocResolver.Release"/> is called to dispose the object.
    /// This is non-generic version of <see cref="IDisposableDependencyObjectWrapper{T}"/> interface.
    /// </summary>
    public interface IDisposableDependencyObjectWrapper : IDisposableDependencyObjectWrapper<object>
    {

    }
}

  如果想了解关于这个接口更多的信息,请点击这里

  另外在实际发送消息的时候,我们需要记录消息的具体Partition以及Offset这样我们就能够快速找到这条消息,从而方便后面的重试,另外有时候由于服务器的网络问题的时候可能抛出MessageTimeout的消息,这个时候我们需要通过Confluent.Kafka库中的ProduceException异常来捕获这些信息记录抛出异常信息,另外在我们的业务层需要给出一个“当前服务器繁忙,请稍后再尝试”这样一个友好的提示信息。

  另外在发送消息的时候,每一次都会产生一个Guid类型的Key发送到消息的消费方,这个Key将会作为接收消息的实体KafkaReceivedMessage 的主键Id,这个会在后文有具体的解释。

  二 消费者方

  在我们这篇文章记录的重点就是消费方,因为这里我们需要解决诸如消息重复消费以及乐观锁冲突的一系列问题,后面我们将会就这些问题来一一进行讲解和说明。

  2.1 如何解决消息重复消费

  在这里我们通过KafkaReceivedMessage这样一个实体来在数据库中记录收到的消息,并且在发送方每次发送时候传递唯一的一个Guid,这样我们就简单利用每次插入消息时主键Id不允许重复来处理重复发送的同一条消息的问题,我们首先来看看这个实体。

/// <summary>
    /// Kafka消费者收到的消息记录
    /// </summary>
    public class KafkaReceivedMessage : Entity<Guid> {
        /// <summary>
        /// 消费者组
        /// </summary>
        [MaxLength(50)]
        [Required]
        public string Group { get; set; }

        /// <summary>
        /// 消息主题
        /// </summary>
        [MaxLength(100)]
        [Required]
        public string Topic { get; set; }

        /// <summary>
        /// 消息编号, 用于记录日志, 便于区分, 建议用编号
        /// </summary>
        [MaxLength(50)]
        public string Code { get; set; }

        /// <summary>
        /// 消息内容
        /// </summary>
        [MaxLength(int.MaxValue)]
        public string Content { get; set; }

        /// <summary>
        /// kafka 中的 partition
        /// </summary>
        public int? Partition { get; set; }

        /// <summary>
        /// kafka 中的 offset
        /// </summary>
        [MaxLength(100)]
        [Required]
        public string Offset { get; set; }

        /// <summary>
        /// 接受时间
        /// </summary>
        public DateTime ReceivedTime { get; set; }

        /// <summary>
        /// 过期时间
        /// </summary>
        public DateTime? ExpiresAt { get; set; }

        /// <summary>
        /// 重试次数
        /// </summary>
        public int Retries { get; set; }

        /// <summary>
        /// 不是用Guid做全局唯一约束的消息
        /// </summary>
        public bool Old { get; set; }

        /// <inheritdoc />
        public override string ToString() {
            return $"{Group}.{Topic}.{Id}.{Code},{Partition}:{Offset}";
        }

  有了这个实体,我们在接收到这条消息的时候我们首先会尝试将这条消息存入到数据库,如果存入成功就说明不是重复消息,如果存入失败,就记录Kafka收到重复消息,我们先来看一下具体的实现。

/// <summary>
    /// Kafka 消费者的后台服务基础类
    /// </summary>
    /// <typeparam name="T">事件类型</typeparam>
    public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
        /// <summary>
        /// IOC服务提供方
        /// </summary>
        protected IServiceProvider Services { get; }

        /// <summary>
        /// 配置文件
        /// </summary>
        protected IConfiguration Config { get; }

        /// <summary>
        /// 主题
        /// </summary>
        protected string Topic { get; }

        /// <summary>
        /// 日志
        /// </summary>
        protected ILogger<KafkaConsumerHostedService<T>> Logger { get; }

        /// <summary>
        /// DbContext的类型, 必须是业务中实际的类型
        /// </summary>
        protected Type DbContextType { get; }

        /// <summary>
        /// 消费者的配置
        /// </summary>
        protected ConsumerConfig ConsumerConfig { get; }

        /// <summary>
        /// 保存失败时的重复次数, 一般用于 DbUpdateConcurrencyException
        /// </summary>
        protected int SaveDataRetries { get; }

        /// <summary>
        /// 构造 <see cref="KafkaConsumerHostedService{T}"/>
        /// </summary>
        /// <param name="services"></param>
        /// <param name="config"></param>
        /// <param name="logger"></param>
        /// <param name="dbContext">DbContext的类型, 必须是业务中实际的类型</param>
        protected KafkaConsumerHostedService(IServiceProvider services,
            IConfiguration config,
            ILogger<KafkaConsumerHostedService<T>> logger, DbContext dbContext) {
            Services = services;
            Config = config;
            Logger = logger;
            DbContextType = dbContext.GetType();

            Topic = Config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");
            if (string.IsNullOrWhiteSpace(Topic)) {
                Logger.LogCritical($"未能找到{typeof(T).Name}所对应的Topic");
                Environment.Exit(0);
            }

            const int MaxRetries = 5;
            const int DefaultRetries = 2;
            SaveDataRetries = Config.GetValue<int?>("Kafka:SaveDataRetries") ?? DefaultRetries;
            SaveDataRetries = Math.Min(SaveDataRetries, MaxRetries);

            ConsumerConfig = new ConsumerConfig {
                BootstrapServers = Config.GetValue<string>("Kafka:BootstrapServers"),
                AutoOffsetReset = AutoOffsetReset.Earliest,
                GroupId = Config.GetValue<string>("Application:Name"),
                EnableAutoCommit = true
            };
        }

        /// <summary>
        /// 消费该事件,比如调用 Application Service 持久化数据等
        /// </summary>
        /// <param name="event">事件内容</param>
        protected abstract void DoWork(T @event);

        /// <summary>
        /// 保存收到的消息到数据库, 防止重复消费
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        private async Task<bool> SaveMessageAsync(KafkaReceivedMessage message) {
            using var scope = Services.CreateScope();
            var service = (DbContext)scope.ServiceProvider.GetRequiredService(DbContextType);
            service.Set<KafkaReceivedMessage>().Add(message);
            try {
                await service.SaveChangesAsync();
                Logger.LogInformation($"Kafka 收到消息 {message}");
                return true;
            } catch (DbUpdateException ex) when (ex.InnerException?.Message.Contains("PRIMARY KEY") == true) {
                Logger.LogError($"Kafka 收到重复消息 {message}");
            } finally {
                service.Entry(message).State = EntityState.Detached;
            }
            return false;
        }

        /// <summary>
        /// 反序列化消息
        /// </summary>
        /// <param name="result"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        protected virtual async Task<T> DeserializeEvent(ConsumeResult<string, string> result, KafkaReceivedMessage message) {
            T @event;
            try {
                @event = JsonConvert.DeserializeObject<T>(result.Value);
            } catch (Exception e) when(e is JsonReaderException || e is JsonSerializationException || e is JsonException) {
                @event = default;
                if (!await SaveMessageAsync(message))
                    Logger.LogError(e, ErrorMessageTemp, message, e.InnerException?.Message ?? e.Message);
            }

            if (Guid.TryParse(result.Key, out var key) && result.Key != @event?.Key) {
                message.Code = @event?.Key;
                message.Id = key;
            } else {
                message.Id = Guid.NewGuid();
                message.Code = result.Key;
                message.Old = true;
            }

            return await SaveMessageAsync(message) ? @event : default;
        }

        private void TryDoWork(T @event, KafkaReceivedMessage message, int saveRetries) {
            if (saveRetries < 0) {

                Logger.LogError(ErrorMessageTemp, message, "乐观锁冲突");
            }

            try {
                DoWork(@event);
                // 在遇到 乐观锁冲突的时候, 需要重试几次, 因为这很容易就发生了.
            } catch (DbUpdateConcurrencyException) {
                TryDoWork(@event, message, --saveRetries);
            } catch (AbpDbConcurrencyException) {
                TryDoWork(@event, message, --saveRetries);
            }

        }

        const string ErrorMessageTemp = "Kafka 消息 {0} 消费失败, 原因: {1}";

        /// <summary>
        /// 构造 Kafka 消费者实例,监听指定 Topic,获得最新的事件
        /// </summary>
        /// <param name="stoppingToken">终止标识</param>
        /// <returns></returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
            await Task.Factory.StartNew(async () => {
                var builder = new ConsumerBuilder<string, string>(ConsumerConfig);
                using var consumer = builder.Build();
                consumer.Subscribe(Topic);
                //当前事件的Key
                Logger.LogInformation($"Kafka 消费者订阅 {Topic}");
                while (!stoppingToken.IsCancellationRequested) {
                    try {
                        var result = consumer.Consume(stoppingToken);
                        //包含分区和OffSet的详细信息
                        var message = new KafkaReceivedMessage {
                            Group = ConsumerConfig.GroupId,
                            Topic = result.Topic,
                            Content = result.Value,
                            Partition = result.Partition,
                            Offset = result.Offset.ToString(),
                            ReceivedTime = DateTime.Now
                        };
                        try {
                            var @event = await DeserializeEvent(result, message);
                            if (@event == null)
                                continue;
                            TryDoWork(@event, message, SaveDataRetries);
                        } catch (ValidationException ex) {
                            Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
                        } catch (AbpValidationException ex) {
                            Logger.LogError(ex, ErrorMessageTemp, message, GetValidationErrorNarrative(ex));
                        } catch (SqlException ex) {
                            Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
                        } catch (Exception ex) {
                            Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
                        }
                    } catch (OperationCanceledException ex) {
                        consumer.Close();
                        Logger.LogInformation(ex, "Kafka 消费者结束,退出后台线程");
                    } catch (ConsumeException ex) {
                        Logger.LogError(ex, "Kafka 消费者产生异常,");
                    } catch (KafkaException ex) {
                        Logger.LogError(ex, "Kafka 产生异常,");
                    }
                }
            }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        private static string GetValidationErrorNarrative(AbpValidationException validationException) {
            var detailBuilder = new StringBuilder();
            detailBuilder.AppendLine("验证过程中检测到以下错误");

            foreach (var validationResult in validationException.ValidationErrors) {
                detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
                detailBuilder.AppendLine();
            }

            return detailBuilder.ToString();
        }
    }

  这里我们通过SaveMessageAsync这个异步方法来保存数据到数据库,检测的时候我们通过捕获InnerException里面的Message中是否包含"PRIMARY KEY"来判断是不是主键冲突的。

  3.2 乐观锁冲突校验

  在做了第一步消息重复消费校验后,我们需要利用数据库中的DbUpdateConcurrencyException来捕获乐观锁的冲突,因为我们的业务处理都是通过继承KafkaConsumerHostedService这个基类,然后重载里面的DoWork方法来实现对业务代码的调用的,当然由于Kafka消息的异步特性,所以不可避免多个消息同时修改同一个实体,而由于这些异步消息产生的DbUpdateConcurrencyException就不可避免,在这里我们采用的默认次数是2次,最多可以重试5次的机制,通过这种方式来保证乐观锁冲突,如果5次重试还是失败则会提示乐观锁冲突,并且日志记录当前错误内容,通过这种方式能够在一定程度上减少由于并发问题导致的消费者消费失败的概率,当然关于这方面的探索还在随着业务的不断深入而不断去优化,期待后续的持续关注。

原文地址:https://www.cnblogs.com/seekdream/p/12038356.html

时间: 2024-11-08 21:43:40

在Asp.Net Core中集成Kafka(中)的相关文章

基于Jenkins Pipeline的ASP.NET Core持续集成实践

原文:基于Jenkins Pipeline的ASP.NET Core持续集成实践 最近在公司实践持续集成,使用到了Jenkins的Pipeline来提高团队基于ASP.NET Core API服务的集成与部署,因此这里总结一下. 一.关于持续集成与Jenkins Pipeline 1.1 持续集成相关概念 互联网软件的开发和发布,已经形成了一套标准流程,最重要的组成部分就是持续集成(Continuous integration,简称 CI) . 持续集成指的是,频繁地 (一天多次) 将代码集成到

Asp.Net Core 如何在 IIS 中设置环境变量

当运行一个 Asp.Net Core 应用的时候, WebHostBuilder 根据环境变量来判断当前运行的是哪个环境,可能是 Development,Staging或者Production.你也可以设置成随便的一个字符串. 这个链接将会告诉你 如何在各种平台各种环境中设置环境变量.但如果你使用 IIS来代理 Asp.Net Core.你需要在 web.config 中设置环境变量 <configuration> <system.webServer> <handlers&g

ASP.NET Core Web API 集成测试中使用 Bearer Token

在 ASP.NET Core Web API 集成测试一文中, 我介绍了ASP.NET Core Web API的集成测试. 在那里我使用了测试专用的Startup类, 里面的配置和开发时有一些区别, 例如里面去掉了用户身份验证相关的中间件. 但是有些被测试的行为里面需要用到身份/授权信息. 所以本文就介绍一下在API集成测试中发送请求时使用Bearer Token作为Authorization Header的情况. 集成测试中使用Bearer Token 我这个项目里生产时使用的是Identi

asp.net core 系列 6 路由(中)

一.URL 生成 接着上篇讲MVC的路由,MVC 应用程序可以使用路由的 URL 生成功能,生成指向操作的 URL 链接. 生成 URL 可消除硬编码 URL,使代码更稳定.更易维护. 此部分重点介绍 MVC 提供的 URL 生成功能,并且仅涵盖 URL 生成工作原理的基础知识. IUrlHelper 接口用于生成 URL,是 MVC 与路由之间的基础结构的基础部分. 在控制器.视图和视图组件中,可通过 Url 属性找到 IUrlHelper 的实例. // // mvc 框架的Controll

自动挡换手动挡:在 ASP.NET Core 3.0 Middleware 中手动执行 Controller Action

由于遭遇 System.Data.SqlClient 的性能问题(详见之前的博文),向 .NET Core 3.0 的升级工作被迫提前了.在升级过程中遇到了一个问题,我们在 Razor Class Library 中实现的自定义错误页面无法在 ASP.NET Core 3.0 Preview 5 中正常工作,问题原因详见博问 Razor Class Library 中的属性路由在 ASP.NET Core 3.0 中不起作用 . 由于属性路由不起作用的问题没找到解决方法,于是被迫采用了另外一种解

ASP.NET Core 3.0 WebApi中使用Swagger生成API文档简介

参考地址,官网:https://docs.microsoft.com/zh-cn/aspnet/core/tutorials/getting-started-with-swashbuckle?view=aspnetcore-2.2&tabs=visual-studio 与https://www.jianshu.com/p/349e130e40d5 当一个WebApi完成之后,书写API文档是一件非常头疼的事,因为不仅要写得清楚,能让调用接口的人看懂,又是非常耗时耗力的一件事.在之前的一篇随笔中(

Asp.Net Core 中间件应用实践中你不知道的那些事

一.概述 这篇文章主要分享Endpoint 终结点路由的中间件的应用场景及实践案例,不讲述其工作原理,如果需要了解工作原理的同学, 可以点击查看以下两篇解读文章: Asp.Net Core EndPoint 终结点路由工作原理解读 ASP.NET CORE 管道模型及中间件使用解读 1.1 中间件(Middleware)的作用 我们知道,任何的一个web框架都是把http请求封装成一个管道,每一次的请求都是经过管道的一系列操作,最终到达我们写的代码中.那么中间件就是在应用程序管道中的一个组件,用

ASP.NET Core 2 preview 1中Program.cs,Startup.cs和CreateDefaultBuilder的探索

翻译自:Exploring Program.cs, Startup.cs and CreateDefaultBuilder in ASP.NET Core 2 preview 1 ASP.NET Core 2.0的目标之一是已经被简洁化的基础模板.简化了其基本使用,并且让开始一个新项目变得更加简单. 明显从表面上来看,新的Program和Startup类型相比于ASP.NET Core 1.0更加简单.现在,我将从新的WebHost.CreateDefaultBuilder()方法出发,看看它是

初探gitlab &amp; gitlab-runner &amp; asp.net core持续集成

文章简介  gitlab & gitlab-runner 简介 基于gitlab & gitlab-runner 的asp.net core webapi 极简持续集成实践 gitlab & gitlab-runner 简介 写在最前面,文中示例使用到了docker & docker-compose 相关知识,文中的gitlab server 以及 gitlab-runner都是使用docker容器,以及gitlab-runner的执行方式也是docker模式,相关内容不再