asp.net core microservices 架构之 分布式自动计算(一)

   一:简介  

自动计算都是常驻内存的,没有人机交互。我们经常用到的就是console job和sql job了。sqljob有自己的宿主,与数据库产品有很关联,暂时不提。console job使用quartz.net框架,目前3.x已经支持netcore。

如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现。但是分布式就不可同日而语了。如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如如果你的订单太多,而一个进程处理延迟是10秒,那用户体验就会非常不好,虽然异步已经提高了你的吞吐量,但是延迟太久,对后续业务还是造成很大的干扰,时不时的会进行停顿。如果两到三台的机器进行处理,那延迟就会大大的减低,但是这两到三台服务器如果分配处理任务?如何分割这个数据,多进程进行处理?就需要到这一篇讲解的知识了。

在多个job应用之间进行协调的工具,就是zookeeper了,zookeeper官方介绍:一个分布式应用协调服务。其实他也是一个类似文件系统的写一致的数据存储软件,我们可以使用它做分布式锁,对应用进行协调控制。

目前流行的这一类产品也比较多,但是我是熟悉quartz,至于切片的功能,在quartz之上可以进行封装,因为所要封装的功能不多,所以我还是选择了quartz。

二 zookeeper 服务

首先就是zookeeper服务,和前面log日志一样,首先创建构建和配置文件类。

  首先看看配置类:

ZookeeperConfiguration.cs

using Microsoft.Extensions.Configuration;

namespace Walt.Framework.Configuration
{
    public class ZookeeperConfiguration
    {
        public IConfiguration Configuration { get; }

        public ZookeeperConfiguration(IConfiguration configuration)
        {
            Configuration = configuration;
        }
    }

}

ZookeeperConfigurationExtensioncs.cs

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    public static class ZookeeperConfigurationExtensioncs
    {
          public static IZookeeperBuilder AddConfiguration(this IZookeeperBuilder builder
          ,IConfiguration configuration)
          {

                InitService( builder,configuration);
                return builder;
          }

          public static void InitService(IZookeeperBuilder builder,IConfiguration configuration)
          {
            builder.Services.TryAddSingleton<IConfigureOptions<ZookeeperOptions>>(
                  new ZookeeperConfigurationOptions(configuration));

            builder.Services.TryAddSingleton
            (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<ZookeeperOptions>>(
                  new ConfigurationChangeTokenSource<ZookeeperOptions>(configuration)) );

            builder.Services
            .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<ZookeeperOptions>>
            (new ConfigureFromConfigurationOptions<ZookeeperOptions>(configuration)));

             builder.Services.AddSingleton(new ZookeeperConfiguration(configuration));
          }
    }
} 

ZookeeperConfigurationOptions.cs

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    public class ZookeeperConfigurationOptions : IConfigureOptions<ZookeeperOptions>
    {

        private readonly IConfiguration _configuration;

        public ZookeeperConfigurationOptions(IConfiguration configuration)
        {
           _configuration=configuration;
        }

        public void Configure(ZookeeperOptions options)
        {
             System.Diagnostics.Debug.WriteLine("zookeeper配置类,适配方法。"
             +Newtonsoft.Json.JsonConvert.SerializeObject(options));
        }
    }
}

以上这三个类就是配置类,下面是构建类和配置信息类:

ZookeeperBuilder.cs

using Microsoft.Extensions.DependencyInjection;

namespace Walt.Framework.Service.Zookeeper
{
    public class ZookeeperBuilder : IZookeeperBuilder
    {
        public IServiceCollection Services {get;}

        public ZookeeperBuilder(IServiceCollection services)
        {
            Services=services;
        }
    }
}

ZookeeperOptions.cs

using System;
using System.Collections.Generic;

namespace Walt.Framework.Service.Zookeeper
{
    public class ZookeeperOptions
    {
        /// <param name="connectstring">comma separated host:port pairs, each corresponding to a zk server.
        /// e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional chroot suffix is used the example would look like:
        /// "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" where the client would be rooted at "/app/a" and all paths would
        /// be relative to this root - ie getting/setting/etc... "/foo/bar" would result in operations being run on "/app/a/foo/bar"
        /// (from the server perspective).</param>
        public string Connectstring{get;set;}

        public int SessionTimeout{get;set;}

        public int SessionId{get;set;}

        public string SessionPwd{get;set;}

        public bool IsReadOnly{get;set;}

    }
}

