【分布式锁】Redis实现可重入的分布式锁

一、前言

之前写的一篇文章《细说分布式锁》介绍了分布式锁的三种实现方式,但是Redis实现分布式锁关于Lua脚本实现、自定义分布式锁注解以及需要注意的问题都没描述。本文就是详细说明如何利用Redis实现重入的分布式锁。

二、方案

死锁问题

当一个客户端获取锁成功之后,假如它崩溃了导致它再也无法和 Redis 节点通信,那么它就会一直持有这个锁,导致其它客户端永远无法获得锁了,因此锁必须要有一个自动释放的时间。
  我们需要保证setnx命令和expire命令以原子的方式执行,否则如果客户端执行setnx获得锁后,这时客户端宕机了,那么这把锁没有设置过期时间,导致其他客户端永远无法获得锁了。

锁被其他线程释放

如果不加任何处理即简单使用 SETNX 实现 Redis 分布式锁,就会遇到一个问题:如果线程 C1 获得锁,但由于业务处理时间过长,锁在线程 C1 还未处理完业务之前已经过期了,这时线程 C2 获得锁,在线程 C2 处理业务期间线程 C1 完成业务执行释放锁操作,但这时线程 C2 仍在处理业务线程 C1 释放了线程 C2 的锁,导致线程 C2 业务处理实际上没有锁提供保护机制;同理线程 C2 可能释放线程 C3 的锁,从而导致严重的问题。
  因此每个线程释放锁的时候只能释放自己的锁,即锁必须要有一个拥有者的标记,并且也需要保证释放锁的原子性操作。
  在释放锁的时候判断拥有者的标记(value是否相同),只有相同时才可以删除,同时利用Lua脚本来达到原子操作,脚本如下:

  1. if redis.call("get", KEYS[1]) == ARGV[1] then


  2. return redis.call("del", KEYS[1]) 

  3. else 

  4. return 0 

  5. end 

可重入问题

可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。
这里有两种解决方案:

  1. 客户端在获得锁后保存value(拥有者标记),然后释放锁的时候将value和key同时传过去。
  2. 利用ThreadLocal实现,获取锁后将Redis中的value保存在ThreadLocal中,同一线程再次尝试获取锁的时候就先将 ThreadLocal 中的 值 与 Redis 的 value 比较,如果相同则表示这把锁所以该线程,即实现可重入锁。

这里的实现的方案是基于单机Redis,之前说的集群问题这里暂不考虑。

三、编码

我们通过自定义分布式锁注解+AOP可以更加方便的使用分布式锁,只需要在加锁的方法上加上注解即可。
Redis分布式锁接口

/**
 * Redis分布式锁接口
 * Created by 2YSP on 2019/9/20.
 */
public interface IRedisDistributedLock {

  /**
   *
   * @param key
   * @param requireTimeOut 获取锁超时时间 单位ms
   * @param lockTimeOut 锁过期时间,一定要大于业务执行时间 单位ms
   * @param retries 尝试获取锁的最大次数
   * @return
   */
  boolean lock(String key, long requireTimeOut, long lockTimeOut, int retries);

  /**
   * 释放锁
   * @param key
   * @return
   */
  boolean release(String key);

}

Redis 分布式锁实现类

/**
 * Redis 分布式锁实现类
 * Created by 2YSP on 2019/9/20.
 */
@Slf4j
@Component
public class RedisDistributedLockImpl implements IRedisDistributedLock {

  /**
   * key前缀
   */
  public static final String PREFIX = "Lock:";
  /**
   * 保存锁的value
   */
  private ThreadLocal<String> threadLocal = new ThreadLocal<>();

  private static final Charset UTF8 = Charset.forName("UTF-8");
  /**
   * 释放锁脚本
   */
  private static final String UNLOCK_LUA;

  /*
   * 释放锁脚本,原子操作
   */
  static {
    StringBuilder sb = new StringBuilder();
    sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
    sb.append("then ");
    sb.append("    return redis.call(\"del\",KEYS[1]) ");
    sb.append("else ");
    sb.append("    return 0 ");
    sb.append("end ");
    UNLOCK_LUA = sb.toString();
  }

  @Autowired
  private RedisTemplate redisTemplate;

