分布式锁实现

分布式锁实现

分布式锁

经常用于在解决分布式环境下的业务一致性和协调分布式环境。

实际业务场景中,比如说解决并发一瞬间的重复下单,重复确认收货,重复发现金券等。

使用分布式锁的场景一般不能太多。

开源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock

开源相关群: .net 开源基础服务 238543768

这里整理了C#.net关于redis分布式锁和zookeeper分布式锁的实现,仅用于研究。(可能有bug)

采用ServiceStack.Redis实现Redis分布式锁



1

 

/*
 * Redis分布式锁
 * 采用ServiceStack.Redis实现的Redis分布式锁
 * 详情可阅读其开源代码
 * 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同 xxf里面默认使用2.2版本
 */    public class RedisDistributedLock : BaseRedisDistributedLock
    {
        private ServiceStack.Redis.RedisLock _lock;
        private RedisClient _client;
        public RedisDistributedLock(string redisserver, string key)
            : base(redisserver, key)
        {

        }

        public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
        {
            if (lockresult == LockResult.Success)
                throw new DistributedLockException("检测到当前锁已获取");
            _client = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient();
            /*             * 阅读源码发现当其获取锁后,redis连接资源会一直占用,知道获取锁的资源释放后,连接才会跳出,可能会导致连接池资源的浪费。             */            

try            {
                this._lock = new ServiceStack.Redis.RedisLock(_client, key, getlockTimeOut);
                lockresult =  LockResult.Success;
            }
            catch (Exception exp)
            {
                XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
                lockresult = LockResult.LockSystemExceptionFailure;
            }
            return lockresult;
        }

        public override void Dispose()
        {
            try            {
                if (this._lock != null)
                    this._lock.Dispose();
                if (_client != null)
                    this._client.Dispose();
            }
            catch (Exception exp)
            {
                XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
            }
        }
    }

来自网络的java实现Redis分布式锁(C#版)

/*
* Redis分布式锁
* 采用网络上java实现的Redis分布式锁
* 参考 http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html
* 详情可阅读其开源代码
*/    public class RedisDistributedLockFromJava : BaseRedisDistributedLock
    {

        public RedisDistributedLockFromJava(string redisserver, string key)
            : base(redisserver, key)
        {

        }

        public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
        {
            if (lockresult == LockResult.Success)
                throw new DistributedLockException("检测到当前锁已获取");
            try            {
                // 1. 通过SETNX试图获取一个lock                         string @lock = key;
                long taskexpiredMilliseconds = (taskrunTimeOut != null ? (long)taskrunTimeOut.Value.TotalMilliseconds : (long)DistributedLockConfig.MaxLockTaskRunTime);
                long getlockexpiredMilliseconds = (getlockTimeOut != null ? (long)getlockTimeOut.Value.TotalMilliseconds : 0);
                long hassleepMilliseconds = 0;
                while (true)
                {
                    using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
                    {
                        long value = CurrentUnixTimeMillis() + taskexpiredMilliseconds + 1;
                        /*Java以前版本都是用SetNX,但是这种是无法设置超时时间的,不是很理解为什么,                                       * 可能是因为原来的redis命令比较少导致的?现在用Add不知道效果如何.                                         因对redis细节不了解,但个人怀疑若异常未释放锁经常发生,可能会导致内存逐步溢出*/                        

              bool acquired = redisclient.Add<long>(@lock, value, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));
                        //SETNX成功,则成功获取一个锁
                        if (acquired == true)
                        {
                            lockresult = LockResult.Success;
                        }
                        //SETNX失败,说明锁仍然被其他对象保持,检查其是否已经超时                                      else                                      {
                            var oldValueBytes = redisclient.Get(@lock);
                            //超时
                            if (oldValueBytes != null && BitConverter.ToInt64(oldValueBytes, 0) < CurrentUnixTimeMillis())
                            {
                                /*此处虽然重设并获取锁,但是超时时间可能被覆盖,故重设超时时间;若有进程一直在尝试获取锁,那么锁存活时间应该被延迟*/                                

                  var getValueBytes = redisclient.GetSet(@lock, BitConverter.GetBytes(value));
                                var o1 = redisclient.ExpireEntryIn(@lock, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));//这里如果程序异常终止,依然会有部分锁未释放的情况。                                // 获取锁成功                                if (getValueBytes == oldValueBytes)
                                {
                                    lockresult = LockResult.Success;
                                }
                                // 已被其他进程捷足先登了
                                else
                                {
                                    lockresult = LockResult.GetLockTimeOutFailure;
                                }
                            }
                            //未超时,则直接返回失败
                            else
                            {
                                lockresult = LockResult.GetLockTimeOutFailure;
                            }
                        }
                    }

                    //成功拿到锁
                    if (lockresult == LockResult.Success)
                        break;

                    //获取锁超时
                    if (hassleepMilliseconds >= getlockexpiredMilliseconds)
                    {
                        lockresult = LockResult.GetLockTimeOutFailure;
                        break;
                    }

                    //继续等待
                    System.Threading.Thread.Sleep(DistributedLockConfig.GetLockFailSleepTime);
                    hassleepMilliseconds += DistributedLockConfig.GetLockFailSleepTime;
                }
            }
            catch (Exception exp)
            {
                XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
                lockresult = LockResult.LockSystemExceptionFailure;
            }
            return lockresult;
        }

        private long CurrentUnixTimeMillis()
        {
            return (long)(System.DateTime.UtcNow - new System.DateTime(1970, 1, 1, 0, 0, 0, System.DateTimeKind.Utc)).TotalMilliseconds;
        }

        public override void Dispose()
        {
            if (lockresult == LockResult.Success || lockresult == LockResult.LockSystemExceptionFailure)
            {
                try                {
                    long current = CurrentUnixTimeMillis();
                    using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
                    {
                        var v = redisclient.Get(key);
                        if (v != null)

                        {
                            // 避免删除非自己获取得到的锁
                            if (current < BitConverter.ToInt64(v, 0))
                            {
                                redisclient.Del(key);
                            }
                        }
                    }
                }
                catch (Exception exp)
                {
                    XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
                }
            }
        }
    }

