.net core microservices 架构之 分布式

 一:简介  

自动计算都是常驻内存的,没有人机交互。我们经常用到的就是console job和sql job了。sqljob有自己的宿主,与数据库产品有很关联,暂时不提。console job使用quartz.net框架,目前3.x已经支持netcore。

如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现。但是分布式就不可同日而语了。如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如如果你的订单太多,而一个进程处理延迟是10秒,那用户体验就会非常不好,虽然异步已经提高了你的吞吐量,但是延迟太久,对后续业务还是造成很大的干扰,时不时的会进行停顿。如果两到三台的机器进行处理,那延迟就会大大的减低,但是这两到三台服务器如果分配处理任务?如何分割这个数据,多进程进行处理?就需要到这一篇讲解的知识了。

在多个job应用之间进行协调的工具,就是zookeeper了,zookeeper官方介绍:一个分布式应用协调服务。其实他也是一个类似文件系统的写一致的数据存储软件,我们可以使用它做分布式锁,对应用进行协调控制。

目前流行的这一类产品也比较多,但是我是熟悉quartz,至于切片的功能,在quartz之上可以进行封装,因为所要封装的功能不多,所以我还是选择了quartz。

二 zookeeper 服务

首先就是zookeeper服务,和前面log日志一样,首先创建构建和配置文件类。

  首先看看配置类:

ZookeeperConfiguration.cs

using Microsoft.Extensions.Configuration;

namespace Walt.Framework.Configuration
{
    public class ZookeeperConfiguration
    {
        public IConfiguration Configuration { get; }

        public ZookeeperConfiguration(IConfiguration configuration)
        {
            Configuration = configuration;
        }
    }

}

ZookeeperConfigurationExtensioncs.cs

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    public static class ZookeeperConfigurationExtensioncs
    {
          public static IZookeeperBuilder AddConfiguration(this IZookeeperBuilder builder
          ,IConfiguration configuration)
          {

                InitService( builder,configuration);
                return builder;
          }

          public static void InitService(IZookeeperBuilder builder,IConfiguration configuration)
          {
            builder.Services.TryAddSingleton<IConfigureOptions<ZookeeperOptions>>(
                  new ZookeeperConfigurationOptions(configuration));

            builder.Services.TryAddSingleton
            (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<ZookeeperOptions>>(
                  new ConfigurationChangeTokenSource<ZookeeperOptions>(configuration)) );

            builder.Services
            .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<ZookeeperOptions>>
            (new ConfigureFromConfigurationOptions<ZookeeperOptions>(configuration)));

             builder.Services.AddSingleton(new ZookeeperConfiguration(configuration));
          }
    }
} 

ZookeeperConfigurationOptions.cs

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    public class ZookeeperConfigurationOptions : IConfigureOptions<ZookeeperOptions>
    {

        private readonly IConfiguration _configuration;

        public ZookeeperConfigurationOptions(IConfiguration configuration)
        {
           _configuration=configuration;
        }

        public void Configure(ZookeeperOptions options)
        {
             System.Diagnostics.Debug.WriteLine("zookeeper配置类,适配方法。"
             +Newtonsoft.Json.JsonConvert.SerializeObject(options));
        }
    }
}

以上这三个类就是配置类,下面是构建类和配置信息类:

ZookeeperBuilder.cs

using Microsoft.Extensions.DependencyInjection;

namespace Walt.Framework.Service.Zookeeper
{
    public class ZookeeperBuilder : IZookeeperBuilder
    {
        public IServiceCollection Services {get;}

        public ZookeeperBuilder(IServiceCollection services)
        {
            Services=services;
        }
    }
}

ZookeeperOptions.cs

using System;
using System.Collections.Generic;

namespace Walt.Framework.Service.Zookeeper
{
    public class ZookeeperOptions
    {
        /// <param name="connectstring">comma separated host:port pairs, each corresponding to a zk server.
        /// e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional chroot suffix is used the example would look like:
        /// "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" where the client would be rooted at "/app/a" and all paths would
        /// be relative to this root - ie getting/setting/etc... "/foo/bar" would result in operations being run on "/app/a/foo/bar"
        /// (from the server perspective).</param>
        public string Connectstring{get;set;}

        public int SessionTimeout{get;set;}

        public int SessionId{get;set;}

        public string SessionPwd{get;set;}

        public bool IsReadOnly{get;set;}

    }
}

ZookeeperService.cs 这个类中有两个watch类。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using static org.apache.zookeeper.ZooKeeper;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using static org.apache.zookeeper.Watcher.Event;
using Newtonsoft.Json;
using System.Collections.Concurrent;

