使用Redisson实现分布式锁,Spring AOP简化之

源码

Redisson概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

关于Redisson更多详细介绍,可参考Redssion概述

Redisson提供的分布式锁

可重入锁

Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁。下面是RLock的基本使用方法:

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

// 支持过期解锁功能
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

Redisson同时还为分布式锁提供了异步执行的相关方法:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

公平锁

Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。在提供了自动过期解锁功能的同时,保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。

RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();

// 支持过期解锁功能
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();

其他锁

Redisson还提供了其他机制的锁,如联锁(MultiLock)、红锁(RedLock)等。详细可参考:分布式锁和同步器

使用Redisson实现分布式锁

  1. 定义回调接口
/**
 * 分布式锁回调接口
 */
public interface DistributedLockCallback<T> {
    /**
     * 调用者必须在此方法中实现需要加分布式锁的业务逻辑
     *
     * @return
     */
    public T process();

    /**
     * 得到分布式锁名称
     *
     * @return
     */
    public String getLockName();
}
  1. 定义分布式锁模板
/**
 * 分布式锁操作模板
 */
public interface DistributedLockTemplate {

    long DEFAULT_WAIT_TIME = 30;
    long DEFAULT_TIMEOUT   = 5;
    TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

    /**
     * 使用分布式锁,使用锁默认超时时间。
     * @param callback
     * @param fairLock 是否使用公平锁
     * @return
     */
    <T> T lock(DistributedLockCallback<T> callback, boolean fairLock);

    /**
     * 使用分布式锁。自定义锁的超时时间
     *
     * @param callback
     * @param leaseTime 锁超时时间。超时后自动释放锁。
     * @param timeUnit
     * @param fairLock 是否使用公平锁
     * @param <T>
     * @return
     */
    <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock);

    /**
     * 尝试分布式锁,使用锁默认等待时间、超时时间。
     * @param callback
     * @param fairLock 是否使用公平锁
     * @param <T>
     * @return
     */
    <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock);

    /**
     * 尝试分布式锁,自定义等待时间、超时时间。
     * @param callback
     * @param waitTime 获取锁最长等待时间
     * @param leaseTime 锁超时时间。超时后自动释放锁。
     * @param timeUnit
     * @param fairLock 是否使用公平锁
     * @param <T>
     * @return
     */
    <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock);
}
  1. 实现分布式锁模板
public class SingleDistributedLockTemplate implements DistributedLockTemplate {
    private RedissonClient redisson;

    public SingleDistributedLockTemplate() {
    }

    public SingleDistributedLockTemplate(RedissonClient redisson) {
        this.redisson = redisson;
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) {
        return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
        RLock lock = getLock(redisson, callback.getLockName(), fairLock);
        try {
            lock.lock(leaseTime, timeUnit);
            return callback.process();
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    @Override
    public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) {
        return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
    }

    @Override
    public <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
        RLock lock = getLock(redisson, callback.getLockName(), fairLock);
        try {
            if (lock.tryLock(waitTime, leaseTime, timeUnit)) {
                return callback.process();
            }
        } catch (InterruptedException e) {

        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
        return null;
    }

    private RLock getLock(RedissonClient redisson, String lockName, boolean fairLock) {
        RLock lock;
        if (fairLock) {
            lock = redisson.getFairLock(lockName);
        } else {
            lock = redisson.getLock(lockName);
        }
        return lock;
    }

    public void setRedisson(RedissonClient redisson) {
        this.redisson = redisson;
    }
}
  1. 使用SingleDistributedLockTemplate
DistributedLockTemplate lockTemplate = ...;
final String lockName = ...;
lockTemplate.lock(new DistributedLockCallback<Object>() {
    @Override
    public Object process() {
        //do some business
        return null;
    }

    @Override
    public String getLockName() {
        return lockName;
    }
}, false);

但是每次使用分布式锁都要写类似上面的重复代码,有没有什么方法可以只关注核心业务逻辑代码的编写,即上面的"do some business"。下面介绍如何使用Spring AOP来实现这一目标。

使用Spring AOP简化分布式锁

  1. 定义注解@DistributedLock
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {

    String lockName() default "";

    String lockNamePre() default ""; //lockName后缀

    String lockNamePost() default ""; //lockName后缀

    String param() default ""; //获取注解的方法第一个参数对象的某个属性值来作为lockName。因为有时候lockName是不固定的。

    boolean fairLock() default false;  //是否使用公平锁。

    boolean tryLock() default false;  //是否使用尝试锁。

    long waitTime() default 30L;

    long leaseTime() default 5L;

    TimeUnit timeUnit() default TimeUnit.SECONDS;

}
  1. 定义切面织入的代码
@Aspect
@Component
public class DistributedLockAspect {

    @Autowired
    private DistributedLockTemplate lockTemplate;

    @Pointcut("@annotation(cn.sprinkle.study.distributedlock.common.annotation.DistributedLock)")
    public void DistributedLockAspect() {}