ZookeeperService.cs 这个类中有两个watch类。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using static org.apache.zookeeper.ZooKeeper;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using static org.apache.zookeeper.Watcher.Event;
using Newtonsoft.Json;
using System.Collections.Concurrent;

namespace  Walt.Framework.Service.Zookeeper
{

    internal class  WaitLockWatch:Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        private string _path;

        private ZookeeperService _zookeeperService;

        public string _tempNode;

        public WaitLockWatch(AutoResetEvent autoResetEvent
        ,ZookeeperService zookeeperService
        ,ILogger logger,string path
        ,string tempNode)
        {
            _autoResetEvent=autoResetEvent;
            _zookeeperService=zookeeperService;
            _logger=logger;
            _path=path;
            _tempNode=tempNode;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("{0}节点下子节点发生改变,激发监视方法。",_path);
            var childList=_zookeeperService.GetChildrenAsync(_path,null,true).Result;
            if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",_path);
                        return Task.FromResult(0);
                   }
                   var top=childList.Children.OrderBy(or=>or).First();
                   if(_path+"/"+top==_tempNode)
                   {
                        _logger.LogDebug("释放阻塞");
                        _autoResetEvent.Set();
                   }

            return Task.FromResult(0);
       }
    }

    internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        public WaitConnWatch(AutoResetEvent autoResetEvent
        ,ILogger logger)
        {
            _autoResetEvent=autoResetEvent;
            _logger=logger;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("watch激发,回掉状态:{0}",@event.getState().ToString());
            if(@event.getState()== KeeperState.SyncConnected
            ||@event.getState()== KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("释放阻塞");
                _autoResetEvent.Set();
            }
            return Task.FromResult(0);
       }
    }

    public class ZookeeperService : IZookeeperService
    {

        public List<string> requestLockSequence=new List<string>();
        private object _lock=new object();
        private ZookeeperOptions _zookeeperOptions;
        private ZooKeeper _zookeeper;

         private static readonly byte[] NO_PASSWORD = new byte[0];

         public Watcher Wathcer {get;set;}

         public ILoggerFactory LoggerFac { get; set; }

         private ILogger _logger;

         AutoResetEvent[] autoResetEvent=new AutoResetEvent[2]
         {new AutoResetEvent(false),new AutoResetEvent(false)};

        public ZookeeperService(IOptionsMonitor<ZookeeperOptions>  zookeeperOptions
        ,ILoggerFactory loggerFac)
        {
            LoggerFac=loggerFac;
            _logger=LoggerFac.CreateLogger<ZookeeperService>();
            _zookeeperOptions=zookeeperOptions.CurrentValue;
            _logger.LogDebug("配置参数:{0}",JsonConvert.SerializeObject(_zookeeperOptions));
             zookeeperOptions.OnChange((zookopt,s)=>{
                _zookeeperOptions=zookopt;
            });
            _logger.LogDebug("开始连接");
            Conn(_zookeeperOptions);
        }

        private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly=default(Boolean);
            Wathcer=new WaitConnWatch(autoResetEvent[0],_logger);
            if(isReadOnly!=zookeeperOptions.IsReadOnly)
            {
                isReadOnly=zookeeperOptions.IsReadOnly;
            }

            byte[] pwd=new byte[0];
            //如果没有密码和sessionId
            if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
            &&_zookeeperOptions.SessionId==default(int))
            {
             _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
            }
            else
            {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                 ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
            }
             if(_zookeeper.getState()==States.CONNECTING)
            {
                _logger.LogDebug("当前状态:CONNECTING。阻塞等待");
                autoResetEvent[0].WaitOne();
            }
        }

        public Task<string> CreateZNode(string path,string data,CreateMode createMode,List<ACL> aclList)
        {
            ReConn();
            if(string.IsNullOrEmpty(path)||!path.StartsWith(‘/‘))
            {
                _logger.LogDebug("path路径非法,参数:path:{0}",path);
                return null;
            }
            byte[] dat=new byte[0];
            if(string.IsNullOrEmpty(data))
            {
                dat=System.Text.Encoding.Default.GetBytes(data);
            }
            if(createMode==null)
            {
                 _logger.LogDebug("createMode为null,默认使用CreateMode.PERSISTENT");
                createMode=CreateMode.PERSISTENT;
            }
            return _zookeeper.createAsync(path,dat,aclList,createMode);
        }

        public Task<DataResult> GetDataAsync(string path,Watcher watcher,bool isSync)
        {
            ReConn();
            if(_zookeeper.existsAsync(path).Result==null )
            {
                _logger.LogDebug("path不存在");
                return null;
            }
            if(isSync)
            {
                 _logger.LogDebug("即将进行同步。");
                 var task=Task.Run(async ()=>{
                    await _zookeeper.sync(path);
                 });
                task.Wait();
            }

            return _zookeeper.getDataAsync(path,watcher);
        }

         public Task<Stat> SetDataAsync(string path,string data,bool isSync)
        {
            ReConn();
            if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }
            byte[] dat=new byte[0];
            if(!string.IsNullOrEmpty(data))
            {
                dat=System.Text.Encoding.Default.GetBytes(data);
            }
            return _zookeeper.setDataAsync(path,dat);
        }

         public async Task<ChildrenResult> GetChildrenAsync(string path,Watcher watcher,bool isSync)
         {
             ReConn();
              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }
             if(isSync)
            {
                 _logger.LogDebug("即将进行同步。");
                 var task=Task.Run(async  ()=>{
                      _logger.LogDebug("开始同步");
                      await _zookeeper.sync(path);
                 });
                task.Wait();
            }
             return await _zookeeper.getChildrenAsync(path,watcher);
         }

         public void DeleteNode(string path,String tempNode)
         {
             if(!string.IsNullOrEmpty(tempNode))
             {
                requestLockSequence.Remove(tempNode);
             }
             ReConn();
              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return;
            }
            var  task=Task.Run(async ()=>{
                 _logger.LogDebug("删除node:{0}",path);
                  await _zookeeper.deleteAsync(path);
            });
            task.Wait();
            var sequencePath=requestLockSequence.Where(w=>path==w).FirstOrDefault();
            if(sequencePath!=null)
            {
                requestLockSequence.Remove(sequencePath);
            }
         }

         public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
         {
             _logger.LogInformation("获取分布式锁开始。");
             ReConn();
             string tempNode=string.Empty;
             tempNodeOut=string.Empty;

              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }

            try
            {
                _logger.LogDebug("开始锁定语句块");
                lock(_lock)
                {
                     _logger.LogDebug("锁定,访问requestLockSequence的代码应该同步。");
                    tempNode=requestLockSequence
                    .Where(w=>w.StartsWith(path+"/"+sequenceName)).FirstOrDefault();

                    if(tempNode==null)
                    {
                        tempNode=CreateZNode(path+"/"+sequenceName,"",CreateMode.EPHEMERAL_SEQUENTIAL,aclList).Result;
                        _logger.LogDebug("创建节点:{0}",tempNode);
                        if(tempNode==null)
                        {
                            _logger.LogDebug("创建临时序列节点失败。详细参数:path:{0},data:{1},CreateMode:{2}"
                            ,path+"/squence","",CreateMode.EPHEMERAL_SEQUENTIAL);
                            return null;
                        }
                         _logger.LogInformation("创建成功,加入requestLockSequence列表。");
                        requestLockSequence.Add(tempNode);
                    }
                    else
                    {
                        _logger.LogDebug("已经存在的锁节点,返回null");
                        return null;
                    }
                }

                var childList= GetChildrenAsync(path,null,true).Result;
                   if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",path);
                        return null;
                   }
                   _logger.LogDebug("获取path:{0}的子节点:{1}",path,Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                   var top=childList.Children.OrderBy(or=>or).First();
                   byte[] da=null;
                   if(path+"/"+top==tempNode)
                   {
                       tempNodeOut =tempNode;
                       da= GetDataAsync(path,null,true).Result.Data;
                        if(da==null||da.Length<1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                   }
                   else
                   {
                    childList= GetChildrenAsync(path,new WaitLockWatch(autoResetEvent[1],this,_logger,path,tempNode),true).Result;
                    autoResetEvent[1].WaitOne();
                   }
                    _logger.LogDebug("继续执行。");
                    tempNodeOut =tempNode;
                    da= GetDataAsync(path,null,true).Result.Data;
                    if(da==null||da.Length<1)
                    {
                         return string.Empty;
                    }
                    return System.Text.Encoding.Default.GetString(da);
            }
            catch(Exception ep)
            {
                 _logger.LogError(ep,"获取同步锁出现错误。");
                if(!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode,tempNode);
                }
            }
            return null;
         }

         private void ReConn()
         {
             _logger.LogInformation("检查连接状态");
             if(_zookeeper.getState()==States.CLOSED
             ||_zookeeper.getState()== States.NOT_CONNECTED)
             {
                 _logger.LogInformation("连接为关闭,开始重新连接。");
                Conn(_zookeeperOptions);
             }
         }

         public void Close(string tempNode)
         {
             var task=Task.Run(async ()=>{
             requestLockSequence.Remove(tempNode);
              await _zookeeper.closeAsync();
             });
             task.Wait();
         }

    }
}

