以商品超卖为例讲解Redis分布式锁

本案例主要讲解Redis实现分布式锁的两种实现方式:Jedis实现、Redisson实现。网上关于这方面讲解太多了,Van自认为文笔没他们好,还是用示例代码说明。

一、jedis 实现

该方案只考虑Redis单机部署的场景

1.1 加锁

1.1.1 原理

jedis.set(String key, String value, String nxxx, String expx, int time)
  1. key: 使用key来当锁,因为key是唯一的;
  2. value: 我传的是唯一值(UUID),很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因是分布式锁要满足解铃还须系铃人:通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候要验证value值,不能误解锁;
  3. nxxx: 这个参数我填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  4. expx: 这个参数我传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定;
  5. time: 与第四个参数相呼应,代表key的过期时间。

1.1.2 小结

  • set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性;
  • 其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁;
  • 最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。

1.2 释放锁

释放锁时需要验证value值,也就是说我们在获取锁的时候需要设置一个value,不能直接用del key这种粗暴的方式,因为直接del key任何客户端都可以进行解锁了,所以解锁时,我们需要判断锁是否是自己的(基于value值来判断)

  1. 首先,写了一个简单Lua脚本代码,作用是:获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁);
  2. 然后,将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKeyARGV[1]赋值为requestIdeval()方法是将Lua代码交给Redis服务端执行。

1.3 案例(家庭多人领取奖励的场景)

这里放出的是关键代码,详细可运行的代码可至文末地址下载示例代码。

1.3.1 准备

该案例模拟家庭内多人通过领取一个奖励,但是只能有一个人能领取成功,不能重复领取(之前做过奖励模块的需求)

  • family_reward_record