namespace  Walt.Framework.Service.Zookeeper
{

    internal class  WaitLockWatch:Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        private string _path;

        private ZookeeperService _zookeeperService;

        public string _tempNode;

        public WaitLockWatch(AutoResetEvent autoResetEvent
        ,ZookeeperService zookeeperService
        ,ILogger logger,string path
        ,string tempNode)
        {
            _autoResetEvent=autoResetEvent;
            _zookeeperService=zookeeperService;
            _logger=logger;
            _path=path;
            _tempNode=tempNode;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("{0}节点下子节点发生改变,激发监视方法。",_path);
            var childList=_zookeeperService.GetChildrenAsync(_path,null,true).Result;
            if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",_path);
                        return Task.FromResult(0);
                   }
                   var top=childList.Children.OrderBy(or=>or).First();
                   if(_path+"/"+top==_tempNode)
                   {
                        _logger.LogDebug("释放阻塞");
                        _autoResetEvent.Set();
                   }

            return Task.FromResult(0);
       }
    }

    internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        public WaitConnWatch(AutoResetEvent autoResetEvent
        ,ILogger logger)
        {
            _autoResetEvent=autoResetEvent;
            _logger=logger;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("watch激发,回掉状态:{0}",@event.getState().ToString());
            if(@event.getState()== KeeperState.SyncConnected
            ||@event.getState()== KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("释放阻塞");
                _autoResetEvent.Set();
            }
            return Task.FromResult(0);
       }
    }

    public class ZookeeperService : IZookeeperService
    {

        public List<string> requestLockSequence=new List<string>();
        private object _lock=new object();
        private ZookeeperOptions _zookeeperOptions;
        private ZooKeeper _zookeeper;

         private static readonly byte[] NO_PASSWORD = new byte[0];

         public Watcher Wathcer {get;set;}

         public ILoggerFactory LoggerFac { get; set; }

         private ILogger _logger;

         AutoResetEvent[] autoResetEvent=new AutoResetEvent[2]
         {new AutoResetEvent(false),new AutoResetEvent(false)};

        public ZookeeperService(IOptionsMonitor<ZookeeperOptions>  zookeeperOptions
        ,ILoggerFactory loggerFac)
        {
            LoggerFac=loggerFac;
            _logger=LoggerFac.CreateLogger<ZookeeperService>();
            _zookeeperOptions=zookeeperOptions.CurrentValue;
            _logger.LogDebug("配置参数:{0}",JsonConvert.SerializeObject(_zookeeperOptions));
             zookeeperOptions.OnChange((zookopt,s)=>{
                _zookeeperOptions=zookopt;
            });
            _logger.LogDebug("开始连接");
            Conn(_zookeeperOptions);
        }

        private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly=default(Boolean);
            Wathcer=new WaitConnWatch(autoResetEvent[0],_logger);
            if(isReadOnly!=zookeeperOptions.IsReadOnly)
            {
                isReadOnly=zookeeperOptions.IsReadOnly;
            }

            byte[] pwd=new byte[0];
            //如果没有密码和sessionId
            if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
            &&_zookeeperOptions.SessionId==default(int))
            {
             _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
            }
            else
            {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                 ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
            }
             if(_zookeeper.getState()==States.CONNECTING)
            {
                _logger.LogDebug("当前状态:CONNECTING。阻塞等待");
                autoResetEvent[0].WaitOne();
            }
        }

        public Task<string> CreateZNode(string path,string data,CreateMode createMode,List<ACL> aclList)
        {
            ReConn();
            if(string.IsNullOrEmpty(path)||!path.StartsWith(‘/‘))
            {
                _logger.LogDebug("path路径非法,参数:path:{0}",path);
                return null;
            }
            byte[] dat=new byte[0];
            if(string.IsNullOrEmpty(data))
            {
                dat=System.Text.Encoding.Default.GetBytes(data);
            }
            if(createMode==null)
            {
                 _logger.LogDebug("createMode为null,默认使用CreateMode.PERSISTENT");
                createMode=CreateMode.PERSISTENT;
            }
            return _zookeeper.createAsync(path,dat,aclList,createMode);
        }

        public Task<DataResult> GetDataAsync(string path,Watcher watcher,bool isSync)
        {
            ReConn();
            if(_zookeeper.existsAsync(path).Result==null )
            {
                _logger.LogDebug("path不存在");
                return null;
            }
            if(isSync)
            {
                 _logger.LogDebug("即将进行同步。");
                 var task=Task.Run(async ()=>{
                    await _zookeeper.sync(path);
                 });
                task.Wait();
            }

            return _zookeeper.getDataAsync(path,watcher);
        }

         public Task<Stat> SetDataAsync(string path,string data,bool isSync)
        {
            ReConn();
            if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }
            byte[] dat=new byte[0];
            if(!string.IsNullOrEmpty(data))
            {
                dat=System.Text.Encoding.Default.GetBytes(data);
            }
            return _zookeeper.setDataAsync(path,dat);
        }

         public async Task<ChildrenResult> GetChildrenAsync(string path,Watcher watcher,bool isSync)
         {
             ReConn();
              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }
             if(isSync)
            {
                 _logger.LogDebug("即将进行同步。");
                 var task=Task.Run(async  ()=>{
                      _logger.LogDebug("开始同步");
                      await _zookeeper.sync(path);
                 });
                task.Wait();
            }
             return await _zookeeper.getChildrenAsync(path,watcher);
         }

         public void DeleteNode(string path,String tempNode)
         {
             if(!string.IsNullOrEmpty(tempNode))
             {
                requestLockSequence.Remove(tempNode);
             }
             ReConn();
              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return;
            }
            var  task=Task.Run(async ()=>{
                 _logger.LogDebug("删除node:{0}",path);
                  await _zookeeper.deleteAsync(path);
            });
            task.Wait();
            var sequencePath=requestLockSequence.Where(w=>path==w).FirstOrDefault();
            if(sequencePath!=null)
            {
                requestLockSequence.Remove(sequencePath);
            }
         }

         public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
         {
             _logger.LogInformation("获取分布式锁开始。");
             ReConn();
             string tempNode=string.Empty;
             tempNodeOut=string.Empty;

              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }

            try
            {
                _logger.LogDebug("开始锁定语句块");
                lock(_lock)
                {
                     _logger.LogDebug("锁定,访问requestLockSequence的代码应该同步。");
                    tempNode=requestLockSequence
                    .Where(w=>w.StartsWith(path+"/"+sequenceName)).FirstOrDefault();

                    if(tempNode==null)
                    {
                        tempNode=CreateZNode(path+"/"+sequenceName,"",CreateMode.EPHEMERAL_SEQUENTIAL,aclList).Result;
                        _logger.LogDebug("创建节点:{0}",tempNode);
                        if(tempNode==null)
                        {
                            _logger.LogDebug("创建临时序列节点失败。详细参数:path:{0},data:{1},CreateMode:{2}"
                            ,path+"/squence","",CreateMode.EPHEMERAL_SEQUENTIAL);
                            return null;
                        }
                         _logger.LogInformation("创建成功,加入requestLockSequence列表。");
                        requestLockSequence.Add(tempNode);
                    }
                    else
                    {
                        _logger.LogDebug("已经存在的锁节点,返回null");
                        return null;
                    }
                }

                var childList= GetChildrenAsync(path,null,true).Result;
                   if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",path);
                        return null;
                   }
                   _logger.LogDebug("获取path:{0}的子节点:{1}",path,Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                   var top=childList.Children.OrderBy(or=>or).First();
                   byte[] da=null;
                   if(path+"/"+top==tempNode)
                   {
                       tempNodeOut =tempNode;
                       da= GetDataAsync(path,null,true).Result.Data;
                        if(da==null||da.Length<1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                   }
                   else
                   {
                    childList= GetChildrenAsync(path,new WaitLockWatch(autoResetEvent[1],this,_logger,path,tempNode),true).Result;
                    autoResetEvent[1].WaitOne();
                   }
                    _logger.LogDebug("继续执行。");
                    tempNodeOut =tempNode;
                    da= GetDataAsync(path,null,true).Result.Data;
                    if(da==null||da.Length<1)
                    {
                         return string.Empty;
                    }
                    return System.Text.Encoding.Default.GetString(da);
            }
            catch(Exception ep)
            {
                 _logger.LogError(ep,"获取同步锁出现错误。");
                if(!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode,tempNode);
                }
            }
            return null;
         }

         private void ReConn()
         {
             _logger.LogInformation("检查连接状态");
             if(_zookeeper.getState()==States.CLOSED
             ||_zookeeper.getState()== States.NOT_CONNECTED)
             {
                 _logger.LogInformation("连接为关闭,开始重新连接。");
                Conn(_zookeeperOptions);
             }
         }

         public void Close(string tempNode)
         {
             var task=Task.Run(async ()=>{
             requestLockSequence.Remove(tempNode);
              await _zookeeper.closeAsync();
             });
             task.Wait();
         }

    }
}

