Redisson实现分布式锁—RedissonLock

Redisson实现分布式锁—RedissonLock

有关Redisson实现分布式锁上一篇博客讲了分布式的锁原理:Redisson实现分布式锁---原理

这篇主要讲RedissonLock和RLock。Redisson分布式锁的实现是基于RLock接口,RedissonLock实现RLock接口。

一、RLock接口

1、概念

public interface RLock extends Lock, RExpirable, RLockAsync

很明显RLock是继承Lock锁,所以他有Lock锁的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。

2、RLock锁API

这里针对上面做个整理,这里列举几个常用的接口说明

public interface RRLock {
    //----------------------Lock接口方法-----------------------

    /**
     * 加锁 锁的有效期默认30秒
     */
    void lock();
    /**
     * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
     */
    boolean tryLock();
    /**
     * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
     * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
     *
     * @param time 等待时间
     * @param unit 时间单位 小时、分、秒、毫秒等
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    /**
     * 解锁
     */
    void unlock();
    /**
     * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
     * Thread.currentThread().interrupt(); 方法真正中断该线程
     */
    void lockInterruptibly();

    //----------------------RLock接口方法-----------------------
    /**
     * 加锁 上面是默认30秒这里可以手动设置锁的有效时间
     *
     * @param leaseTime 锁有效时间
     * @param unit      时间单位 小时、分、秒、毫秒等
     */
    void lock(long leaseTime, TimeUnit unit);
    /**
     * 这里比上面多一个参数,多添加一个锁的有效时间
     *
     * @param waitTime  等待时间
     * @param leaseTime 锁有效时间
     * @param unit      时间单位 小时、分、秒、毫秒等
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    /**
     * 检验该锁是否被线程使用,如果被使用返回True
     */
    boolean isLocked();
    /**
     * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
     * 这个比上面那个实用
     */
    boolean isHeldByCurrentThread();
    /**
     * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
     * @param leaseTime  锁有效时间
     * @param unit       时间单位 小时、分、秒、毫秒等
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);
}

RLock相关接口,主要是新添加了 leaseTime 属性字段,主要是用来设置锁的过期时间,避免死锁。

二、RedissonLock实现类

public class RedissonLock extends RedissonExpirable implements RLock

RedissonLock实现了RLock接口,所以实现了接口的具体方法。这里我列举几个方法说明下

1、void lock()方法

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

发现lock锁里面进去其实用的是lockInterruptibly(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。

具体有关lockInterruptibly()方法讲解推荐一个博客。博客Lock的lockInterruptibly()

接下来执行流程,这里理下关键几步

   /**
     * 1、带上默认值调另一个中断锁方法
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }
    /**
     * 2、另一个中断锁的方法
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException
    /**
     * 3、这里已经设置了锁的有效时间默认为30秒  (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30)
     */
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    /**
     * 4、最后通过lua脚本访问Redis,保证操作的原子性
     */
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。

2、tryLock(long waitTime, long leaseTime, TimeUnit unit)

接口的参数和含义上面已经说过了,现在我们开看下源码,这里只显示一些重要逻辑。

 @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        //1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走
        if (ttl == null) {
            return true;
        }
        //2、如果超过了尝试获取锁的等待时间,当然返回false 了。
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        // 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //  阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
        //  只有await返回true,才进入循环尝试获取锁
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

       //4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果
        //1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 获取锁成功
            if (ttl == null) {
                return true;
            }
           //   获取锁失败
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    }

重点 tryLock一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍

3、unlock()

解锁的逻辑很简单。

@Override
    public void unlock() {
        // 1.通过 Lua 脚本执行 Redis 命令释放锁
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                        "end; " +
                        "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()),
                LockPubSub.unlockMessage, internalLockLeaseTime,
                getLockName(Thread.currentThread().getId()));
        // 2.非锁的持有者释放锁时抛出异常
        if (opStatus == null) {
            throw new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + Thread.currentThread().getId());
        }
        // 3.释放锁后取消刷新锁失效时间的调度任务
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }

使用 EVAL 命令执行 Lua 脚本来释放锁:

  1. key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1
  2. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil
  3. 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一
  4. 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1

注意这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。比如下面这种情况

      //设置锁1秒过去
        redissonLock.lock("redisson", 1);
        /**
         * 业务逻辑需要咨询2秒
         */
        redissonLock.release("redisson");
      /**
       * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
       * 那么这个时候 线程2 进来了。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)
       */
只要自己变优秀了,其他的事情才会跟着好起来(中将6)

原文地址:https://www.cnblogs.com/qdhxhz/p/11055426.html

时间: 2024-10-28 09:47:42

Redisson实现分布式锁—RedissonLock的相关文章

使用Redisson实现分布式锁,Spring AOP简化之

源码 Redisson概述 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务.其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publi

利用Redisson实现分布式锁及其底层原理解析

Redis介绍 参考地址:https://blog.csdn.net/turbo_zone/article/details/83422215 redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此

【高并发】你知道吗?大家都在使用Redisson实现分布式锁了!!

写在前面 忘记之前在哪个群里有朋友在问:有出分布式锁的文章吗-@冰河?我的回答是:这周会有,也是[高并发]专题的.想了想,还是先发一个如何使用Redisson实现分布式锁的文章吧?为啥?因为使用Redisson实现分布式锁简单啊!Redisson框架是基于Redis实现的分布式锁,非常强大,只需要拿来使用就行了,至于分布式锁的原理啥的,后面再撸一篇文章就是了. Redisson框架十分强大,基于Redisson框架可以实现几乎你能想到的所有类型的分布式锁.这里,我就列举几个类型的分布式锁,并各自

redisson实现分布式锁原理

Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

Java之——redis并发读写锁,使用Redisson实现分布式锁

原文:http://blog.csdn.net/l1028386804/article/details/73523810 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁. [java] view plain copy public void testReentrantLock(RedissonClient redisson){ RLock lo

Redisson获取分布式锁

1. maven <!-- redisson 分布式锁 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.2</version> </dependency> 2. redisson客户端配置类 package com.harara.redis; import

springboot整合redisson分布式锁

一.通过maven引入redisson的jar包 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency> 二.在yaml文件中引入redis的相关配置(redis单节点可以读取原有redis配置拼装,如果是主从需另外独立配置,相关属性

redisson spring boot starter 做分布式锁

使用redisson做分布式锁 分布式锁 在java中单体应用中,我们如果想要保证一个接口或者服务.方法当下只有一个线程在运行,我们可以通过JDK提供的Lock.Semaphore.同步锁等多种方式实现只有一个线程在运行. 在微服务系统中,我们的单体应用会变成多个节点,只靠JDK本身的锁只能控制一个节点的运行,所以我们需要一个可以控制全局的锁来控制系统的运行,这就是所谓的分布式锁. Zk redis 等中间件都可以做分布式锁,优缺点也各不相同,在我们现在的系统中zk的直接操作还是比较少,更多的是

使用分布式锁时考虑哪些问题

工作中经常会遇到争抢共享资源的场景,比如用户抢购秒杀商品,如果不对商品库存进行保护,可能会造成超卖的情况.超卖现象在售卖火车票的场景下更加明显,两个人购买到同一天同一辆列车,相同座位的情况是不允许出现的.交易系统中的退款同样如此,由于网络延迟和重复提交极端时间差的情况下,可能会造成同一个用户重复的退款请求.以上无论是超卖,还是重复退款,都是没有对需要保护的资源或业务进行完善的保护而造成的,从设计方面一定要避免这种情况的发生. 本文以退款交易场景入手,引入分布式锁,尝试分析分布式锁需要考虑关注点,