  @Override
  public boolean lock(String key, long requireTimeOut, long lockTimeOut, int retries) {
    //可重入锁判断
    String originValue = threadLocal.get();
    if (!StringUtils.isBlank(originValue) && isReentrantLock(key, originValue)) {
      return true;
    }
    String value = UUID.randomUUID().toString();
    long end = System.currentTimeMillis() + requireTimeOut;
    int retryTimes = 1;

    try {
      while (System.currentTimeMillis() < end) {
        if (retryTimes > retries) {
          log.error(" require lock failed,retry times [{}]", retries);
          return false;
        }
        if (setNX(wrapLockKey(key), value, lockTimeOut)) {
          threadLocal.set(value);
          return true;
        }
        // 休眠10ms
        Thread.sleep(10);

        retryTimes++;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }

  private boolean setNX(String key, String value, long expire) {
    /**
     * List设置lua的keys
     */
    List<String> keyList = new ArrayList<>();
    keyList.add(key);
    return (boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
      Boolean result = connection
          .set(key.getBytes(UTF8),
              value.getBytes(UTF8),
              Expiration.milliseconds(expire),
              SetOption.SET_IF_ABSENT);
      return result;
    });

  }

  /**
   * 是否为重入锁
   */
  private boolean isReentrantLock(String key, String originValue) {
    String v = (String) redisTemplate.opsForValue().get(key);
    return v != null && originValue.equals(v);
  }

  @Override
  public boolean release(String key) {
    String originValue = threadLocal.get();
    if (StringUtils.isBlank(originValue)) {
      return false;
    }
    return (boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
      return connection
          .eval(UNLOCK_LUA.getBytes(UTF8), ReturnType.BOOLEAN, 1, wrapLockKey(key).getBytes(UTF8),
              originValue.getBytes(UTF8));
    });
  }

  private String wrapLockKey(String key) {
    return PREFIX + key;
  }
}

分布式锁注解

@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLock {

  /**
   * 默认包名加方法名
   * @return
   */
  String key() default "";

  /**
   * 过期时间 单位:毫秒
   * <pre>
   *     过期时间一定是要长于业务的执行时间.
   * </pre>
   */
  long expire() default 30000;

  /**
   * 获取锁超时时间 单位:毫秒
   * <pre>
   *     结合业务,建议该时间不宜设置过长,特别在并发高的情况下.
   * </pre>
   */
  long timeout() default 3000;

  /**
   * 默认重试次数
   * @return
   */
  int retryTimes() default Integer.MAX_VALUE;

}

aop切片类

@Component
@Aspect
@Slf4j
public class RedisLockAop {

  @Autowired
  private IRedisDistributedLock redisDistributedLock;

  @Around(value = "@annotation(lock)")
  public Object doAroundAdvice(ProceedingJoinPoint proceedingJoinPoint, DistributedLock lock) {
    // 加锁
    String key = getKey(proceedingJoinPoint, lock);
    Boolean success = null;
    try {
        success = redisDistributedLock
          .lock(key, lock.timeout(), lock.expire(), lock.retryTimes());
      if (success) {
        log.info(Thread.currentThread().getName() + " 加锁成功");
        return proceedingJoinPoint.proceed();
      }
      log.info(Thread.currentThread().getName() + " 加锁失败");
      return null;
    } catch (Throwable throwable) {
      throwable.printStackTrace();
      return null;
    } finally {
      if (success){
        boolean result = redisDistributedLock.release(key);
        log.info(Thread.currentThread().getName() + " 释放锁结果:{}",result);
      }
    }
  }

  private String getKey(JoinPoint joinPoint, DistributedLock lock) {
    if (!StringUtils.isBlank(lock.key())) {
      return lock.key();
    }
    return joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature()
        .getName();
  }
}

业务逻辑处理类

@Service
public class TestService {

  @DistributedLock(retryTimes = 1000,timeout = 1000)
  public String lockTest() {
    try {
      System.out.println("模拟执行业务逻辑。。。");
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
      return "error";
    }

    return "ok";
  }
}

四、测试

  1. 启动本地redis,启动项目
  2. 打开cmd,利用ab压力测试设置3个线程100个请求。

ab -c 3 -n 100 http://localhost:8000/lock/test

idea控制台输出如下:

enter description here

至此大功告成,代码地址

ps: 遇到一个奇怪的问题,我用 RedisTemplate.execute(RedisScript script, List keys, Object... args) 这个方法,通过加载resource目录下的lua脚本来释放锁的时候一直不成功,参数没任何问题,而且我之前的文章就是用这个方法可以正确的释放锁。

原文地址:https://www.cnblogs.com/2YSP/p/11563448.html

时间: 2024-10-09 18:18:44

【分布式锁】Redis实现可重入的分布式锁的相关文章

转:【Java并发编程】之一:可重入内置锁

