【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理

前言

前面已经写了Redisson大多的内容,我们再看看Redisson官网共有哪些组件:

image.png

剩下还有Semaphore和CountDownLatch两块,我们就趁热打铁,赶紧看看Redisson是如何实现的吧。

我们在JDK中都知道Semaphore和CountDownLatch两兄弟,这里就不多赘述,不了解的可以再回头看看。

Semaphore使用示例

先看下Semaphore原理图如下:

image.png

接着我们看下Redisson中使用的案例:

RSemaphore semaphore = redisson.getSemaphore("semaphore");// 同时最多允许3个线程获取锁semaphore.trySetPermits(3);

for(int i = 0; i < 10; i++) {  new Thread(new Runnable() {

    @Override    public void run() {      try {        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]尝试获取Semaphore锁");         semaphore.acquire();        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]成功获取到了Semaphore锁,开始工作");         Thread.sleep(3000);          semaphore.release();        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]释放Semaphore锁");       } catch (Exception e) {        e.printStackTrace();      }    }  }).start();}

Semaphore源码解析

接着我们根据上面的示例,看看源码是如何实现的:

第一步:
semaphore.trySetPermits(3);

public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {    @Override    public boolean trySetPermits(int permits) {        return get(trySetPermitsAsync(permits));    }

    @Override    public RFuture<Boolean> trySetPermitsAsync(int permits) {        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                "local value = redis.call(‘get‘, KEYS[1]); " +                "if (value == false or value == 0) then "                    + "redis.call(‘set‘, KEYS[1], ARGV[1]); "                    + "redis.call(‘publish‘, KEYS[2], ARGV[1]); "                    + "return 1;"                + "end;"                + "return 0;",                Arrays.<Object>asList(getName(), getChannelName()), permits);    }

}

执行流程为:

  1. get semaphore,获取到一个当前的值
  2. 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3
  3. 然后发布一些消息,返回1

接着看看semaphore.acquire();semaphore.release(); 逻辑:

public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {    @Override    public RFuture<Boolean> tryAcquireAsync(int permits) {        if (permits < 0) {            throw new IllegalArgumentException("Permits amount can‘t be negative");        }        if (permits == 0) {            return RedissonPromise.newSucceededFuture(true);        }

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                  "local value = redis.call(‘get‘, KEYS[1]); " +                  "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +                      "local val = redis.call(‘decrby‘, KEYS[1], ARGV[1]); " +                      "return 1; " +                  "end; " +                  "return 0;",                  Collections.<Object>singletonList(getName()), permits);    }

    @Override    public RFuture<Void> releaseAsync(int permits) {        if (permits < 0) {            throw new IllegalArgumentException("Permits amount can‘t be negative");        }        if (permits == 0) {            return RedissonPromise.newSucceededFuture(null);        }

        return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,            "local value = redis.call(‘incrby‘, KEYS[1], ARGV[1]); " +            "redis.call(‘publish‘, KEYS[2], value); ",            Arrays.<Object>asList(getName(), getChannelName()), permits);    }

}

先看看加锁的逻辑tryAcquireAsync()

  1. get semaphore,获取到一个当前的值,比如说是3,3 > 1
  2. decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
  3. decrby semaphore 1
  4. decrby semaphore 1
  5. 执行3次加锁后,semaphore值为0

此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁,如下图:

image.png

接着看看解锁逻辑releaseAsync()

  1. incrby semaphore 1,每次一个客户端释放掉这个锁的话,就会将信号量的值累加1,信号量的值就不是0了

看到这里大家就明白了了,Redisson实现Semaphore其实是很简单了

CountDownLatch使用示例

使用案例:

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");latch.trySetCount(3);System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。"); 

for(int i = 0; i < 3; i++) {  new Thread(new Runnable() {

    @Override    public void run() {      try {        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。");         Thread.sleep(3000);         RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch");        localLatch.countDown();        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作");       } catch (Exception e) {        e.printStackTrace();       }    }

  }).start();}

latch.await();System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走"); 

CountDownLatch 源码解析

源码如下:

 public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

    @Override    public RFuture<Boolean> trySetCountAsync(long count) {        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                "if redis.call(‘exists‘, KEYS[1]) == 0 then "                    + "redis.call(‘set‘, KEYS[1], ARGV[2]); "                    + "redis.call(‘publish‘, KEYS[2], ARGV[1]); "                    + "return 1 "                + "else "                    + "return 0 "                + "end",                Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);    }

    @Override    public RFuture<Void> countDownAsync() {        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                        "local v = redis.call(‘decr‘, KEYS[1]);" +                        "if v <= 0 then redis.call(‘del‘, KEYS[1]) end;" +                        "if v == 0 then redis.call(‘publish‘, KEYS[2], ARGV[1]) end;",                    Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);    }

}

先分析trySetCount()方法逻辑:

  1. exists anyCountDownLatch,第一次肯定是不存在的
  2. set redisson_countdownlatch__channel__anyCountDownLatch 3
  3. 返回1

接着分析latch.await();方法,如下图:

image.png

这个方法其实就是陷入一个while true死循环,不断的get anyCountDownLatch的值,如果这个值还是大于0那么就继续死循环,否则的话呢,就退出这个死循环

最后分析localLatch.countDown();方法:

  1. decr anyCountDownLatch,就是每次一个客户端执行countDown操作,其实就是将这个cocuntDownLatch的值递减1
  2. await()方面已经分析过,死循环去判断anyCountDownLatch对应存储的值是否为0,如果为0则接着执行自己的逻辑

