【redis】--springboot实现redis的分布式锁

目录

  • 1.redis的应用场景
  • 2.redis的分布式锁
  • 3.通过redisson框架实现redis分布式锁

1.redis的应用场景

  • 商品秒杀
  • 点赞等

现在有一个减少商品的场景,我们很容易能写出其代码

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
        if (stock>0){
            stock--;
            redisTemplate.opsForValue().set("a",stock+"");
            System.out.println("扣除成功,剩余:" + stock);
        }else {
            System.out.println("扣除失败,剩余:" + stock);
        }
        return "end";
    }
}

但是有一个问题,该程序单机下线程不安全。不过可以解决:加锁

@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
 public String deductSt0ck(){
     synchronized(this){
         if (!aBoolean){
             return "error";
         }

         int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
         if (stock>0){
             stock--;
             redisTemplate.opsForValue().set("a",stock+"");
             System.out.println("扣除成功,剩余:" + stock);
         }else {
             System.out.println("扣除失败,剩余:" + stock);
         }
         return "end";
     }
 }
}

加锁解决了单机的线程安全的问题,但是在集群的情况下线程依旧不安全,因为集群的情况下有多个服务器同时运行那么依然会产生线程安全问题;

因为在同一时间有两个jvm运行,其中一个jvm的锁肯定不会影响另一个jvm。故此时就需要用到redis的分布式锁。

2.redis的分布式锁

Redis 是一个单进程单线程的非关系型数据库,而 Redis 锁的实质便是让并行的多个线程在Redis 内部以串行的方式执行

redis的(SETNX key value)语句

  • 将 key 的值设为 value ,当且仅当 key 不存在。
  • 若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

故可以通过redis自身特性,及其setnx操作来实现其分布式锁。有代码

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;

  @RequestMapping("/redis")
  public String deductSt0ck(){
      String lock = "lock";
      Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, "wf");

      if (!aBoolean){
          return "error";
      }

      int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));

      if (stock>0){
          stock--;
          redisTemplate.opsForValue().set("a",stock+"");
          System.out.println("扣除成功,剩余:" + stock);
      }else {
          System.out.println("扣除失败,剩余:" + stock);
      }
      redisTemplate.delete(lock);
      return "end";
  }
}

该方法可以规避在单机状态下的安全问题但这个代码依然有问题,当程序加锁后,如果程序在解锁前出现异常,导致方法退出就不会把加的锁解开,会导致出现死锁,可以使用try--finally解决异常的问题。 还有在加锁后如果该程序突然挂断那依然会形成死锁,这个问题可以通过给key设置超时时间来解决

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
        String lock = "lock";
        String value = String.valueOf(UUID.randomUUID());
        try {
            Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);

            if (!aBoolean){
                return "error";
            }

            int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));

            if (stock>0){
                stock--;
                redisTemplate.opsForValue().set("a",stock+"");
                System.out.println("扣除成功,剩余:" + stock);
            }else {
                System.out.println("扣除失败,剩余:" + stock);
            }
        }finally {
                redisTemplate.delete(lock);
        }
        return "end";
    }
}

到这里,看似问题已经解决了,但是确实代码在分布式高并发的情况下依然有问题,就是在加锁后,如果代码出现问题,导致在还没有执行完成业务逻辑的情况下,达到了设置到超时时间,就会导致锁失效,从而使其他的线程获得执行权限,而在其他的线程加完锁后,第一个线程突然执行完成释放第二个线程加的锁。使下一个线程获得执行权限,加锁又被第二个线程释放/。这样链式这些下去就会导致锁失效。
这个问题可以通过每次给线程设置value值不同的锁,在释放锁使判断,如果value与设置到相同就释放锁,负责不释放。

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
        String lock = "lock";
        String value = String.valueOf(UUID.randomUUID());
        try {
            Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);

            if (!aBoolean){
                return "error";
            }

            int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));

            if (stock>0){
                stock--;
                redisTemplate.opsForValue().set("a",stock+"");
                System.out.println("扣除成功,剩余:" + stock);
            }else {
                System.out.println("扣除失败,剩余:" + stock);
            }
        }finally {
            if (value.equals(redisTemplate.opsForValue().get(lock))){
                redisTemplate.delete(lock);
            }
        }
        return "end";
    }
}