每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁或监视器锁.线程在进入同步代码块之前会自动获取锁,并且在退出同步代码块时会自动释放锁.获得内置锁的唯一途径就是进入由这个锁保护的同步代码块或方法. 当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞.然而,由于内置锁是可重入的,因此如果摸个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功."重入"意味着获取锁的操作的粒度是"线程",而不是调用.重入的一种实现 方法是,为每个锁关联一个

Java并发编程(1):可重入内置锁

每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁或监视器锁.线程在进入同步代码块之前会自动获取锁,并且在退出同步代码块时会自动释放锁.获得内置锁的唯一途径就是进入由这个锁保护的同步代码块或方法. 当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞.然而,由于内置锁是可重入的,因此如果摸个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功."重入"意味着获取锁的操作的粒度是"线程",而不是调用.重入的一种实现方法是,为每个锁关联一个获

浅谈Java中的锁:Synchronized、重入锁、读写锁

Java开发必须要掌握的知识点就包括如何使用锁在多线程的环境下控制对资源的访问限制 ◆ Synchronized ◆ 首先我们来看一段简单的代码: 12345678910111213141516171819 public class NotSyncDemo { public static int i=0; static class ThreadDemo extends Thread { @Override public void run() { for (int j=0;j<10000;j++)

可重入的独占锁——ReentrantLock源码分析

ReentrantLock面试题分析 1.ReentrantLock是怎么实现的? 2.ReentrantLock的公平锁和非公平锁是如何实现的? 1.ReentrantLock类图结构 从类图我们可以直观地了解到,ReentrantLock最终还是使用AQS来实现地,并且根据参数来决定其内部是一个公平??还是非公平锁??,默认是非公平锁??. public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(bo

ReentrantLock 重入锁(下)

前沿: ReentrantLock 是java重入锁一种实现,在java中我们通常使用ReentrantLock 和 synchronized来实现锁功能,本篇通过例子来理解下ReentrantLock使用以及什么是可重入锁. 理解可重入: 1. 锁机制是为了多线程并发访问共享资源情况下为保证线程的安全,和对资源的原子性操作, 举个例子: i=i+1;其实是三部操作首先将i读取到线程的临时区存放,然后加一操作,最后将结果写回去.所谓锁机制就是保证一段程序在某段时间只有一个线程执行. 2. 可重入

ReentrantLock(重入锁)简单源码分析

1.ReentrantLock是基于AQS实现的一种重入锁. 2.先介绍下公平锁/非公平锁 公平锁 公平锁是指多个线程按照申请锁的顺序来获取锁. 非公平锁 非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁.有可能,会造成优先级反转或者饥饿现象. 3.重入锁/不可重入锁 可重入锁:广义上的可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁. 不可重入锁

偏向锁跟可重入性有什么区别

1. 并发包中的ReentrantLock是偏向锁河轻量级锁码? 是的. 2. 偏向锁跟可重入性有什么区别,非偏向锁如何实现可重入? 偏向锁和可重入性直接没啥关系.. 当然要是锁不具备可重入性,那就无所谓偏向了. 可重入性是指比如一个线程获得了对象A上的锁,如果它第二次请求A的锁必然可以获得(也就是说不会自己把自己锁住),可重入性是线程必须满足的,不然很多代码就会死锁了 偏向锁是说如果线程请求一个自己已经获得的锁,它不会去再次执行lock和unlock,这样可以提升性能. 如何实现可重入都是一样

Java 实现基于Redis的分布式可重入锁

如何实现可重入? 首先锁信息(指redis中lockKey关联的value值) 必须得设计的能负载更多信息,之前non-reentrant时value直接就是一个超时时间,但是要实现可重入单超时时间是不够的,必须要标识锁是被谁持有的,也就是说要标识分布式环境中的线程,还要记录锁被入了多少次. 如何在分布式线程中标识唯一线程? MAC地址 +jvm进程 + 线程ID(或者线程地址都行),三者结合即可唯一分布式环境中的线程.下载 实现 锁的信息采用json存储,格式如下: 代码框架还是和之前实现的非

可重入锁(good)

可重入锁,也叫做递归锁,是指在一个线程中可以多次获取同一把锁,比如:一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法[即可重入],而无需重新获得锁:https://www.zhihu.com/question/23284564/answer/29633571 对于不同线程则相当于普通的互斥锁. 在JAVA环境下 ReentrantLock 和synchronized 都是 可重入锁 最大的作用是避免死锁.在很多情况下线程需要多次进入锁内执行任务