Java使用Redisson分布式锁实现原理

本篇文章摘自:https://www.jb51.net/article/149353.htm

由于时间有限,暂未验证 仅先做记录。有大家注意下哈(会尽快抽时间进行验证)

1. 基本用法

添加依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.8.2</version>
</dependency>
Config config = new Config();
config.useClusterServers()
  .setScanInterval(2000) // cluster state scan interval in milliseconds
  .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
  .addNodeAddress("redis://127.0.0.1:7002");

RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getLock("anyLock");

lock.lock();

try {
  ...
} finally {
  lock.unlock();
}

针对上面这段代码,重点看一下Redisson是如何基于Redis实现分布式锁的

Redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法

更多请参见https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

2. 加锁

可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

首先,看一下evalWriteAsync方法的定义

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

首先,看一下evalWriteAsync方法的定义

最后两个参数分别是keys和params

实际调用是这样的:

单独将调用的那一段摘出来看

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
         "if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
           "redis.call(‘hset‘, KEYS[1], ARGV[2], 1); " +
           "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
           "return nil; " +
         "end; " +
         "if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " +
           "redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
           "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
           "return nil; " +
         "end; " +
         "return redis.call(‘pttl‘, KEYS[1]);",
          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,这段脚本的意思是

  1、判断有没有一个叫“abc”的key

  2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

  3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

  4、返回“abc”的生存时间(毫秒)

这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

3. 解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
      "if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
        "redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
        "return 1; " +
      "end;" +
      "if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then " +
        "return nil;" +
      "end; " +
      "local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1); " +
      "if (counter > 0) then " +
        "redis.call(‘pexpire‘, KEYS[1], ARGV[2]); " +
        "return 0; " +
      "else " +
        "redis.call(‘del‘, KEYS[1]); " +
        "redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
        "return 1; "+
      "end; " +
      "return nil;",
      Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
 
}

我们还是假设name=abc,假设线程ID是Thread-1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面脚本的意思是:

  1、判断是否存在一个叫“abc”的key

  2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1

  3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

  4、若字段不存在,返回空,若字段存在,则字段值减1

  5、若减完以后,字段值仍大于0,则返回0

  6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

4. 等待

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

再回到前面获取锁的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return;
  }
 
  //  订阅
  RFuture<RedissonLockEntry> future = subscribe(threadId);
  commandExecutor.syncSubscription(future);
 
  try {
    while (true) {
      ttl = tryAcquire(leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        break;
      }
 
      // waiting for message
      if (ttl >= 0) {
        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
        getEntry(threadId).getLatch().acquire();
      }
    }
  } finally {
    unsubscribe(future, threadId);
  }
//    get(lockAsync(leaseTime, unit));
}
 
 
protected static final LockPubSub PUBSUB = new LockPubSub();
 
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
  return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
 
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
  PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

5. 小结

6. 其它相关

基于Redis的分布式锁的简单实现

@感谢原文作者的分享:https://www.jb51.net/article/149353.htm

原文地址:https://www.cnblogs.com/nov5026/p/10764068.html

时间: 2024-07-29 14:06:52

Java使用Redisson分布式锁实现原理的相关文章

Redisson 分布式锁超简封装

Redisson是一个在Redis的基础上实现的Java驻内存数据网格.它几乎提供了Redis所有工具,不仅封装Redis底层数据结构,而且还提供了很多Java类型映射.Redisson支持redis单实例.redis哨兵.redis cluster.redis master-slave等各种部署架构.Redisson除了普通分布式锁还支持 联锁(MultiLock),读写锁(ReadWriteLock),公平锁(Fair Lock),红锁(RedLock),信号量(Semaphore),可过期

SpringBoot集成redisson分布式锁

官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95 1.引用redisson的pom <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.5.0</version> </dependency> 2.定义Lo

springboot整合redisson分布式锁

一.通过maven引入redisson的jar包 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency> 二.在yaml文件中引入redis的相关配置(redis单节点可以读取原有redis配置拼装,如果是主从需另外独立配置,相关属性

[转帖]SpringBoot集成redisson分布式锁

https://www.cnblogs.com/yangzhilong/p/7605807.html 前几天同事刚让增加上这一块东西. 百度查一下 啥意思.. 学习一下. 官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95 20180226更新:增加tryLock方法,建议后面去掉DistributedLocker接口和其实现类,直接在RedissLockUtil中注入RedissonClient实现类(简单但会丢失

基于Redis的简单分布式锁的原理

参考资料:https://redis.io/commands/setnx 加锁是为了解决多线程的资源共享问题.Java中,单机环境的锁可以用synchronized和Lock,其他语言也都应该有自己的加锁机制.但是到了分布式环境,单机环境中的锁就没什么作用了,因为每个节点只能获取到自己机器内存中的锁,而无法获取到其他节点的锁状态. 分布式环境中,应该用专门的分布式锁来解决需要加锁的问题.分布式锁有很多实现,Redis,zookeeper都可以.这里以Redis为例,讲述一下基于Redis的分布式

java实现Redis分布式锁

网上到处都是分布式锁的代码,基本都是通过setNX 和 expire 这两个不是原子操作,肯定会有问题,不乏好多人通过用setNX的value当做过期时间来弥补等等.但是好像都不太好,或者多少有点问题. 从一个大神那里得来的代码 1 package com.abc.def.util; 2 3 import redis.clients.jedis.Jedis; 4 5 import java.util.Collections; 6 7 public class RedisDistributedLo

Redlock(redis分布式锁)原理分析

Redlock:全名叫做 Redis Distributed Lock;即使用redis实现的分布式锁: 使用场景:多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击): 官网文档地址如下:https://redis.io/topics/distlock 这个锁的算法实现了多redis实例的情况,相对于单redis节点来说,优点在于 防止了 单节点故障造成整个服务停止运行的情况:并且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法: 此博客或

redis中使用java脚本实现分布式锁

转载于:http://www.itxuexiwang.com/a/shujukujishu/redis/2016/0216/115.html?1455860390 edis被大量用在分布式的环境中,自然而然分布式环境下的锁如何解决,立马成为一个问题.例如我们当前的手游项目,服务器端是按业务模块划分服务器的,有应用服,战斗服等,但是这两个vm都有可能同时改变玩家的属性,这如果在同一个vm下面,就很容易加锁,但如果在分布式环境下就没那么容易了,当然利用redis现有的功能也有解决办法,比如redis

ZooKeeper 分布式锁实现原理

原理 进程需要访问共享数据时, 就在"/locks"节点下创建一个sequence类型的子节点, 称为thisPath. 当thisPath在所有子节点中最小时, 说明该进程获得了锁. 进程获得锁之后, 就可以访问共享资源了. 访问完成后, 需要将thisPath删除. 锁由新的最小的子节点获得. 进程如何知道thisPath是所有子节点中最小的呢? 可以在创建的时候, 通过getChildren方法获取子节点列表, 然后在列表中找到排名比thisPath前1位的节点, 称为waitP