    @Around(value = "DistributedLockAspect()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {

        //切点所在的类名
        String targetName = pjp.getTarget().getClass().getName();

        //使用了注解的方法
        String methodName = pjp.getSignature().getName();

        Class targetClass = Class.forName(targetName);

        Method[] methods = targetClass.getMethods();

        Object[] arguments = pjp.getArgs();

        //得到使用注解的方法。可使用Method.getAnnotation(Class<T> annotationClass)获取指定的注解,然后可获得注解的属性
        Optional<Method> optional = Arrays.stream(methods)
                .parallel()
                .filter(method -> method.getName().equals(methodName))
                .findAny();

        if (optional.isPresent()) {
            Method m = optional.get();
            final String lockName = getLockName(m, arguments);
            return lock(pjp, m, lockName);
        }

        return null;
    }

    public String getLockName(Method method, Object[] args) throws Throwable {

        DistributedLock annotation = method.getAnnotation(DistributedLock.class);

        String lockName = annotation.lockName(),
                param = annotation.param();

        if (StringUtils.isEmpty(lockName)) {
            if (!StringUtils.isEmpty(param)) {

                if (args.length > 0) {
                    Object arg = args[0];

                    lockName = String.valueOf(getParam(arg, param));

                    String preLockName = annotation.lockNamePre(),
                            postLockName = annotation.lockNamePost();

                    lockName = preLockName + lockName + postLockName;

                    return lockName;
                }
            }
        } else {
            return lockName;
        }

        throw new IllegalArgumentException("lockName can‘t be empty!");
    }

    /**
     * 从方法参数获取数据
     *
     * @param param
     * @param arg 方法的参数数组
     * @return
     */
    public Object getParam(Object arg, String param) throws Throwable {

        if (!StringUtils.isEmpty(param) && arg != null) {
            Object result = PropertyUtils.getProperty(arg, param);
            return result;
        }

        return null;
    }

    public Object lock(ProceedingJoinPoint pjp, Method method, final String lockName) {

        DistributedLock annotation = method.getAnnotation(DistributedLock.class);

        boolean fairLock = annotation.fairLock();

        boolean tryLock = annotation.tryLock();

        if (tryLock) {
            return tryLock(pjp, annotation, lockName, fairLock);
        } else {
            return lock(pjp,lockName, fairLock);
        }
    }

    public Object lock(ProceedingJoinPoint pjp, final String lockName, boolean fairLock) {
        return lockTemplate.lock(new DistributedLockCallback<Object>() {
            @Override
            public Object process() {
                return proceed(pjp);
            }

            @Override
            public String getLockName() {
                return lockName;
            }
        }, fairLock);
    }

    public Object tryLock(ProceedingJoinPoint pjp, DistributedLock annotation, final String lockName, boolean fairLock) {

        long waitTime = annotation.waitTime(),
                leaseTime = annotation.leaseTime();
        TimeUnit timeUnit = annotation.timeUnit();

        return lockTemplate.tryLock(new DistributedLockCallback<Object>() {
            @Override
            public Object process() {
                return proceed(pjp);
            }

            @Override
            public String getLockName() {
                return lockName;
            }
        }, waitTime, leaseTime, timeUnit, fairLock);
    }

    public Object proceed(ProceedingJoinPoint pjp) {
        try {
            return pjp.proceed();
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return null;
    }
}
  1. 使用注解@DistributedLock实现分布式锁

有了上面两段代码,以后需要用到分布式锁,只需在核心业务逻辑方法添加注解@DistributedLock,并设置LockName、fairLock等即可。

@Service
public class DistributionService {

    @Autowired
    private RedissonClient redissonClient;

    @DistributedLock(lockName = "lock", lockNamePost = ".lock")
    public Integer aspect() {
        RMap<String, Integer> map = redissonClient.getMap("distributionTest");

        Integer count = map.get("count");

        if (count > 0) {
            count = count - 1;
            map.put("count", count);
        }

        return count;
    }

    @DistributedLock(param = "id", lockNamePost = ".lock")
    public Integer aspect(Person person) {
        RMap<String, Integer> map = redissonClient.getMap("distributionTest");

        Integer count = map.get("count");

        if (count > 0) {
            count = count - 1;
            map.put("count", count);
        }

        return count;
    }

}
  1. 测试

定义一个Worker类:

public class Worker implements Runnable {

    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final DistributionService service;

    public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, DistributionService service) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
        this.service = service;
    }

    @Override
    public void run() {
        try {
            startSignal.await();

            System.out.println(Thread.currentThread().getName() + " start");

            //Integer count = service.aspect();
            Integer count = service.aspect(new Person(1, "张三"));

            System.out.println(Thread.currentThread().getName() + ": count = " + count);

            doneSignal.countDown();

        } catch (InterruptedException ex) {
            System.out.println(ex);
        }
    }
}

定义Controller类:

@RestController
@RequestMapping("/distributedLockTest")
public class DistributedLockTestController {

    private int count = 10;

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private DistributionService service;