前面的类如果了解了netcore的扩展服务和配置机制,就很简单理解了,我们主要是讲解这个服务类的功能。

首先看服务类其中的一段代码:

 private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly=default(Boolean);
            Wathcer=new WaitConnWatch(autoResetEvent[0],_logger); //监控连接是否连接成功
            if(isReadOnly!=zookeeperOptions.IsReadOnly)
            {
                isReadOnly=zookeeperOptions.IsReadOnly;
            }

            byte[] pwd=new byte[0];
            //如果没有密码和sessionId
            if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
            &&_zookeeperOptions.SessionId==default(int))
            {
             _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
            }
            else
            {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                 ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
            }
             if(_zookeeper.getState()==States.CONNECTING)
            {
                _logger.LogDebug("当前状态:CONNECTING。阻塞等待");
                autoResetEvent[0].WaitOne();
            }
        }

这个方法是连接zookeeper,我们在构造函数中调用它,注意,zookeeper是异步,所以需要watcher类监控和AutoResetEvent阻塞当前线程,因为如果不阻塞,在连接还没建立的时候,后面调用,

会出现错误。在监控类被触发的时候,执行取消阻塞。

 internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        public WaitConnWatch(AutoResetEvent autoResetEvent
        ,ILogger logger)
        {
            _autoResetEvent=autoResetEvent;
            _logger=logger;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("watch激发,回掉状态:{0}",@event.getState().ToString());
            if(@event.getState()== KeeperState.SyncConnected
            ||@event.getState()== KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("释放阻塞");
                _autoResetEvent.Set();  //取消阻塞当前线程。
            }
            return Task.FromResult(0);
       }
    }

