asp.net core mcroservices 架构之 分布式日志(三):集成kafka

一 kafka介绍

kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了。硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快。kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性。最熟悉的消息队列框架有ActiveMQRabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列的一些瓶颈,比如并发,rabbitMQ在多个处理程序下,并不能保证执行顺序,还是必须自己去处理独占,而kafka使用consumer group的方式,实现了可以多个处理程序处理一个topic下的记录。如图:

每个分区的记录保证能被每个组接受,这样可以并发去处理一个topic的记录,而且扩展组,则可以随意根据应用需求去扩展你的应用程序,但是每个组的消费者不能超过分区的数量。

kafka Distribution 提供了容错的功能,每一个partition都有一个服务器叫leader,还有零个或者一个以上的服务器叫follower,当这些follower都在同步数据的时候,leader扛起所有的写和读,当leader挂掉,follower会随机选取一个服务器当leader,当然必须有几个follower同步时 in-sync的。还有kafka虽然的那个记录具有原子性,但是并不支持事务。

因为这一篇并不是专门讲解kafka,所以点到为止。

二     扩展服务 开发

以前讲过,netcore的一个很重要的特性就是支持依赖注入,在这里一切皆服务。那么如果需要kafka作为日志服务的终端,就首先需要kafka服务,下面咱们就开发一个kafka服务。

首先,服务就是需要构建,这是netcore开发服务的第一步,我们首先建立一个IKafkaBuilder.cs接口类,如下:

homusing Microsoft.Extensions.DependencyInjection;

namespace Walt.Freamwork.Service
{
    public interface IKafkaBuilder
    {
         /// <summary>
        /// Gets the <see cref="IServiceCollection"/> where Logging services are configured.
        /// </summary>
        IServiceCollection Services { get; }
    }
}

再实现它,KafkaBuilder.cs

using Microsoft.Extensions.DependencyInjection;

namespace Walt.Freamwork.Service
{
    public class KafkaBuilder : IKafkaBuilder
    {
        public IServiceCollection Services {get;}

        public KafkaBuilder(IServiceCollection services)
        {
            Services=services;
        }
    }
}

再利用扩展方法为serviceCollection类加上扩展方法:

 using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Walt.Framework.Service.Kafka;

namespace Walt.Framework.Service
{

    public static class ServiceCollectionExtensions
    {
        /// <summary>
        /// Adds logging services to the specified <see cref="IServiceCollection" />.
        /// </summary>
        /// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param>
        /// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns>
        public static IServiceCollection AddKafka(this IServiceCollection services)
        {
            return AddKafka(services, builder => { });
        }

        public static IServiceCollection AddKafka(this IServiceCollection services
        , Action<IKafkaBuilder> configure)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            services.AddOptions();
            configure(new KafkaBuilder(services));
            services.TryAddSingleton<IKafkaService,KafkaService>();  //kafka的服务类
            return services;
        }
    }
}
KafkaService的实现:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace  Walt.Framework.Service.Kafka
{
    public class KafkaService : IKafkaService
    {

        private KafkaOptions _kafkaOptions;
        private Producer _producer;
        public KafkaService(IOptionsMonitor<KafkaOptions>  kafkaOptions)
        {
            _kafkaOptions=kafkaOptions.CurrentValue;
            kafkaOptions.OnChange((kafkaOpt,s)=>{
                _kafkaOptions=kafkaOpt;
                    System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);

            });
             _producer=new Producer(_kafkaOptions.Properties);
        }

        private byte[] ConvertToByte(string str)
        {
            return System.Text.Encoding.Default.GetBytes(str);
        }

        public  async Task<Message> Producer(string topic,string key,string value)
        {
            if(string.IsNullOrEmpty(topic)
            ||string.IsNullOrEmpty(value))
            {
                throw new ArgumentNullException("topic或者value不能为null.");
            }

           var task=  await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value));
           return task;
        }

    }
}

那么咱们是不是忘记什么了,看上面的代码,是不是那个配置类KafkaOptions 还没有说明?

再在这个位置添加kafka的配置类KafkaConfigurationOptions:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Freamwork.Service;

namespace Walt.Freamwork.Configuration
{
    public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions>
    {

        private readonly IConfiguration _configuration;

        public KafkaConfigurationOptions(IConfiguration configuration)
        {
           _configuration=configuration;
        }

        public void Configure(KafkaOptions options)
        {
                //这里仅仅自定义一些你自己的代码,使用上面configuration配置中的配置节,处理程序没法自动绑定的
                  一些事情。
        }
    }
}