总结

看到了这里 这两个组件是不是很简单?

到了这里,Redisson部分的学习都已经结束了,后面还会学习ZK实现分布式锁的原理。

申明

本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

原文地址:https://www.cnblogs.com/wang-meng/p/12548492.html

时间: 2024-08-05 03:53:59

【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理的相关文章

Java并发包中Semaphore的工作原理、源码分析及使用示例

1. 信号量Semaphore的介绍 我们以一个停车场运作为例来说明信号量的作用.假设停车场只有三个车位,一开始三个车位都是空的.这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦.以后来的车必须在入口等待,直到停车场中有车辆离开.这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复. 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用.信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的

基于Redis实现分布式锁-Redisson使用及源码分析

在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 本文主要探讨另外一种实现分布式最终一致性的解决方案--采用分布式锁.基于分布式锁的解决

Redisson实现分布式锁—RedissonLock

Redisson实现分布式锁-RedissonLock 有关Redisson实现分布式锁上一篇博客讲了分布式的锁原理:Redisson实现分布式锁---原理 这篇主要讲RedissonLock和RLock.Redisson分布式锁的实现是基于RLock接口,RedissonLock实现RLock接口. 一.RLock接口 1.概念 public interface RLock extends Lock, RExpirable, RLockAsync 很明显RLock是继承Lock锁,所以他有Lo

spring boot 利用redisson实现redis的分布式锁

原文:http://liaoke0123.iteye.com/blog/2375469 利用redis实现分布式锁,网上搜索的大部分是使用java jedis实现的. redis官方推荐的分布式锁实现为redisson http://ifeve.com/redis-lock/ 以下为spring boot实现分布式锁的步骤 项目pom中需要添加官方依赖 我是1.8JDK固为 Pom代码   <!-- redisson --> <dependency> <groupId>

不为人所知的分布式锁实现全都在这里了!

1.引入业务场景 首先来由一个场景引入: 最近老板接了一个大单子,允许在某终端设备安装我们的APP,终端设备厂商日活起码得几十万到百万级别,这个APP也是近期产品根据市场竞品分析设计出来的,几个小码农通宵达旦开发出来的,主要功能是在线购物一站式服务,后台可以给各个商家分配权限,来维护需要售卖的商品信息. 老板大O:谈下来不容易,接下来就是考虑如何吸引终端设备上更多的用户注册上来,如何引导用户购买,这块就交给小P去负责了,需求尽快做,我明天出差! 产品小P:嘿嘿~,眼珠一转儿,很容易就想到了,心里

整理分布式锁:业务场景&amp;分布式锁家族&amp;实现原理

1.引入业务场景 业务场景一出现: 因为小T刚接手项目,正在吭哧吭哧对熟悉着代码.部署架构.在看代码过程中发现,下单这块代码可能会出现问题,这可是分布式部署的,如果多个用户同时购买同一个商品,就可能导致商品出现 库存超卖 (数据不一致) 现象,对于这种情况代码中并没有做任何控制. 原来一问才知道,以前他们都是售卖的虚拟商品,没啥库存一说,所以当时没有考虑那么多... 这次不一样啊,这次是售卖的实体商品,那就有库存这么一说了,起码要保证不能超过库存设定的数量吧. 小T大眼对着屏幕,屏住呼吸,还好提

使用分布式锁时考虑哪些问题

工作中经常会遇到争抢共享资源的场景,比如用户抢购秒杀商品,如果不对商品库存进行保护,可能会造成超卖的情况.超卖现象在售卖火车票的场景下更加明显,两个人购买到同一天同一辆列车,相同座位的情况是不允许出现的.交易系统中的退款同样如此,由于网络延迟和重复提交极端时间差的情况下,可能会造成同一个用户重复的退款请求.以上无论是超卖,还是重复退款,都是没有对需要保护的资源或业务进行完善的保护而造成的,从设计方面一定要避免这种情况的发生. 本文以退款交易场景入手,引入分布式锁,尝试分析分布式锁需要考虑关注点,

Redis分布式锁的实现原理

一.写在前面 现在面试,一般都会聊聊分布式系统这块的东西.通常面试官都会从服务框架(Spring Cloud.Dubbo)聊起,一路聊到分布式事务.分布式锁.ZooKeeper等知识. 所以咱们这篇文章就来聊聊分布式锁这块知识,具体的来看看Redis分布式锁的实现原理. 说实话,如果在公司里落地生产环境用分布式锁的时候,一定是会用开源类库的,比如Redis分布式锁,一般就是用Redisson框架就好了,非常的简便易用. 大家如果有兴趣,可以去看看Redisson的官网,看看如何在项目中引入Red

重磅发布-SpringBoot实战实现分布式锁视频教程

概要介绍:历经一个月的时间,我录制的分布式锁实战之SpringBoot实战实现系列完整视频教程终于出世了!在本课程中,我分享介绍了分布式锁出现的背景.实现方式以及将其应用到实际的业务场景中,包括"重复提交"."CRM系统销售人员抢单",并采用当前相当流行的微服务SpringBoot来搭建项目实战实现分布式锁. 课程学习:目前博主已将分布式锁实现以及实际业务场景实战的要点整理成课程,感兴趣的童鞋可以前往学习:http://edu.51cto.com/course/15