到这里看似代码已经完美了。但是上叙方案还有问题没有解决,就是在第二个线程加锁执行时,如果第一个线程的主逻辑没有执行完成,那么就会导致两个线程进行同时执行业务逻辑,此时线程依然不安全。而要解决该问题就需要在程序还未结束,锁到达超时时间时延长程锁的超时时间,及写一个子线程,检测如果只要程序没有执行完成就不停的延长锁的过期时间

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private Redisson redisson;

    @RequestMapping("/redis2")
    public String deductStock(){
        String lock = "lock";
        String value = String.valueOf(UUID.randomUUID());
        try {
            Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
           if (!aBoolean){
                return "error";
            }
            new Thread(()->{
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        if (value.equals(redisTemplate.opsForValue().get(lock))){
                            redisTemplate.expire(lock,15,TimeUnit.SECONDS);
                        }
                    }
                },5000,5000);
            }).start();

            int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));

            if (stock>0){
                stock--;
                redisTemplate.opsForValue().set("a",stock+"");
                System.out.println("扣除成功,剩余:" + stock);
            }else {
                System.out.println("扣除失败,剩余:" + stock);
            }
        }finally {
            try {
                TimeUnit.SECONDS.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (value.equals(redisTemplate.opsForValue().get(lock))){
                redisTemplate.delete(lock);
            }
        }
        return "ok";
    }
}

3.通过redisson框架实现redis分布式锁

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
    String lock = "lock";
    RLock rLock = redisson.getLock(lock);

    try {
    rLock.lock(30,TimeUnit.SECONDS);
    int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));

        if (stock>0){
        stock--;
        redisTemplate.opsForValue().set("a",stock+"");
        System.out.println("扣除成功,剩余:" + stock);
        }else {
        System.out.println("扣除失败,剩余:" + stock);
        }
    } finally {
      rLock.unlock();
    }
      return "end";
    }
}

redisson默认实现了以上所有的东西,

Redis 主从架构失效问题

   以上架构还存在问题:有这样一种场景,即线程1成功上锁后,但是主服务器还未来的及复制到从服务器便发生了宕机,此时从服务器被选择为主服务器,由于其没有锁记录,其他线程便可以进行上锁,此时便发生了线程安全问题。

   Red Lock 算法也可以解决上面存在的问题,其算法思想为:*使用多台 Redis Master ,节点完全独立,节点间不需要进行数据同步,因为 Master-Slave 架构一旦 Master 发生故障时数据没有复制到 Slave,被选为 Master 的 Slave 就丢掉了锁,另一个客户端就可以再次拿到锁,锁通过 setNX(原子操作) 命令设置,在有效时间内当获得锁的数量大于 (n/2+1) 代表成功,失败后需要向所有节点发送释放锁的消息。*

原文地址:https://www.cnblogs.com/wf614/p/12275028.html

时间: 2024-10-07 17:26:17

【redis】--springboot实现redis的分布式锁的相关文章

Redis如何实现高并发分布式锁?

众所周知,分布式锁在微服务架构中是重头戏,尤其是在互联网公司,基本上企业内部都会有自己的一套分布式锁开发框架.本文主要介绍使用Redis如何构建高并发分布式锁. 假设 存在一个SpringBoot的控制器,其扣减库存的业务逻辑如下: @Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping(value = "/deduct-stock") public String deductSotck()

基于redis集群实现的分布式锁,可用于秒杀商品的库存数量管理,有測试代码(何志雄)

转载请标明出处. 在分布式系统中,常常会出现须要竞争同一资源的情况,本代码基于redis3.0.1+jedis2.7.1实现了分布式锁. redis集群的搭建,请见我的另外一篇文章:<><redis3.0.1集群环境搭建> 可用于比如秒杀系统中的商品库存的管理.付完整代码及測试用例. package com.gaojiasoft.gaojiaRedis; import java.util.UUID; import java.util.concurrent.LinkedBlockin