ServiceStack.Redis内部实现版本(较旧)


1

 

/*
  * Redis分布式锁
 * 采用ServiceStack.Redis实现的Redis分布式锁
 * 详情可阅读其开源代码
 * 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同
  * 拷贝自网络开源代码 较旧的实现版本
  */    public class RedisDistributedLockFromServiceStack : BaseRedisDistributedLock
    {
        public RedisDistributedLockFromServiceStack(string redisserver, string key)
            : base(redisserver, key)
        {

        }
        public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
        {
            if (lockresult == LockResult.Success)
                throw new DistributedLockException("检测到当前锁已获取");
            try
            {
                using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
                {
                    ExecExtensions.RetryUntilTrue(
                             () =>
                             {
                                 //This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
                                  //Calculate a unix time for when the lock should expire                                                   TimeSpan realSpan = taskrunTimeOut ?? TimeSpan.FromMilliseconds(DistributedLockConfig.MaxLockTaskRunTime); //new TimeSpan(365, 0, 0, 0); //if nothing is passed in the timeout hold for a year                                 DateTime expireTime = DateTime.UtcNow.Add(realSpan);
                                 string lockString = (expireTime.ToUnixTimeMs() + 1).ToString();

                                 //Try to set the lock, if it does not exist this will succeed and the lock is obtained                                 var nx = redisClient.SetEntryIfNotExists(key, lockString);
                                 if (nx)
                                 {
                                     lockresult = LockResult.Success;
                                     return true;
                                 }

                                 //If we‘ve gotten here then a key for the lock is present. This could be because the lock is                                 //correctly acquired or it could be because a client that had acquired the lock crashed (or didn‘t release it properly).                                 //Therefore we need to get the value of the lock to see when it should expire
                                  redisClient.Watch(key);
                                 string lockExpireString = redisClient.Get<string>(key);
                                 long lockExpireTime;
                                 if (!long.TryParse(lockExpireString, out lockExpireTime))
                                 {
                                     redisClient.UnWatch();  // since the client is scoped externally                                                           lockresult = LockResult.GetLockTimeOutFailure;
                                     return false;
                                 }

                                 //If the expire time is greater than the current time then we can‘t let the lock go yet                                                    if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
                                 {
                                     redisClient.UnWatch();  // since the client is scoped externally                                                          lockresult = LockResult.GetLockTimeOutFailure;
                                     return false;
                                 }

                                 //If the expire time is less than the current time then it wasn‘t released properly and we can attempt to                                  //acquire the lock. The above call to Watch(_lockKey) enrolled the key in monitoring, so if it changes                                 //before we call Commit() below, the Commit will fail and return false, which means that another thread                                  //was able to acquire the lock before we finished processing.                                 using (var trans = redisClient.CreateTransaction()) // we started the "Watch" above; this tx will succeed if the value has not moved                                  {
                                     trans.QueueCommand(r => r.Set(key, lockString));
                                     //return trans.Commit(); //returns false if Transaction failed                                     var t = trans.Commit();
                                     if (t == false)
                                         lockresult = LockResult.GetLockTimeOutFailure;
                                     else
                                     lockresult = LockResult.Success;
                                     return t;
                                 }
                             },
                             getlockTimeOut
                         );

                }
            }
            catch (Exception exp)
            {
                XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
                lockresult = LockResult.LockSystemExceptionFailure;
            }
            return lockresult;
        }

        public override void Dispose()
        {
            try            {
                using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
                {
                    redisClient.Remove(key);
                }
            }
            catch (Exception exp)
            {
                XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
            }
        }
    }