大家看zookeeper服务类,里面很多方法在执行前需要执行conn方法,这只是一种失败重连的机制,因为一般没有线程池的,我都会给这个服务单例。哪有人会问,如果失败重连,会不会阻塞当前应用,

这个不会,因为netcore是多线程的,但是会降低这个应用的生产力。我前面翻译过一篇net的关于线程的知识,后面也会单独一篇讲解netcore的线程模型。还有代码中我很少用异常去处理容错性,尽量抛出原生的异常,使用日志去记录,然后为这个类返回个null,异常对性能也有一定的消耗,当然看自己习惯了,目的都是为了应用的健壮性。

其他方法是操作zookeeper的类,大家可以看我贴出来的代码。因为zookeeper最出名的估计就是分布式锁了,所以就把这个功能加进来。

 public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
         {
             _logger.LogInformation("获取分布式锁开始。");
             ReConn();
             string tempNode=string.Empty;
             tempNodeOut=string.Empty;

              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }

            try
            {
                _logger.LogDebug("开始锁定语句块");
                lock(_lock)  //这是我为了防止重复提交做的防止并发,当然实际用的地方是协调应用之间的功能,而肯定不会用人机交互。你可以把这个看作多此一举。
                {
                     _logger.LogDebug("锁定,访问requestLockSequence的代码应该同步。");
                    tempNode=requestLockSequence
                    .Where(w=>w.StartsWith(path+"/"+sequenceName)).FirstOrDefault();

                    if(tempNode==null)
                    {
                        tempNode=CreateZNode(path+"/"+sequenceName,"",CreateMode.EPHEMERAL_SEQUENTIAL,aclList).Result;
                        _logger.LogDebug("创建节点:{0}",tempNode);
                        if(tempNode==null)
                        {
                            _logger.LogDebug("创建临时序列节点失败。详细参数:path:{0},data:{1},CreateMode:{2}"
                            ,path+"/squence","",CreateMode.EPHEMERAL_SEQUENTIAL);
                            return null;
                        }
                         _logger.LogInformation("创建成功,加入requestLockSequence列表。");
                        requestLockSequence.Add(tempNode);
                    }
                    else
                    {
                        _logger.LogDebug("已经存在的锁节点,返回null");
                        return null;
                    }
                }

                var childList= GetChildrenAsync(path,null,true).Result;  //首先获取lock子节点。
                   if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",path);
                        return null;
                   }
                   _logger.LogDebug("获取path:{0}的子节点:{1}",path,Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                   var top=childList.Children.OrderBy(or=>or).First();
                   byte[] da=null;
                   if(path+"/"+top==tempNode) //判断是否是当前自己的节点在队列顶端。
                   {
                       tempNodeOut =tempNode;
                       da= GetDataAsync(path,null,true).Result.Data;
                        if(da==null||da.Length<1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                   }
                   else
                   {
                    childList= GetChildrenAsync(path,new WaitLockWatch(autoResetEvent[1],this,_logger,path,tempNode),true).Result; //如果自己不再队列顶端,则加监听等待这个节点有更改。
                    autoResetEvent[1].WaitOne();
                   }
                    _logger.LogDebug("继续执行。");
                    tempNodeOut =tempNode;
                    da= GetDataAsync(path,null,true).Result.Data;
                    if(da==null||da.Length<1)
                    {
                         return string.Empty;
                    }
                    return System.Text.Encoding.Default.GetString(da);
            }
            catch(Exception ep)
            {
                 _logger.LogError(ep,"获取同步锁出现错误。");
                if(!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode,tempNode);
                }
            }
            return null;
         }