然后,将配置类添加进服务:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service;

namespace Walt.Framework.Configuration
{
    public static class KafkaConfigurationExtensioncs
    {
          public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder
          ,IConfiguration configuration)
          {

                InitService( builder,configuration);
                return builder;
          }

          public static void InitService(IKafkaBuilder builder,IConfiguration configuration)
          {
            builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>(
                  new KafkaConfigurationOptions(configuration));  //配置类和配置内容

            builder.Services.TryAddSingleton
            (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>(
                  new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );//这个是观察类,如果更改,会激发onchange方法

            builder.Services
            .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>>
            (new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); //这个是option类,没这个,配置无法将类绑定

             builder.Services.AddSingleton(new KafkaConfiguration(configuration));
          }
    }
} 

ok,推送nuget,业务部分调用。

三     kafka服务调用

在project中引用然后restore:

引入命名空间:

调用:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Walt.Framework.Log;
using Walt.Framework.Configuration;
using Walt.Framework.Service;

namespace Walt.TestMcroServoces.Webapi
{
    public class Program
    {
        public static void Main(string[] args)
        { 

            var host = new WebHostBuilder()
            .ConfigureAppConfiguration((hostingContext, configContext) =>{
                 var en=hostingContext.HostingEnvironment;
                 if(en.IsDevelopment())
                 {
                     configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json");
                 }
                 else
                 {
                     configContext.AddJsonFile("appsettings.json");
                 }
                   configContext.AddCommandLine(args)
             .AddEnvironmentVariables()
             .SetBasePath(Directory.GetCurrentDirectory()).Build(); 

            }).ConfigureServices((context,configureServices)=>{
                   configureServices.AddKafka(KafkaBuilder=>{
                    KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService"));
                   });
            })   //kafka的调用。
            .ConfigureLogging((hostingContext, logging) => {

                logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"))
                .AddCustomizationLogger();

            }).UseKestrel(KestrelServerOption=>{
                KestrelServerOption.ListenAnyIP(801);
            })
            .UseStartup<Startup>().Build();
            host.Run();
            Console.ReadKey();
        }
    }

}

然后提交git,让jenkins构建docker发布运行:

jenkin是是非常牛的一款构建工具,不仅仅根据插件可以扩展不同环境,还支持分布式构建.

这是我们用jenikins构建的的:

让它跑起来:

调用看看:

这个方法是输出Properties数组的:

四 集成kafka

kafka的接口不多,看看都有那些:

https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html

ConsumerProducer是咱们发布消息和调用消息的两个主类,代码在上文已经实现的service。

客户端代码:

使用my-replicated-topic-morepart这儿topic,还是希望多分区,因为后面consumer使用分布式计算读取。

consumer先在客户端监听:

product端的调用代码:

执行这个接口后,再看consumer接收到的消息:

最后一步,将咱们kafka日志部分替换为真实的kafka环境,看结果:

分布式日志到这里结束,可能大家觉得后面还有日志索引和日志展现,因为这个读kafka需要分布式去处理,

我下面刚好要写分布式计算的文章,所以到时可以拿这个当例子,承前继后。

using System;

using System.Collections.Generic;

using System.Threading.Tasks;

using Confluent.Kafka;

using Microsoft.Extensions.Options;

namespace Walt.Framework.Service.Kafka

{

public class KafkaService : IKafkaService

{

private KafkaOptions _kafkaOptions;

private Producer _producer;

public KafkaService(IOptionsMonitor<KafkaOptions> kafkaOptions)

{

_kafkaOptions=kafkaOptions.CurrentValue;

kafkaOptions.OnChange((kafkaOpt,s)=>{

_kafkaOptions=kafkaOpt;

System.Diagnostics.Debug

.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);

});

_producer=new Producer(_kafkaOptions.Properties);

}

private byte[] ConvertToByte(string str)

{

return System.Text.Encoding.Default.GetBytes(str);

}

public async Task<Message> Producer(string topic,string key,string value)

{

if(string.IsNullOrEmpty(topic)

||string.IsNullOrEmpty(value))

{

throw new ArgumentNullException("topic或者value不能为null.");

}

var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value));

return task;

}

}

}

原文地址:https://www.cnblogs.com/ck0074451665/p/10211725.html

时间: 2024-11-07 20:41:50

asp.net core mcroservices 架构之 分布式日志(三):集成kafka的相关文章