前面的类如果了解了netcore的扩展服务和配置机制,就很简单理解了,我们主要是讲解这个服务类的功能。

首先看服务类其中的一段代码:

 private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly=default(Boolean);
            Wathcer=new WaitConnWatch(autoResetEvent[0],_logger); //监控连接是否连接成功
            if(isReadOnly!=zookeeperOptions.IsReadOnly)
            {
                isReadOnly=zookeeperOptions.IsReadOnly;
            }

            byte[] pwd=new byte[0];
            //如果没有密码和sessionId
            if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
            &&_zookeeperOptions.SessionId==default(int))
            {
             _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
            }
            else
            {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                 ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
            }
             if(_zookeeper.getState()==States.CONNECTING)
            {
                _logger.LogDebug("当前状态:CONNECTING。阻塞等待");
                autoResetEvent[0].WaitOne();
            }
        }

这个方法是连接zookeeper,我们在构造函数中调用它,注意,zookeeper是异步,所以需要watcher类监控和AutoResetEvent阻塞当前线程,因为如果不阻塞,在连接还没建立的时候,后面调用,

会出现错误。在监控类被触发的时候,执行取消阻塞。

 internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        public WaitConnWatch(AutoResetEvent autoResetEvent
        ,ILogger logger)
        {
            _autoResetEvent=autoResetEvent;
            _logger=logger;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("watch激发,回掉状态:{0}",@event.getState().ToString());
            if(@event.getState()== KeeperState.SyncConnected
            ||@event.getState()== KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("释放阻塞");
                _autoResetEvent.Set();  //取消阻塞当前线程。
            }
            return Task.FromResult(0);
       }
    }