Zookeeper 版本实现分布式锁

/*  * 来源java网络源码的zookeeper分布式锁实现(目前仅翻译并简单测试ok,未来集成入sdk)
* 备注:    共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,    然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,    如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,    从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
*/    public class ZooKeeprDistributedLockFromJava : IWatcher
    {
        private ZooKeeper zk;
        private string root = "/locks"; //根
        private string lockName; //竞争资源的标志
        private string waitNode; //等待前一个锁
        private string myZnode; //当前锁
        //private CountDownLatch latch; //计数器
        private AutoResetEvent autoevent;
        private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(30000);
        private IList<Exception> exception = new List<Exception>();

        /// <summary>
        /// 创建分布式锁,使用前请确认config配置的zookeeper服务可用 </summary>
        /// <param name="config"> 127.0.0.1:2181 </param>
        /// <param name="lockName"> 竞争资源标志,lockName中不能包含单词lock </param>
        public ZooKeeprDistributedLockFromJava(string config, string lockName)
        {
            this.lockName = lockName;
            // 创建一个与服务器的连接
            try                   {
                zk = new ZooKeeper(config, sessionTimeout, this);
                var stat = zk.Exists(root, false);
                if (stat == null)
                {
                    // 创建根节点
                    zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
                }
            }
            catch (KeeperException e)
            {
                throw e;
            }
        }

        /// <summary>
        /// zookeeper节点的监视器
        /// </summary>
        public virtual void Process(WatchedEvent @event)
        {
            if (this.autoevent != null)
            {
                this.autoevent.Set();
            }
        }

        public virtual bool tryLock()
        {
            try            {
                string splitStr = "_lock_";
                if (lockName.Contains(splitStr))
                {
                    //throw new LockException("lockName can not contains \\u000B");                          }
                //创建临时子节点
                myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
                Console.WriteLine(myZnode + " is created ");
                //取出所有子节点
                IList<string> subNodes = zk.GetChildren(root, false);
                //取出所有lockName的锁
                IList<string> lockObjNodes = new List<string>();
                foreach (string node in subNodes)
                {
                    if (node.StartsWith(lockName))
                    {
                        lockObjNodes.Add(node);
                    }
                }
                Array alockObjNodes = lockObjNodes.ToArray();
                Array.Sort(alockObjNodes);
                Console.WriteLine(myZnode + "==" + lockObjNodes[0]);
                if (myZnode.Equals(root + "/" + lockObjNodes[0]))
                {
                    //如果是最小的节点,则表示取得锁
                    return true;
                }
                //如果不是最小的节点,找到比自己小1的节点
                string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1);
                waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1];
            }
            catch (KeeperException e)
            {
                throw e;
            }
            return false;
        }

        public virtual bool tryLock(TimeSpan time)
        {
            try            {
                if (this.tryLock())
                {
                    return true;
                }
                return waitForLock(waitNode, time);
            }
            catch (KeeperException e)
            {
                throw e;
            }
            return false;
        }

        private bool waitForLock(string lower, TimeSpan waitTime)
        {
            var stat = zk.Exists(root + "/" + lower, true);
            //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if (stat != null)
            {
                Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower);
                autoevent = new AutoResetEvent(false);
                bool r = autoevent.WaitOne(waitTime);
                autoevent.Dispose();
                autoevent = null;
                return r;
            }
            else                return true;
        }

        public virtual void unlock()
        {
            try            {
                Console.WriteLine("unlock " + myZnode);
                zk.Delete(myZnode, -1);
                myZnode = null;
                zk.Dispose();
            }
            catch (KeeperException e)
            {
                throw e;
            }
        }

    }