    @RequestMapping(method = RequestMethod.GET)
    public String distributedLockTest() throws Exception {

        RMap<String, Integer> map = redissonClient.getMap("distributionTest");
        map.put("count", 8);

        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(count);

        for (int i = 0; i < count; ++i) { // create and start threads
            new Thread(new Worker(startSignal, doneSignal, service)).start();
        }

        startSignal.countDown(); // let all threads proceed
        doneSignal.await();
        System.out.println("All processors done. Shutdown connection");

        return "finish";
    }

}

Redisson基本配置:

singleServerConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  password:
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://127.0.0.1:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 10
  connectionPoolSize: 64
  database: 0
  dnsMonitoring: false
  dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
useLinuxNativeEpoll: false

工程中需要注入的对象:

@Value("classpath:/redisson-conf.yml")
Resource configFile;

@Bean(destroyMethod = "shutdown")
RedissonClient redisson()
        throws IOException {
    Config config = Config.fromYAML(configFile.getInputStream());
    return Redisson.create(config);
}

@Bean
DistributedLockTemplate distributedLockTemplate(RedissonClient redissonClient) {
    return new SingleDistributedLockTemplate(redissonClient);
}

需要引入的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.5.3</version>
</dependency>
<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.8.3</version>
</dependency>

最后启动工程,然后访问localhost:8080/distributedLockTest,可以看到如下结果:

观察结果,可以看出,10个线程中只有8个线程能执行count减1操作,而且多个线程是依次执行的。也就是说分布式锁起作用了。

至此,使用Redisson实现分布式锁,然后使用Spring AOP简化分布式锁介绍完毕。

若有什么地方有错误的或需要改进的,欢迎留言一起讨论交流。

时间: 2024-09-30 19:43:54

使用Redisson实现分布式锁,Spring AOP简化之的相关文章

利用Redisson实现分布式锁及其底层原理解析

Redis介绍 参考地址:https://blog.csdn.net/turbo_zone/article/details/83422215 redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此

Redisson实现分布式锁—RedissonLock

Redisson实现分布式锁-RedissonLock 有关Redisson实现分布式锁上一篇博客讲了分布式的锁原理:Redisson实现分布式锁---原理 这篇主要讲RedissonLock和RLock.Redisson分布式锁的实现是基于RLock接口,RedissonLock实现RLock接口. 一.RLock接口 1.概念 public interface RLock extends Lock, RExpirable, RLockAsync 很明显RLock是继承Lock锁,所以他有Lo

【高并发】你知道吗?大家都在使用Redisson实现分布式锁了!!

写在前面 忘记之前在哪个群里有朋友在问:有出分布式锁的文章吗-@冰河?我的回答是:这周会有,也是[高并发]专题的.想了想,还是先发一个如何使用Redisson实现分布式锁的文章吧?为啥?因为使用Redisson实现分布式锁简单啊!Redisson框架是基于Redis实现的分布式锁,非常强大,只需要拿来使用就行了,至于分布式锁的原理啥的,后面再撸一篇文章就是了. Redisson框架十分强大,基于Redisson框架可以实现几乎你能想到的所有类型的分布式锁.这里,我就列举几个类型的分布式锁,并各自

redisson实现分布式锁原理

Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

Java之——redis并发读写锁,使用Redisson实现分布式锁

原文:http://blog.csdn.net/l1028386804/article/details/73523810 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁. [java] view plain copy public void testReentrantLock(RedissonClient redisson){ RLock lo

Redisson获取分布式锁

1. maven <!-- redisson 分布式锁 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.2</version> </dependency> 2. redisson客户端配置类 package com.harara.redis; import

redisson spring boot starter 做分布式锁

使用redisson做分布式锁 分布式锁 在java中单体应用中,我们如果想要保证一个接口或者服务.方法当下只有一个线程在运行,我们可以通过JDK提供的Lock.Semaphore.同步锁等多种方式实现只有一个线程在运行. 在微服务系统中,我们的单体应用会变成多个节点,只靠JDK本身的锁只能控制一个节点的运行,所以我们需要一个可以控制全局的锁来控制系统的运行,这就是所谓的分布式锁. Zk redis 等中间件都可以做分布式锁,优缺点也各不相同,在我们现在的系统中zk的直接操作还是比较少,更多的是

springboot整合redisson分布式锁

一.通过maven引入redisson的jar包 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency> 二.在yaml文件中引入redis的相关配置(redis单节点可以读取原有redis配置拼装,如果是主从需另外独立配置,相关属性

spring data redis分布式锁

问题 项目采用spring-boot-starter-data-redis,RedisTemplate中没有同时设置NX和EX的方法,如果使用setIfAbsent()方法也就是NX,再设置过期时间expire()也就是EX,如果在设置EX时失败则会造成死锁.在jedis中提供了同时设置NX和EX的方法,这里通过RedisTemplate的execute()方法获取Jedis. 存在问题 解决方案可以可以参考Redisson 哨兵模式下有问题,Master挂了可能没有复制到Slave导致锁丢失