大家看zookeeper服务类,里面很多方法在执行前需要执行conn方法,这只是一种失败重连的机制,因为一般没有线程池的,我都会给这个服务单例。哪有人会问,如果失败重连,会不会阻塞当前应用,

这个不会,因为netcore是多线程的,但是会降低这个应用的生产力。我前面翻译过一篇net的关于线程的知识,后面也会单独一篇讲解netcore的线程模型。还有代码中我很少用异常去处理容错性,尽量抛出原生的异常,使用日志去记录,然后为这个类返回个null,异常对性能也有一定的消耗,当然看自己习惯了,目的都是为了应用的健壮性。

其他方法是操作zookeeper的类,大家可以看我贴出来的代码。因为zookeeper最出名的估计就是分布式锁了,所以就把这个功能加进来。

 public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
         {
             _logger.LogInformation("获取分布式锁开始。");
             ReConn();
             string tempNode=string.Empty;
             tempNodeOut=string.Empty;

              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }

            try
            {
                _logger.LogDebug("开始锁定语句块");
                lock(_lock)  //这是我为了防止重复提交做的防止并发,当然实际用的地方是协调应用之间的功能,而肯定不会用人机交互。你可以把这个看作多此一举。
                {
                     _logger.LogDebug("锁定,访问requestLockSequence的代码应该同步。");
                    tempNode=requestLockSequence
                    .Where(w=>w.StartsWith(path+"/"+sequenceName)).FirstOrDefault();

                    if(tempNode==null)
                    {
                        tempNode=CreateZNode(path+"/"+sequenceName,"",CreateMode.EPHEMERAL_SEQUENTIAL,aclList).Result;
                        _logger.LogDebug("创建节点:{0}",tempNode);
                        if(tempNode==null)
                        {
                            _logger.LogDebug("创建临时序列节点失败。详细参数:path:{0},data:{1},CreateMode:{2}"
                            ,path+"/squence","",CreateMode.EPHEMERAL_SEQUENTIAL);
                            return null;
                        }
                         _logger.LogInformation("创建成功,加入requestLockSequence列表。");
                        requestLockSequence.Add(tempNode);
                    }
                    else
                    {
                        _logger.LogDebug("已经存在的锁节点,返回null");
                        return null;
                    }
                }

                var childList= GetChildrenAsync(path,null,true).Result;  //首先获取lock子节点。
                   if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",path);
                        return null;
                   }
                   _logger.LogDebug("获取path:{0}的子节点:{1}",path,Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                   var top=childList.Children.OrderBy(or=>or).First();
                   byte[] da=null;
                   if(path+"/"+top==tempNode) //判断是否是当前自己的节点在队列顶端。
                   {
                       tempNodeOut =tempNode;
                       da= GetDataAsync(path,null,true).Result.Data;
                        if(da==null||da.Length<1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                   }
                   else
                   {
                    childList= GetChildrenAsync(path,new WaitLockWatch(autoResetEvent[1],this,_logger,path,tempNode),true).Result; //如果自己不再队列顶端,则加监听等待这个节点有更改。
                    autoResetEvent[1].WaitOne();
                   }
                    _logger.LogDebug("继续执行。");
                    tempNodeOut =tempNode;
                    da= GetDataAsync(path,null,true).Result.Data;
                    if(da==null||da.Length<1)
                    {
                         return string.Empty;
                    }
                    return System.Text.Encoding.Default.GetString(da);
            }
            catch(Exception ep)
            {
                 _logger.LogError(ep,"获取同步锁出现错误。");
                if(!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode,tempNode);
                }
            }
            return null;
         }