接下来看监听类:

 internal class  WaitLockWatch:Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        private string _path;

        private ZookeeperService _zookeeperService;

        public string _tempNode;

        public WaitLockWatch(AutoResetEvent autoResetEvent
        ,ZookeeperService zookeeperService
        ,ILogger logger,string path
        ,string tempNode)
        {
            _autoResetEvent=autoResetEvent;
            _zookeeperService=zookeeperService;
            _logger=logger;
            _path=path;
            _tempNode=tempNode;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("{0}节点下子节点发生改变,激发监视方法。",_path);
            var childList=_zookeeperService.GetChildrenAsync(_path,null,true).Result;
            if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("获取子序列失败,计数为零.path:{0}",_path);
                        return Task.FromResult(0);
                   }
                   var top=childList.Children.OrderBy(or=>or).First();
                   if(_path+"/"+top==_tempNode)   //判断当前节点是否队列顶端
                   {
                        _logger.LogDebug("释放阻塞");
                        _autoResetEvent.Set();
                   }

            return Task.FromResult(0);
       }
    }

这个类,在path节点每次更改或者子节点更改的时候都会激发,仅仅是判断当前节点是不是列表的顶端,再执行 _autoResetEvent.Set();释放阻塞让继续执行。

编译提交nuget,然后集成到测试程序看效果。

首先看zookeeper:

目前节点下没有一个子节点,再看看这个节点的值:

首先新建一个看看:

create -s -e /testzookeeper/sequence "" 

然后启动程序,当程序提交一个子节点,他是排序的:

我们删除我们刚刚用命令行创建的节点:

 delete /testzookeeper/sequence0000000072

然后再看值:

我们的客户端就是获取到锁后更改这个值,增加了一段字符串,客户端调用如下:

 [HttpGet("{id}")]
        public ActionResult<string> Get(int id)
        {
            string sequenceNode=string.Empty;
            string nodename=_zookeeperService.GetDataByLockNode("/testzookeeper","sequence",ZooDefs.Ids.OPEN_ACL_UNSAFE,out  sequenceNode);
            if(sequenceNode==null)
            {
                return "获取分布式锁失败,请查看日志。";
            }

            _zookeeperService.SetDataAsync("/testzookeeper",nodename+"执行了"+DateTime.Now.ToString("yyyyMMddhhmmss"),false);
            if(!string.IsNullOrEmpty(sequenceNode))
            {
                _zookeeperService.DeleteNode(sequenceNode,sequenceNode);
                return "取得锁并且成功处理数据,释放锁成功。";
            }

            return "出现错误,请查日志。";
        }

当然这个是我做测试用的,实际开发,web就是用来做他的数据库存取的,尽量不要去做一些额外的功能,因为i守护进程或者后台服务和job有它自己的任务。

原文地址:https://www.cnblogs.com/Leo_wl/p/10271889.html

时间: 2024-09-29 12:44:58

.net core microservices 架构之 分布式的相关文章

asp.net core microservices 架构之 分布式自动计算(一)

   一:简介   自动计算都是常驻内存的,没有人机交互.我们经常用到的就是console job和sql job了.sqljob有自己的宿主,与数据库产品有很关联,暂时不提.console job使用quartz.net框架,目前3.x已经支持netcore. 如果单台服务器运行计算,那就很简单了,quartz很强大了,而且支持故障灾难转移集群,docker做宿主,很容易实现.但是分布式就不可同日而语了.如果你的一个数据处理太慢,需要多进程多主机处理,那么就需要多进程自动协调处理这一数据,比如

asp.net core microservices 架构之eureka服务发现