asp.net core microservices 架构之 分布式自动计算(一)

   一:简介   自动计算都是常驻内存的,没有人机交互.我们经常用到的就是console job和sql job了.sqljob有自己的宿主,与数据库产品有很关联,暂时不提.console job使用quartz.net框架,目前3.x已经支持netcore. 如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现.但是分布式就不可同日而语了.如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如

ASP.NET Core使用Elasticsearch记录NLog日志

ASP.NET Core使用Elasticsearch记录NLog日志 1.新建一个 ASP.NET Core项目 2.安装Nuge包 运行:Install-Package NLog.Web.AspNetCore 运行:Install-Package NLog 运行:Install-package NLog.Targets.ElasticSearch 3.编写NLog配置文件(NLog.config) <?xml version="1.0" encoding="utf-

使用 ASP.NET Core MVC 创建 Web API(三)

原文:使用 ASP.NET Core MVC 创建 Web API(三) 使用 ASP.NET Core MVC 创建 Web API 使用 ASP.NET Core MVC 创建 Web API(一) 使用 ASP.NET Core MVC 创建 Web API(二) 十.添加 GetBookItem 方法 1) 在Visual Studio 2017中的“解决方案资源管理器”中双击打开BookController文件,添加Get方法的API.代码如下. // GET: api/Book [H

asp.net core microservices 架构之eureka服务发现

一 简介 微服务将需多的功能拆分为许多的轻量级的子应用,这些子应用相互调度.好处就是轻量级,完全符合了敏捷开发的精神.我们知道ut(单元测试),不仅仅提高我们的程序的健壮性,而且可以强制将类和方法的设计尽量的单一化.那么微服务也是这样,敏捷对于软件工程的意义就是快速开发,验证市场需求,然后快速改进,从而适应市场节奏.什么东西要快速,就必须轻量级.大家知道一个应用的复杂程度,完全是和这个项目的功能和代码数量挂钩的,这是软件自诞生就存在的问题,一个设计不好的软件,最后会让这个软件更新和改进变的非常复

在.NET Core中使用Exceptionless分布式日志收集框架

一.Exceptionless简介 Exceptionless 是一个开源的实时的日志收集框架,它可以应用在基于 ASP.NET,ASP.NET Core,Web Api,Web Forms,WPF,Console,MVC 等技术栈的应用程序中,并且提供了Rest接口可以应用在 Javascript,Node.js 中.它将日志收集变得简单易用并且不需要了解太多的相关技术细节及配置.在以前,我们做日志收集大多使用 Log4net,Nlog 等框架,在应用程序变得复杂并且集群的时候,可能传统的方式

ASP.NET Core 3中的自定义日志记录

根据我的经验,通常在API中记录请求和响应.这样做可以帮助开发人员调试问题并提供有价值的性能指标.在本教程中,我将介绍如何为ASP.NET Core 3 Web API创建基本的日志记录解决方案.在这篇文章的结尾,我们将有一个有效的日志记录解决方案,它将记录每个请求以及对控制台和文件系统的响应,并且日志将包括API处理每个请求所花费的时间.以下是概述: 1. 先决条件2. 创建RequestLog和ResponseLog模型3. 创建ILogForWebAPI4. 创建WebAPIConsole

ASP.NET Core Web 应用程序系列(三)- 在ASP.NET Core中使用Autofac替换自带DI进行构造函数和属性的批量依赖注入(MVC当中应用)

在上一章中主要和大家分享了在ASP.NET Core中如何使用Autofac替换自带DI进行构造函数的批量依赖注入,本章将和大家继续分享如何使之能够同时支持属性的批量依赖注入. 约定: 1.仓储层接口都以“I”开头,以“Repository”结尾.仓储层实现都以“Repository”结尾. 2.服务层接口都以“I”开头,以“Service”结尾.服务层实现都以“Service”结尾. 接下来我们正式进入主题,在上一章的基础上我们再添加一个web项目TianYa.DotNetShare.Core

.net core microservices 架构之 分布式

 一:简介   自动计算都是常驻内存的,没有人机交互.我们经常用到的就是console job和sql job了.sqljob有自己的宿主,与数据库产品有很关联,暂时不提.console job使用quartz.net框架,目前3.x已经支持netcore. 如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现.但是分布式就不可同日而语了.如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如如果

asp.net core microservices 架构之Task 事务一致性 事件源 详解

一 aspnetcore之task的任务状态-CancellationToken 我有一篇文章讲解了asp.net的线程方面的知识.我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作