接下来看监听类:

 internal class  WaitLockWatch:Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        private string _path;

        private ZookeeperService _zookeeperService;

        public string _tempNode;

        public WaitLockWatch(AutoResetEvent autoResetEvent
        ,ZookeeperService zookeeperService
        ,ILogger logger,string path
        ,string tempNode)
        {
            _autoResetEvent=autoResetEvent;
            _zookeeperService=zookeeperService;
            _logger=logger;
            _path=path;
            _tempNode=tempNode;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("{0}节点下子节点发生改变,激发监视方法。",_path);
            var childList=_zookeeperService.GetChildrenAsync(_path,null,true).Result;
            if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",_path);
                        return Task.FromResult(0);
                   }
                   var top=childList.Children.OrderBy(or=>or).First();
                   if(_path+"/"+top==_tempNode)   //判断当前节点是否队列顶端
                   {
                        _logger.LogDebug("释放阻塞");
                        _autoResetEvent.Set();
                   }

            return Task.FromResult(0);
       }
    }

这个类,在path节点每次更改或者子节点更改的时候都会激发,仅仅是判断当前节点是不是列表的顶端,再执行 _autoResetEvent.Set();释放阻塞让继续执行。

编译提交nuget,然后集成到测试程序看效果。

首先看zookeeper:

目前节点下没有一个子节点,再看看这个节点的值:

首先新建一个看看:

create -s -e /testzookeeper/sequence "" 

然后启动程序,当程序提交一个子节点,他是排序的:

我们删除我们刚刚用命令行创建的节点:

 delete /testzookeeper/sequence0000000072

然后再看值:

我们的客户端就是获取到锁后更改这个值,增加了一段字符串,客户端调用如下:

 [HttpGet("{id}")]
        public ActionResult<string> Get(int id)
        {
            string sequenceNode=string.Empty;
            string nodename=_zookeeperService.GetDataByLockNode("/testzookeeper","sequence",ZooDefs.Ids.OPEN_ACL_UNSAFE,out  sequenceNode);
            if(sequenceNode==null)
            {
                return "获取分布式锁失败,请查看日志。";
            }

            _zookeeperService.SetDataAsync("/testzookeeper",nodename+"执行了"+DateTime.Now.ToString("yyyyMMddhhmmss"),false);
            if(!string.IsNullOrEmpty(sequenceNode))
            {
                _zookeeperService.DeleteNode(sequenceNode,sequenceNode);
                return "取得锁并且成功处理数据,释放锁成功。";
            }

            return "出现错误,请查日志。";
        }

当然这个是我做测试用的,实际开发,web就是用来做他的数据库存取的,尽量不要去做一些额外的功能,因为i守护进程或者后台服务和job有它自己的任务。

下一节我们就实际 使用这个分布式锁做quartz的切片功能。

原文地址:https://www.cnblogs.com/ck0074451665/p/10258224.html

时间: 2024-11-09 04:53:52