一 简介 微服务将需多的功能拆分为许多的轻量级的子应用,这些子应用相互调度.好处就是轻量级,完全符合了敏捷开发的精神.我们知道ut(单元测试),不仅仅提高我们的程序的健壮性,而且可以强制将类和方法的设计尽量的单一化.那么微服务也是这样,敏捷对于软件工程的意义就是快速开发,验证市场需求,然后快速改进,从而适应市场节奏.什么东西要快速,就必须轻量级.大家知道一个应用的复杂程度,完全是和这个项目的功能和代码数量挂钩的,这是软件自诞生就存在的问题,一个设计不好的软件,最后会让这个软件更新和改进变的非常复

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

一 kafka介绍 kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了.硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快.kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性.最熟悉的消息队列框架有ActiveMQ 和 RabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列

asp.net core microservices 架构之Task 事务一致性 事件源 详解

一 aspnetcore之task的任务状态-CancellationToken 我有一篇文章讲解了asp.net的线程方面的知识.我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作

微服务架构的分布式事务场景及解决方案分析

分布式系统架构中,分布式事务问题是一个绕不过去的挑战.而微服务架构的流行,让分布式事问题日益突出! 下面我们以电商购物支付流程中,在各大参与者系统中可能会遇到分布式事务问题的场景进行详细的分析! 如上图所示,假设三大参与平台(电商平台.支付平台.银行)的系统都做了分布式系统架构拆分,按上数中的流程步骤进行分析: 1.电商平台中创建订单:预留库存.预扣减积分.锁定优惠券,此时电商平台内各服务间会有分布式事务问题,因为此时已经要跨多个内部服务修改数据: 2.支付平台中创建支付订单(选银行卡支付):查

分享一个大型进销存供应链项目(多层架构、分布式WCF多服务器部署、微软企业库架构)

分享一个大型进销存供应链项目(多层架构.分布式WCF多服务器部署.微软企业库架构) 这是一个比较大型的项目,准备开源了.支持N家门店同时操作.远程WCF+企业库5.0实现. 这块应该算是库存模块中的核心模块了,因为该块的业务逻辑比较多,比较繁琐,大致讲讲业务逻辑吧,大致的逻辑为:出库单/出库单-->填写订单-->出库/入库-->修改库存信息,按照这个顺序来完成入库出库,顺序不能颠倒,同时还要实现订单的删除,修改,在修改库存信息时由于表和表之间有很多的外键关系,所以要同时删除多张表中含有删

微服务架构的分布式事务解决方案

微服务架构的分布式事务解决方案 标签:分布式事务,微服务,消息最终一致性,分布式事务解决方案发布于 2016-07-16 18:39:05 分布式系统架构中,分布式事务问题是一个绕不过去的挑战.而微服务架构的流行,让分布式事问题日益突出! 下面我们以电商购物支付流程中,在各大参与者系统中可能会遇到分布式事务问题的场景进行详细的分析! 如上图所示,假设三大参与平台(电商平台.支付平台.银行)的系统都做了分布式系统架构拆分,按上数中的流程步骤进行分析: 1.电商平台中创建订单:预留库存.预扣减积分.

java微服务架构的分布式事务解决方案

java微服务架构的分布式事务解决方案 课程目录如下: 1.课程介绍20分钟2.解决方案的效果演示(结合支付系统真实应用场景)45分钟3.常用的分布式事务解决方案介绍47分钟4.消息发送一致性(可靠消息的前提保障)20分钟5.消息发送一致性的异常流程处理16分钟6.常规MQ队列消息的处理流程和特点12分钟7.消息重复发送问题及业务接口的幂等性设计18分钟8.可靠消息最终一致性方案1(本地消息服务)的设计19分钟9.可靠消息最终一致性方案2(独立消息服务)的设计24分钟10.可靠消息服务的设计与实

转载Aaron博客 ---- jQuery 2.0.3 源码分析core - 整体架构

jQuery 2.0.3 源码分析core - 整体架构 整体架构 拜读一个开源框架,最想学到的就是设计的思想和实现的技巧. 废话不多说,jquery这么多年了分析都写烂了,老早以前就拜读过, 不过这几年都是做移动端,一直御用zepto, 最近抽出点时间把jquery又给扫一遍 我也不会照本宣科的翻译源码,结合自己的实际经验一起拜读吧! github上最新是jquery-master,加入了AMD规范了,我就以官方最新2.0.3为准 整体架构 jQuery框架的核心就是从HTML文档中匹配元素并