Redis如何实现高并发分布式锁?

众所周知,分布式锁在微服务架构中是重头戏,尤其是在互联网公司,基本上企业内部都会有自己的一套分布式锁开发框架。本文主要介绍使用Redis如何构建高并发分布式锁。

假设 存在一个SpringBoot的控制器,其扣减库存的业务逻辑如下:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {

    // 将库存取出来
    int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判断库存够不够减
    if (stock > 0) {
        // 将库存回写到redis
        int tmp = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", tmp.toString());
        logger.info("库存扣减成功");
    } else {
        logger.info("库存扣减失败");
    }

    return "finished.";
}

不难看出,在应用服务器运行这段代码的时候就会有线程安全性问题。因为多个线程同时去修改Redis服务中的数据。因此考虑给这段代码加上一把锁:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    synchronized (this) {
        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判断库存够不够减
        if (stock > 0) {
            // 将库存回写到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("库存扣减成功");
        } else {
            logger.info("库存扣减失败");
        }
    }
    return "finished.";
}

这样一来,当多个HTTP请求来请求数据的时候,多个线程去修改同一数据会有JVM本地锁来进行合理的资源限制。虽然这样解决了线程安全性问题,但是这仅仅是JVM级别的锁,在分布式的环境下,由于像这样的Web应用随时会进行动态扩容,因此当多个应用的时候,同样会有线程安全性问题,当上面这段代码遇到类似下面的架构时还是会有各种各样的问题:

对于上述的情况,我们可以使用redis api提供的setnx方法解决:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {

    // 尝试获取锁
    Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

    // 判断是否获得锁
    if (!flag) { return "error"; }

    int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判断库存够不够减
    if (stock > 0) {
        // 将库存回写到redis
        int tmp = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", tmp.toString());
        logger.info("库存扣减成功");
    } else {
        logger.info("库存扣减失败");
    }

    // 删除锁
    stringRedisTemplate.delete("Hello");

    return "finished.";
}

setnx key value是将key的值设置为value,当且仅当key不存在的时候。如果设置成功就返回1,否则就返回0。

这样的话,首先尝试获取锁,然后当业务执行完成的时候再删除锁。但是还是有问题的,当获取锁的时候抛出异常或者业务执行抛出异常怎么办,所以加入异常处理逻辑:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    try {
        // 尝试获取锁
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

        // 判断是否获得锁
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判断库存够不够减
        if (stock > 0) {
            // 将库存回写到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("库存扣减成功");
        } else {
            logger.info("库存扣减失败");
        }
    } finally {
        // 删除锁
        stringRedisTemplate.delete("Hello");
    }
    return "finished.";
}

经过这样的修改,看起来没什么问题了。但是当程序获得锁并且开始执行业务逻辑的时候,突然程序挂掉了或者被一些粗暴的运维工程师给kill,在finally中删除锁的逻辑就会得不到执行,因此就会产生死锁。对于这种情况,我们可以给这个锁设置一个超时时间:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    try {
        // 尝试获取锁
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

        // 设置超时时间, 根据业务场景估计超时时长
        stringRedisTmplate.expire("Hello", 10, TimeUnit.SECONDS);

        // 判断是否获得锁
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判断库存够不够减
        if (stock > 0) {
            // 将库存回写到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("库存扣减成功");
        } else {
            logger.info("库存扣减失败");
        }
    } finally {
        // 删除锁
        stringRedisTemplate.delete("Hello");
    }
    return "finished.";
}

如果程序这么来写,相对来说安全一些了,但是还是存在问题。试想一下,当获取锁成功时,正想给这把锁设置超时的时候,程序挂掉了,还是会出现死锁的,因此在redis较高的版本中提供的setIfAbsent方法中可以同时设置锁的超时时间。

 Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World", 10, TimeUnit.SECONDS);

