五、Curator使用:分布式锁

分布式锁介绍

分布式执行一些不需要同时执行的复杂任务,curator利用zk的特质,实现了这个选举过程。其实就是利用了多个zk客户端在同一个位置建节点,只会有一个客户端建立成功这个特性。来实现同一时间,只会选择一个客户端执行任务

代码

    //分布式锁
    InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path");
    CountDownLatch down = new CountDownLatch(1);
    for (int i = 0; i < 30; i++) {
        new Thread(()->{
            try {
                down.await();
                lock.acquire();

            } catch (Exception e) {
                e.printStackTrace();
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
            System.out.println(sdf.format(new Date()));
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    down.countDown();

InterProcessMutex 是一个可重入的排他锁,获取锁的过程是通过往ZK下面成功建立节点来实现的,下面是获取锁的过程

    //获取当前线程
    Thread currentThread = Thread.currentThread();
    //获取当前线程的锁数据
        LockData lockData = threadData.get(currentThread);
    //如果不为null,则将锁的数量+1,实现可重入
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
    //第一次会到这里,尝试在我们之前设置的路径下建立节点
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    //建立成功后就初始化lockdata 并按当前线程进行保存,所以可以通过创建多个thread来模拟锁竞争,而不需要建多个client。
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;

下面是attemptLock的重要代码

    while ( !isDone )
        {
            isDone = true;

            try
            {
                //建立节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //下面就是获取锁和加锁的循环了
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                //session过期时走这里,按策略处理,允许重试就重试,否则就抛出异常
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
 

下面是internalLockLoop的重要代码

    while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //排序是因为要实现公平锁,加上maxleases参数限制取首位
                List<String>        children = getSortedChildren();
                //得到子节点名称,比如 _c_ce2a26cb-9721-4f56-91fd-a6d00b00b12c-lock-0000000030
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                //获取是否获得锁的状态,重要方法,maxLeases为1,后面要通过这个参数进行对比,通过判断小于这个来实现 公平锁
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    //如果没有获得锁,就给当前节点加个watcher,继续等待,一旦被删掉就调用这个watcher notifyall。
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            // 这个watcher只有一个作用就是唤醒其他线程进行竞争 notifyAll();
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            ...
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }

原文地址:https://www.cnblogs.com/june777/p/11867381.html

时间: 2024-10-21 09:01:14

五、Curator使用:分布式锁的相关文章

spring整合curator实现分布式锁

为什么要有分布式锁? 比如说,我们要下单,分为两个操作,下单成功(订单服务),扣减库存(商品服务).如果没有锁的话,同时两个请求进来.先检查有没有库存,一看都有,然后下订单,减库存.这时候肯定会出现错误.我们想要的结果是.只有一个请求可以进来.当完成这个操作之后,下一个请求再进来.这才不会出现库存卖超的现象.这时候,就需要我们使用分布式锁来实现. 实现分布式锁的方法有很多种.redis,zk都可以.但是还是推荐zk. 先说下大体思路: 首先,我们在下单的时候,先获取锁.如果获取成功,就进行我们下

Curator实现分布式锁

public static void main (String[] args) {     String servers = "10.128.7.20:2181,10.128.7.20:2182,10.128.7.20:2183";     CuratorFramework curator = CuratorFrameworkFactory.builder().retryPolicy(new ExponentialBackoffRetry(10000, 3)).connectStrin

剖析curator的分布式互斥锁原理

1 前言 最近在做项目的时候,遇到一个多线程访问申请一个资源的问题.需要每个线程都能够最终在有效的时间内申请到或者超时失败.以前一般这种方式用的是redis做枷锁机制,每个线程都去redis加一把共同的锁,如果枷锁成功,则执行资源申请操作.而没有枷锁成功的线程,则在有效时间内循环尝试去枷锁,并且每次木有加锁成功,则就Thread.sleep()一会. 通过这种方式可以看出,这里对于sleep的时间间隔要求设置很严格,如果太小,则就会增加大规模的redis请求操作:而如果太长,当资源可用的时候,但

ZooKeeper学习(二)ZooKeeper实现分布式锁

一.简介 在日常开发过程中,大型的项目一般都会采用分布式架构,那么在分布式架构中若需要同时对一个变量进行操作时,可以采用分布式锁来解决变量访问冲突的问题,最典型的案例就是防止库存超卖,当然还有其他很多的控制方式,这篇文章我们讨论一下怎么使用ZooKeeper来实现分布式锁. 二.Curator 前面提到的分布式锁,在ZooKeeper中可以通过Curator来实现. 定义:Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开

分布式锁实现大型连续剧之(二):Zookeeper

前言 紧跟上文的:分布式锁实现(一):Redis ,这篇我们用Zookeeper来设计和实现分布式锁,并且研究下开源客户端工具Curator的分布式锁源码 设计实现 一.基本算法 1.在某父节点下创建临时有序节点2.判断创建的节点是否是当前父节点下所有子节点中序号最小的3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁4.解锁的时候删除当前节点 二.关键点 临时有序节点实现Zookeeper分布式锁关键就在于其[临时有序节点]的特

java 分布式锁 -图解- 秒懂

目录 写在前面 1.1. 分布式锁 简介 1.1.1. 图解:公平锁和可重入锁 模型 1.1.2. 图解: zookeeper分布式锁的原理 1.1.3. 分布式锁的基本流程 1.1.4. 加锁的实现 1.1.5. 释放锁的实现 1.1.1. 分布式锁的应用场景 写在最后 疯狂创客圈 亿级流量 高并发IM 实战 系列 疯狂创客圈 Java 分布式聊天室[ 亿级流量]实战系列之 -26[ 博客园 总入口 ] 写在前面 ? 大家好,我是作者尼恩.目前和几个小伙伴一起,组织了一个高并发的实战社群[疯狂

SpringBoot电商项目实战 — Zookeeper的分布式锁实现

上一篇演示了基于Redis的Redisson分布式锁实现,那今天我要再来说说基于Zookeeper的分布式现实. Zookeeper分布式锁实现 要用Zookeeper实现分布式锁,我就不得不说说zookeeper的数据存储.首先zookeeper的核心保存结构是一个DataTree数据结构,其实内部是一个Map<String, DataNode> nodes的数据结构,其中key是path,DataNode才是真正保存数据的核心数据结构,DataNode核心字段包括byte data[]用于

Redis专题(3):锁的基本概念到Redis分布式锁实现

拓展阅读:Redis闲谈(1):构建知识图谱 Redis专题(2):Redis数据结构底层探秘 近来,分布式的问题被广泛提及,比如分布式事务.分布式框架.ZooKeeper.SpringCloud等等.本文先回顾锁的概念,再介绍分布式锁,以及如何用Redis来实现分布式锁. 一.锁的基本了解 首先,回顾一下我们工作学习中的锁的概念. 为什么要先讲锁再讲分布式锁呢? 我们都清楚,锁的作用是要解决多线程对共享资源的访问而产生的线程安全问题,而在平时生活中用到锁的情况其实并不多,可能有些朋友对锁的概念

curator 实现分布式一致性锁

最近准备在项目中引入分布式锁,故而研究基于zookeeper的curator框架. 网上资料不多,自己研究其源码发现,这个框架已经帮我做了很多现成的实现. 下面介绍下锁的实现: 通过源码中LockingExample例子作为切入(推荐多利用现有资源,最快切入),为减小篇幅,代码仅保留关键部分. curator已经为我们准备好了锁的实现 ----InterProcessMutex,基于zookeeper跨jvm的公平互斥锁实现. ----------------------------------