asp.net core microservices 架构之 分布式自动计算(一)的相关文章

asp.net core microservices 架构之eureka服务发现

一 简介 微服务将需多的功能拆分为许多的轻量级的子应用,这些子应用相互调度.好处就是轻量级,完全符合了敏捷开发的精神.我们知道ut(单元测试),不仅仅提高我们的程序的健壮性,而且可以强制将类和方法的设计尽量的单一化.那么微服务也是这样,敏捷对于软件工程的意义就是快速开发,验证市场需求,然后快速改进,从而适应市场节奏.什么东西要快速,就必须轻量级.大家知道一个应用的复杂程度,完全是和这个项目的功能和代码数量挂钩的,这是软件自诞生就存在的问题,一个设计不好的软件,最后会让这个软件更新和改进变的非常复

.net core microservices 架构之 分布式

 一:简介   自动计算都是常驻内存的,没有人机交互.我们经常用到的就是console job和sql job了.sqljob有自己的宿主,与数据库产品有很关联,暂时不提.console job使用quartz.net框架,目前3.x已经支持netcore. 如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现.但是分布式就不可同日而语了.如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如如果

asp.net core microservices 架构之Task 事务一致性 事件源 详解

一 aspnetcore之task的任务状态-CancellationToken 我有一篇文章讲解了asp.net的线程方面的知识.我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

一 kafka介绍 kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了.硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快.kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性.最熟悉的消息队列框架有ActiveMQ 和 RabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列

Pro ASP.NET Core MVC 第6版 第二章(后半章)

增加动态输出 整个web应用平台的关注点在于构建并显示动态输出内容.在MVC里,控制器负责构建一些数据并将其传给视图.视图负责渲染成HTML. 从控制器向视图传递数据的一种方式是使用ViewBag 对象,它是一个控制器基类的成员.ViewBag是一个动态对象,你可以给他赋值任意属性给视图来渲染用.代码2-5 演示了如何在HomeController里传递简单对象. Listing 2-5. 设置视图数据 using System; using Microsoft.AspNetCore.Mvc;

Pro ASP.NET Core MVC 第6版 第一章

第一章 ASP.NET Core MVC 的前世今生 ASP.NET Core MVC 是一个微软公司开发的Web应用程序开发框架,它结合了MVC架构的高效性和简洁性,敏捷开发的思想和技术,和.NET 平台的最好的部分.在本章,我们将学习为什么微软创建ASP.NET Core MVC, 看看他和他的前辈的比较以及和其他类似框架的比较,最后,大概讲一下ASP.NET core MVC里面有什么新东西,还有本书中包括哪些内容. 了解ASP.NET Core MVC的历史 最开始的ASP.NET 诞生

ASP.NET Core 缓存技术 及 Nginx 缓存配置

前言 在Asp.Net Core Nginx部署一文中,主要是讲述的如何利用Nginx来实现应用程序的部署,使用Nginx来部署主要有两大好处,第一是利用Nginx的负载均衡功能,第二是使用Nginx的反向代理来降低我们后端应用程序的压力.那除了以上两点之外,其实我们还可以利用代理服务器的缓存功能来进一步的降低后端应用程序的压力,提升系统的吞吐量(tps).这一篇就来看一下具体应该如何去做吧. 目录 WEB 缓存 ASP.NET Core 缓存 内存缓存 分布式缓存 Response 缓存 Ng

ASP.NET Core 介绍

原文 https://docs.asp.net/en/latest/intro.html by Daniel Roth, Rick Anderson and Shaun Luttin 章节 什么是 ASP.NET Core? 为什么重新设计出一个 AsP.NET Core? 应用解析 启动 服务 中间件 ASP.NET Core 是对 ASP.NET 的一次重大修改,基本是重新设计了.本篇向你介绍了ASP.NET Core 的一些新概念以及这些新的特性能对我们编写现在流行的WEB应用提供哪些帮助

ASP.NET Core 介绍和项目解读

1. 前言 2. ASP.NET Core 简介 2.1 什么是ASP.NET Core 2.2 ASP.NET Core的特点 2.3 ASP.NET Core 项目文件夹解读 2.3.1 项目文件夹总览 2.3.2 project.json和global.json 2.3.1 Properties——launchSettings.json 2.3.4 Startup.cs (1) 构造函数 (2) ConfigureServices (3) Configure 2.3.5 bundlecon