这样一来,尝试获取锁和设置锁的超时时间就具备原子性了。实际上经过我们这一番改造,这在小型企业已经没有太大的问题, 因为像这种代码每天也就执行几百次,并不算做高并发的场景。当这样的代码被暴露在超高并发场景下的时候,还是会存在各种各样的问题。试想一个场景,当一个HTTP请求请求到控制器的时候,应用获取到锁了,超时时间也设置成功了,但是应用的业务逻辑超过了超时时间,我们这里的超时时间设置的是10秒,当应用的业务逻辑执行15秒的时候,锁就被redis服务删除了。假设恰好此时又有一个HTTP请求来请求控制器,此时应用服务器会再启动一个线程来获取锁,而且还获取成功了,但是这次的HTTP请求对应的业务逻辑还没有执行完。新来的TTTP请求也在执行,由于新来的HTTP请求也在执行,因为锁超时后被删除,新的HTTP请求也成功获取锁了。当原来的HTTP请求对应的业务逻辑执行完成以后,尝试删除锁,这样正好删除的是新来的HTTP请求对应的锁。这个时候redis中又没有锁了,这样第三个HTTP请求又会获得锁,所以情况就不妙了。

为了解决上面的问题,我们可以将代码优化为下面的样子:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    String clientUuid = UUID.randomUUID().toString();
    try {
        // 尝试获取锁,设置超时时间, 根据业务场景估计超时时长
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", clientUuid, 10, TimeUnit.SECONDS);

        // 判断是否获得锁
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判断库存够不够减
        if (stock > 0) {
            // 将库存回写到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("库存扣减成功");
        } else {
            logger.info("库存扣减失败");
        }
    } finally {
        // 删除锁的时候判断是不是自己的锁
        if (clientUuid.equals(stringRedisTemplate.opsForValue().get("Hello"))) {
            stringRedisTemplate.delete("Hello");
        }
    }
    return "finished.";
}

但是由于程序的不可预知性,谁也不能保证极端情况下,同时会有多个线程同时执行这段业务逻辑。我们可以在当执行业务逻辑的时候同时开一个定时器线程,每隔几秒就重新将这把锁设置为10秒,也就是给这把锁进行“续命”。这样就用担心业务逻辑到底执行多长时间了。但是这样程序的复杂性就会增加,每个业务逻辑都要写好多的代码,因此这里推荐在分布式环境下使用redisson。因此我们使用redisson实现分支线程的代码:

  • 引入依赖:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>
  • 初始化Redisson的客户端配置:
@Bean
public Redisson redisson () {
    Config cfg = new Config();
    cfg.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
    return (Redisson) Redisson.create(cfg);
}
  • 在程序中注入Redisson客户端:
@Autowired
private Redisson redisson;
  • 对应的业务逻辑:
@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    // 获取锁对象
    RLock lock = redisson.getLock("Hello");
    try {
        // 尝试加锁, 默认30秒, 自动后台开一个线程实现锁的续命
        lock.tryLock();

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判断库存够不够减
        if (stock > 0) {
            // 将库存回写到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("库存扣减成功");
        } else {
            logger.info("库存扣减失败");
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
    return "finished.";
}

Redisson分布式锁的实现原理如下:

但是这个架构还是存在问题的,因为redis服务器是主从的架构,当在master节点设置锁之后,slave节点会立刻同步。但是如果刚在master节点设置上了锁,slave节点还没来得及设置,master节点就挂掉了。还是会产生上同样的问题,新的线程获得锁。

因此使用redis构建高并发的分布式锁,仅适合单机架构,当使用主从架构的redis时还是会出现线程安全性问题。

原文地址:https://blog.51cto.com/xvjunjie/2428610

时间: 2024-08-07 23:39:32

Redis如何实现高并发分布式锁?的相关文章

高并发分布式锁

==============================================分布式======================================= 原文地址:https://www.cnblogs.com/lovedaodao/p/10113279.html

Redis实现高并发分布式序列号

使用Redis实现高并发分布式序列号生成服务 序列号的构成 为建立良好的数据治理方案,作数据掌握.分析.统计.商业智能等用途,业务数据的编码制定通常都会遵循一定的规则,一般来讲,都会有自己的编码规则和自增序列构成.比如我们常见的身份证号.银行卡号.社保电脑号等等. 以某公司产品标识码(代表该产品的唯一编码)的构成为例: 规则定义:商品款号(8位)+颜色号(3位)+号型码(3位) (共14位) 其标识码为:62X19001 001 46A 业务含义为: 2009年男装秋冬季仿毛套西黑色170A版

常用的分布式锁和redis和zk两种分布式锁的对比

常用的分布式锁 一..基于数据库实现分布式锁 1. 悲观锁 利用select … where … for update 排他锁 注意: 其他附加功能与实现一基本一致,这里需要注意的是“where name=lock ”,name字段必须要走索引,否则会锁表.有些情况下,比如表不大,mysql优化器会不走这个索引,导致锁表问题. 2. 乐观锁 所谓乐观锁与前边最大区别在于基于CAS思想,是不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才

Redis中是如何实现分布式锁的?

分布式锁常见的三种实现方式: 数据库乐观锁: 基于Redis的分布式锁: 基于ZooKeeper的分布式锁. 本地面试考点是,你对Redis使用熟悉吗?Redis中是如何实现分布式锁的. 要点 Redis要实现分布式锁,以下条件应该得到满足 互斥性 在任意时刻,只有一个客户端能持有锁. 不能死锁 客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁. 容错性 只要大部分的Redis节点正常运行,客户端就可以加锁和解锁. 实现 可以直接通过 set key value px mil

基于redis集群实现的分布式锁,可用于秒杀商品的库存数量管理,有測试代码(何志雄)

转载请标明出处. 在分布式系统中,常常会出现须要竞争同一资源的情况,本代码基于redis3.0.1+jedis2.7.1实现了分布式锁. redis集群的搭建,请见我的另外一篇文章:<><redis3.0.1集群环境搭建> 可用于比如秒杀系统中的商品库存的管理.付完整代码及測试用例. package com.gaojiasoft.gaojiaRedis; import java.util.UUID; import java.util.concurrent.LinkedBlockin

服务端高并发分布式架构 ESB 企业服务总线

服务端高并发分布式架构演进之路 - 个人文章 - SegmentFault 思否 https://segmentfault.com/a/1190000018626163 ESB 企业服务总线讲解 https://mp.weixin.qq.com/s/vWtuv2UnnVPi4U5w97REmg 银行企业服务总线应用架构 https://mp.weixin.qq.com/s/PwvPHGHbjotrST0biiTUKA 原文地址:https://www.cnblogs.com/yuanjiangw

web开发中的两把锁之数据库锁:(高并发--乐观锁、悲观锁)

这篇文章讲了 1.同步异步概念(消去很多疑惑),同步就是一件事一件事的做:sychronized就是保证线程一个一个的执行. 2.我们需要明白,锁机制有两个层面,一种是代码层次上的,如Java中的同步锁,典型的就是同步关键字synchronized ( 线    程级别的).另一个就是数据库层次上的,比较典型的就是悲观锁和乐观锁. 3.常见并发同步案例分析   附原文链接 http://www.cnblogs.com/xiohao/p/4385508.html 对于我们开发的网站,如果网站的访问

使用redis设计一个简单的分布式锁

最近看了有关redis的一些东西,了解了redis的一下命令,就记录一下: redis中的setnx命令: 关于redis的操作命令,我们一般会使用set,get等一系列操作,数据结构也有很多,这里我们使用最简单的string来存储锁. redis下提供一个setnx命令用来将key值设为value,类似于set功能,但是它和set是有区别的,在于后面的nx,setnx是SET if Not eXists.就是:当且仅当key值不存在的时候,将该key值设置为value. 也就是说使用setnx

单进程单线程的Redis如何能够高并发

参考文档: (1)http://yaocoder.blog.51cto.com/2668309/888374 (2)http://www.cnblogs.com/syyong/p/6231326.html 1.基本原理 采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗) (1)为什么不采用多进程或多线程处理? 多线程处理可能涉及到锁 多线程处理会涉及到线程切换而消耗CPU (2)单线程处理的缺点? 无法发挥多核CPU性能,不过可以通过在单机开多个Redi