Redis中是如何实现分布式锁的?

分布式锁常见的三种实现方式: 数据库乐观锁: 基于Redis的分布式锁: 基于ZooKeeper的分布式锁. 本地面试考点是,你对Redis使用熟悉吗?Redis中是如何实现分布式锁的. 要点 Redis要实现分布式锁,以下条件应该得到满足 互斥性 在任意时刻,只有一个客户端能持有锁. 不能死锁 客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁. 容错性 只要大部分的Redis节点正常运行,客户端就可以加锁和解锁. 实现 可以直接通过 set key value px mil

常用的分布式锁和redis和zk两种分布式锁的对比

常用的分布式锁 一..基于数据库实现分布式锁 1. 悲观锁 利用select … where … for update 排他锁 注意: 其他附加功能与实现一基本一致,这里需要注意的是“where name=lock ”,name字段必须要走索引,否则会锁表.有些情况下,比如表不大,mysql优化器会不走这个索引,导致锁表问题. 2. 乐观锁 所谓乐观锁与前边最大区别在于基于CAS思想,是不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才

使用redis设计一个简单的分布式锁

最近看了有关redis的一些东西,了解了redis的一下命令,就记录一下: redis中的setnx命令: 关于redis的操作命令,我们一般会使用set,get等一系列操作,数据结构也有很多,这里我们使用最简单的string来存储锁. redis下提供一个setnx命令用来将key值设为value,类似于set功能,但是它和set是有区别的,在于后面的nx,setnx是SET if Not eXists.就是:当且仅当key值不存在的时候,将该key值设置为value. 也就是说使用setnx

为什么Redis可以方便地实现分布式锁

1.Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系. 2.Redis的SETNX命令可以方便的实现分布式锁. setNX(SET if Not eXists) 语法:SETNX key value 返回值:设置成功,返回 1 :设置失败,返回 0 .(http://www.amjmh.com/v/) 当且仅当 key 不存在时将 key 的值设为 value,并返回1:若给定的 key 已经存在,则 SETNX 不做任何动作,并返回

jedisLock—redis分布式锁实现

一.使用分布式锁要满足的几个条件: 系统是一个分布式系统(关键是分布式,单机的可以使用ReentrantLock或者synchronized代码块来实现) 共享资源(各个系统访问同一个资源,资源的载体可能是传统关系型数据库或者NoSQL) 同步访问(即有很多个进程同事访问同一个共享资源.没有同步访问,谁管你资源竞争不竞争) 二.应用的场景例子 管理后台的部署架构(多台tomcat服务器+redis[多台tomcat服务器访问一台redis]+mysql[多台tomcat服务器访问一台服务器上的m

redis分布式锁和消息队列

最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令. 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样的,需要一下更为简单的锁校验的时候,redis分布式锁的使用就足够满足了.redis的分布式锁其实就是基于setnx方法和redis对key可设置有效时间的功能来实现的.基本用法比较简单. public boolean tryLock(String lock

RedLock.Net - 基于Redis分布式锁的开源实现

工作中,经常会遇到分布式环境中资源访问冲突问题,比如商城的库存数量处理,或者某个事件的原子性操作,都需要确保某个时间段内只有一个线程在访问或处理资源. 因此现在网上也有很多的分布式锁的解决方案,有数据库.MemCache.ZoopKeeper等等的方式. 这次,我们要学习的是一个基于Redis分布式锁的插件,RedLock.Net. 首先必须要有一个Redis服务来支持此分布式锁,其次就当然是要获取此插件了. 可以从Nuget中获取,也可以直接去Github下载   https://github

基于Redis实现分布式锁-Redisson使用及源码分析

在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 本文主要探讨另外一种实现分布式最终一致性的解决方案--采用分布式锁.基于分布式锁的解决