以上代码仅做参考,未压测。

代码粘贴有些问题,详细请下载开源包运行研究。

时间: 2024-11-05 12:30:02

分布式锁实现的相关文章

分布式任务&amp;分布式锁(li)

目前系统中存在批量审批.批量授权等各个操作,批量操作中可能因为处理机器.线程不同,造成刷新缓存丢失授权等信息,如批量审批同一用户权限多个权限申请后,流程平台并发的发送多个http请求到acl不同服务器,a机器处理了授权a,b机器同时处理了授权b,然后刷新用户缓存.因为在事务里彼此看不见对方提交的数据,刷新时又完全从db中读取要刷新的数据,就造成了互相丢失对方的数据.因此,需要一个分布式锁工具,来协调各个机器.线上的工作同步问题. 分布式锁千万不能用ReentrantLock,因为它的lock和u

06.Curator分布式锁

锁:分布式的锁全局同步,这意味着任何一个时间点不会有两个客户端都拥有相同的锁. 1.可重入锁Shared Reentrant Lock 首先我们先看一个全局可重入的锁(可以多次获取,不会被阻塞).Shared意味着锁是全局可见的,客户端都可以请求锁.Reentrant和JDK的ReentrantLock类似,意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞. 1.可重入锁相关类介绍 它是由类InterProcessMutex来实现.它的主要方法: // 构造方法 public Inte

分布式锁的实现思路

为什么要用分布式锁 分布式系统,需要采用集群,多个服务之间可能需要用到共享数据(redis或者数据库),这时,JVM内的锁已经不能满足跨JVM的需求,因此需采用外部的锁机制 如何实现分布式锁 个人理解,不管是java提供的锁还是其他锁,就其思想,就是要有一个大家公用的媒介,作为标识,去控制对于共享资源的操作,因此,分布式系统中,共用媒介便可以作为分布式锁,例如:zookeeper,redis等等,甚至文件都可以 实现分布式锁需要注意的问题 1.获取锁需要互斥 2.有自动失效的机制,防止因为持锁线

redisson实现分布式锁原理

Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

高级java必会系列一:zookeeper分布式锁

方案1: 算法思路:利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁.解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁.特点:这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单.缺点是会产生"惊群"效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁.方案2:算法思路:临时顺序节点实现共享锁        客户端调用create(

基于zookeeper的分布式锁实现 【转】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

基于zookeeper的分布式锁实现 【转载】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

使用zookeeper实现分布式锁

简介: 核心是解决资源竞争的问题 分布式系统中经常需要协调多进程或者多台机器之间的同步问题,得益于zookeeper,实现了一个分布式的共享锁,方便在多台服务器之间竞争资源时,来协调各系统之间的协作和同步. 实现1: ConcurrentTest: package com.concurrent; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.con

ZooKeeper场景实践:(7) 分布式锁

1.基本介绍 分布式锁是控制分布式系统之间同步访问共享资源的一种方式,需要互斥来防止彼此干扰来保证一致性.利用Zookeeper的强一致性可以完成锁服务.Zookeeper的官方文档是列举了两种锁,独占锁和共享锁.独占锁保证任何时候都只有一个进程能或者资源的读写权限.共享锁可以同时有多个读,但是同一时刻最多只能有一个写,读和写是互斥的. 2.场景分析 我们准备来实现互斥的锁,按照官网的思路,给定一个锁的路径,如/Lock,所有要申请这个锁的进程都在/Lock目录下创建一个/Lock/lock-的

浅谈分布式锁

分布式一致性问题 首先我们先来看一个小例子: 假设某商城有一个商品库存剩10个,用户A想要买6个,用户B想要买5个,在理想状态下,用户A先买走了6了,库存减少6个还剩4个,此时用户B应该无法购买5个,给出数量不足的提示:而在真实情况下,用户A和B同时获取到商品剩10个,A买走6个,在A更新库存之前,B又买走了5个,此时B更新库存,商品还剩5个,这就是典型的电商"秒杀"活动. 从上述例子不难看出,在高并发情况下,如果不做处理将会出现各种不可预知的后果.那么在这种高并发多线程的情况下,解决