CREATE TABLE `family_reward_record` (
  `id` bigint(10) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `family_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品名称',
  `reward_type` int(10) NOT NULL DEFAULT '1' COMMENT '商品库存数量',
  `state` int(1) NOT NULL DEFAULT '0' COMMENT '商品状态',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '入库时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=270 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='家庭领取奖励表(家庭内多人只能有一个人能领取成功,不能重复领取)';
  • application.yml
spring:
  datasource:
    url: jdbc:mysql://47.98.178.84:3306/dev
    username: dev
    password: password
    driver-class-name: com.mysql.jdbc.Driver
  redis:
    host: 47.98.178.84
    port: 6379
    password: password
    timeout: 2000
# mybatis
mybatis:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: cn.van.mybatis.demo.entity

1.3.2 核心实现

  • Jedis 单机配置类 - RedisConfig.java
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.timeout}")
    private int timeout;

    @Bean
    public JedisPool redisPoolFactory() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        if (StringUtils.isEmpty(password)) {
            return new JedisPool(jedisPoolConfig, host, port, timeout);
        }
        return new JedisPool(jedisPoolConfig, host, port, timeout, password);
    }

    @Bean(name = "redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
        objectMapper.enableDefaultTyping(DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<Object> jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        jsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setDefaultSerializer(jsonRedisSerializer);

        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
  • 分布式锁工具类 - RedisDistributedLock.java
@Component
public class RedisDistributedLock {
    /**
     * 成功获取锁标示
     */
    private static final String LOCK_SUCCESS = "OK";
    /**
     * 成功解锁标示
     */
    private static final Long RELEASE_SUCCESS = 1L;

    @Autowired
    private JedisPool jedisPool;

    /**
     * redis 数据存储过期时间
     */
    final int expireTime = 500;

    /**
     * 尝试获取分布式锁
     * @param lockKey 锁
     * @param lockValue 请求标识
     * @return 是否获取成功
     */
    public boolean tryLock(String lockKey, String lockValue) {
        Jedis jedis = null;
        try{
            jedis = jedisPool.getResource();
            String result = jedis.set(lockKey, lockValue, "NX", "PX", expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
        return false;
    }

    /**
     * 释放分布式锁
     * @param lockKey 锁
     * @param lockValue 请求标识
     * @return 是否释放成功
     */
    public boolean unLock(String lockKey, String lockValue) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
        return false;
    }
}
  • 不加锁时:模拟 familyId = 1 的家庭同时领取奖励
@Override
public HttpResult receiveAward() {
    Long familyId = 1L;
    Map<String, Object> params = new HashMap<String, Object>(16);
    params.put("familyId", familyId);
    params.put("rewardType", 1);
    int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params);
    if (count == 0) {
        FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now());
        int num = familyRewardRecordMapper.insert(recordDO);
        if (num == 1) {
            return HttpResult.success();
        }
        return HttpResult.failure(-1, "记录插入失败");
    }
    return HttpResult.success("该记录已存在");
}
  • 加锁的实现:模拟 familyId = 2 的家庭同时领取奖励
@Override
public HttpResult receiveAwardLock() {
    Long familyId = 2L;
    Map<String, Object> params = new HashMap<String, Object>(16);
    params.put("familyId", familyId);
    params.put("rewardType", 1);
    int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params);
    if (count == 0) {
        // 没有记录则创建领取记录
        FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now());
        // 分布式锁的key(familyId + rewardType)
        String lockKey = recordDO.getFamilyId() + "_" + recordDO.getRewardType();
        // 分布式锁的value(唯一值)
        String lockValue = createUUID();
        boolean lockStatus = redisLock.tryLock(lockKey, lockValue);
        // 锁被占用
        if (!lockStatus) {
            log.info("锁已经占用了");
            return HttpResult.failure(-1,"失败");
        }
        // 不管多个请求,加锁之后,只会有一个请求能拿到锁,进行插入操作
        log.info("拿到了锁,当前时刻:{}",System.currentTimeMillis());

        int num = familyRewardRecordMapper.insert(recordDO);
        if (num != 1) {
            log.info("数据插入失败!");
            return HttpResult.failure(-1, "数据插入失败!");
        }
        log.info("数据插入成功!准备解锁...");
        boolean unLockState = redisLock.unLock(lockKey,lockValue);
        if (!unLockState) {
            log.info("解锁失败!");
            return HttpResult.failure(-1, "解锁失败!");
        }
        log.info("解锁成功!");
        return HttpResult.success();
    }
    log.info("该记录已存在");
    return HttpResult.success("该记录已存在");
}
private String createUUID() {
    UUID uuid = UUID.randomUUID();
    String str = uuid.toString().replace("-", "_");
    return str;
}

1.3.3 测试

我采用的是JMeter工具进行测试,加锁和不加锁的情况都设置成:五次并发请求。

1.3.3.1 不加锁

/**
 * 家庭成员领取奖励(不加锁)
 * @return
 */
@PostMapping("/receiveAward")
public HttpResult receiveAward() {
    return redisLockService.receiveAward();
}

1.3.3.2 加锁

/**
 * 家庭成员领取奖励(加锁)
 * @return
 */
@PostMapping("/receiveAwardLock")
public HttpResult receiveAwardLock() {
    return redisLockService.receiveAwardLock();
}

通过对比,说明分布式锁起作用了。

1.4 小结

我上家使用的就是这种加锁方式,看上去很OK,实际上在Redis集群的时候会出现问题,比如:

A客户端在Redismaster节点上拿到了锁,但是这个加锁的key还没有同步到slave节点,master故障,发生故障转移,一个slave节点升级为master节点,B客户端也可以获取同个key的锁,但客户端A也已经拿到锁了,这就导致多个客户端都拿到锁。

正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock

二、Redlock实现

2.1 原理

antirez提出的Redlock算法大概是这样的:

Redis的分布式环境中,我们假设有NRedis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

2.1.1 加锁

为了取到锁,客户端应该执行以下操作(RedLock算法加锁步骤):

  1. 获取当前Unix时间,以毫秒为单位;
  2. 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁;
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功;
  4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  5. 如果因为某些原因,获取锁失败(没有在至少N/2+1Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

2.1.2 解锁

向所有的Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁.

2.2 案例(商品超卖为例)

这部分以最常见的案例:抢购时的商品超卖(库存数减少为负数)为例

2.2.1 准备

  • good
CREATE TABLE `good` (
                      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
                      `good_name` varchar(255) NOT NULL COMMENT '商品名称',
                      `good_counts` int(255) NOT NULL COMMENT '商品库存',
                      `create_time` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
                      PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='商品表';
-- 插入两条测试数据
INSERT INTO `good` VALUES (1, '哇哈哈', 5, '2019-09-20 17:39:04');
INSERT INTO `good` VALUES (2, '卫龙', 5, '2019-09-20 17:39:06');
  • 配置文件跟上面一样

2.2.2 核心实现

  • Redisson 配置类 RedissonConfig.java

我这里配置的是单机,更多配置详见https://github.com/redisson/redisson/wiki/配置

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;

    /**
     * RedissonClient,单机模式
     * @return
     * @throws IOException
     */
    @Bean
    public RedissonClient redissonSentinel() {
        //支持单机,主从,哨兵,集群等模式,此为单机模式

        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setPassword(password);
        return Redisson.create(config);
    }
}
  • 不加锁时
@Override
public HttpResult saleGoods(){
    // 以指定goodId = 1:哇哈哈为例
    Long goodId = 1L;
    GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId);
    int goodStock = goodDO.getGoodCounts();
    if (goodStock >= 1) {
        goodMapper.saleOneGood(goodId);
    }
    return HttpResult.success();
}
  • 加锁
@Override
public HttpResult saleGoodsLock(){
    // 以指定goodId = 2:卫龙为例
    Long goodId = 2L;
    GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId);
    int goodStock = goodDO.getGoodCounts();
    String key = goodDO.getGoodName();
    log.info("{}剩余总库存,{}件", key,goodStock);
    // 将商品的实时库存放在redis 中,便于读取
    stringRedisTemplate.opsForValue().set(key, Integer.toString(goodStock));
    // redisson 锁 的key
    String lockKey = goodDO.getId() +"_" + key;
    RLock lock = redissonClient.getLock(lockKey);
    // 设置60秒自动释放锁  (默认是30秒自动过期)
    lock.lock(60, TimeUnit.SECONDS);
    // 此步开始,串行销售
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
    // 如果缓存中库存量大于1,可以继续销售
    if (stock >= 1) {
        goodDO.setGoodCounts(stock - 1);
        int num = goodMapper.saleOneGood(goodId);
        if (num == 1) {
            // 减库存成功,将缓存同步
            stringRedisTemplate.opsForValue().set(key,Integer.toString((stock-1)));
        }
        log.info("{},当前库存,{}件", key,stock);
    }
    lock.unlock();
    return HttpResult.success();
}

2.3 测试

采用的是JMeter工具进行测试,初始化的时候两个商品的库存设置都是:5;所以这里加锁和不加锁的情况都设置成:十次并发请求。

2.3.1 不加锁

/**
 * 售卖商品(不加锁)
 * @return
 */
@PostMapping("/saleGoods")
public HttpResult saleGoods() {
    return redisLockService.saleGoods();
}

2.3.2 加锁

/**
 * 售卖商品(加锁)
 * @return
 */
@PostMapping("/saleGoodsLock")
public HttpResult saleGoodsLock() {
    return redisLockService.saleGoodsLock();
}

2.3.3 小结

通过2.3.12.3.2的结果对比很明显:前者出现了超卖情况,库存数卖到了-5,这是决不允许的;而加了锁的情况后,库存只会减少到0,便不再销售。

三、总结

再次说明:以上代码不全,如需尝试,请前往Van 的 Github 查看完整示例代码

第一种基于Redis的分布式锁并不适合用于生产环境。Redisson 可用于生产环境。当然,分布式的选择还有Zookeeper的选项,Van后续会整理出来供大家参考。

3.1 示例源码地址

https://github.com/vanDusty/SpringBoot-Home/tree/master/springboot-demo-lock/redis-lock

3.2 技术交流

  1. 风尘博客
  2. 风尘博客-掘金
  3. 风尘博客-CSDN

原文地址:https://www.cnblogs.com/vandusty/p/11561160.html

时间: 2024-11-10 09:04:14

以商品超卖为例讲解Redis分布式锁的相关文章

订单并发商品超卖问题解决

问题:商品超卖(库存数出现负数). 模拟并发: goods商品表: /** * 下单 * @return string * @throws \yii\db\Exception */ public function actionIndex() { $redis = Yii::$app->redis; // 使用redis做一些统计 $redis->incr('total'); // 自增(记录一共成功进来了多少个请求) //$redis->del('total'); //$redis-&g

redis分布式锁解决超卖问题

1.1 redis事物 1.redis事物介绍 1. redis事物是可以一次执行多个命令,本质是一组命令的集合. 2. 一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入 作用:一个队列中,一次性.顺序性.排他性的执行一系列命令 2.multi 指令基本使用 1. 下面指令演示了一个完整的事物过程,所有指令在exec前不执行,而是缓存在服务器的一个事物队列中 2. 服务器一旦收到exec指令才开始执行事物队列,执行完毕后一次性返回所有结果 3. 因为redis是单线程的,所

基于redis分布式锁实现“秒杀”

最近在项目中遇到了类似"秒杀"的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓"秒杀"的基本思路. 业务场景 所谓秒杀,从业务角度看,是短时间内多个用户"争抢"资源,这里的资源在大部分秒杀场景里是商品:将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确. 一些可能的实现 刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可

SpringBoot集成Redis分布式锁以及Redis缓存

https://blog.csdn.net/qq_26525215/article/details/79182687 集成Redis 首先在pom.xml中加入需要的redis依赖和缓存依赖 <!-- 引入redis依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifa

【分布式缓存系列】集群环境下Redis分布式锁的正确姿势

一.前言 在上一篇文章中,已经介绍了基于Redis实现分布式锁的正确姿势,但是上篇文章存在一定的缺陷——它加锁只作用在一个Redis节点上,如果通过sentinel保证高可用,如果master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况: 客户端1在Redis的master节点上拿到了锁 Master宕机了,存储锁的key还没有来得及同步到Slave上 master故障,发生故障转移,slave节点升级为master节点 客户端2从新的Master获取到了对应同一个资源的锁 于是,客

Redis 分布式锁的实现

0X00 测试环境 CentOS 6.6 + Redis 3.2.10 + PHP 7.0.7(+ phpredis 4.1.0) [[email protected] ~]# cat /etc/issue CentOS release 6.6 (Final) Kernel \r on an \m [[email protected] ~]# redis-server -v Redis server v=3.2.10 sha=00000000:0 malloc=jemalloc-3.6.0 bi

redis分布式锁redisson

原文:https://blog.csdn.net/Kincym/article/details/78697472 关于redisson的源代码请参考官网:https://github.com/redisson/redisson redisson官方讲解参考:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95 springBoot中实现 首先需要引入redisson <!--redis--> <dependency>

spring boot项目之redis分布式锁的应用

SETNX key value 起始版本:1.0.0 时间复杂度:O(1) 将key设置值为value,如果key不存在,这种情况下等同SET命令. 当key存在时,什么也不做.SETNX是"SET if Not eXists"的简写. 返回值 Integer reply, 特定值: 1 如果key被设置了 0 如果key没有被设置 ##例子 redis> SETNX mykey "Hello" (integer) 1 redis> SETNX myke

Redis分布式锁实现简单秒杀功能

这版秒杀只是解决瞬间访问过高服务器压力过大,请求速度变慢,大大消耗服务器性能的问题. 主要就是在高并发秒杀的场景下,很多人访问时并没有拿到锁,所以直接跳过了.这样就处理了多线程并发问题的同时也保证了服务器的性能的稳定. 接下来我们使用redis的分布式锁来进行枷锁处理: 我们可以在进入下单的方法后将核心的方法加锁,然后离开后进行解锁 主要三步: 加锁 核心方法 解锁 首页分布式加锁解锁工具类: @Component public class RedisLock { private static