【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

前言

前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别。

在分布式系统中,常见的分布式锁实现方案还有Zookeeper,接下来会深入研究Zookeeper是如何来实现分布式锁的。

Zookeeper初识

文件系统

Zookeeper维护一个类似文件系统的数据结构

image.png

每个子目录项如NameService都被称为znoed,和文件系统一样,我们能够自由的增加、删除znode,在znode下增加、删除子znode,唯一不同的在于znode是可以存储数据的。

有4种类型的znode

  • PERSISTENT--持久化目录节点客户端与zookeeper断开连接后,该节点依旧存在
  • PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
  • EPHEMERAL-临时目录节点客户端与zookeeper断开连接后,该节点被删除
  • EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

通知机制

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)等,zookeeper会通知客户端。

分布式锁

有了zookeeper的一致性文件系统,锁的问题变得容易。锁服务可以分为两类,一个是保持独占,另一个是控制时序。

  1. 对于第一类,我们将zookeeper上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。厕所有言:来也冲冲,去也冲冲,用完删除掉自己创建的distribute_lock 节点就释放出锁。
  2. 对于第二类, /distribute_lock 已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除自己创建的znode节点。

image.png

注明:以上内容参考 https://www.cnblogs.com/dream-to-pku/p/9513188.html

Curator框架初识

Curator是Netflix公司开源的一套Zookeeper客户端框架。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。

我们看下Apache Curator官网的介绍:

image.png

接着看下quick start中关于分布式锁相关的内容
地址为:http://curator.apache.org/getting-started.html

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

使用很简单,使用InterProcessMutex类,使用其中的acquire()方法,就可以获取一个分布式锁了。

Curator分布式锁使用示例

启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-00000000001],每次获得锁时会生成这种串,释放锁时清空数据。

接下来看看加锁的示例:

public class Application {
    private static final String ZK_ADDRESS = "192.20.38.58:2181";
    private static final String ZK_LOCK_PATH = "/locks/lock_01";

    public static void main(String[] args) throws InterruptedException {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

image.png

Curator 加锁实现原理

直接看Curator加锁的代码:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {

    private final ConcurrentMap<Thread, LockData>   threadData = Maps.newConcurrentMap();

     private static class LockData
    {
        final Thread        owningThread;
        final String        lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

     private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn‘t necessary
        */

        Thread          currentThread = Thread.currentThread();

        LockData        lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData        newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
}

直接看internalLock()方法,首先是获取当前线程,然后查看当前线程是否在一个concurrentHashMap中,这里是重入锁的实现,如果当前已经已经获取了锁,那么这个线程获取锁的次数再+1

如果没有获取锁,那么就是用attemptLock()方法去尝试获取锁,如果lockPath不为空,说明获取锁成功,并将当前线程放入到map中。

接下来看看核心的加锁逻辑attemptLock()方法:

入参:
time : 获取锁等待的时间
unit:时间单位
lockNodeBytes:默认为null

public class LockInternals {
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
                if ( localLockNodeBytes != null )
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
                }
                else
                {
                    ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
                }
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can‘t find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }
}

ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

使用的临时顺序节点,首先他是临时节点,如果当前这台机器如果自己宕机的话,他创建的这个临时节点就会自动消失,如果有获取锁的客户端宕机了,zk可以保证锁会自动释放的

创建的数据结构类似于:

客户端A获取锁的代码,生成的ourPath=xxxx01

客户端B获取锁的代码,生成的ourPath=xxxx02

查看Zookeeper中/locks/lock_01下所有临时节点数据:

PS:01/02的图没有截到,这里又跑了一次截图所示 03/04 的顺序节点在ZK中的显示

接着重点看interalLockLoop()的逻辑:

public class LockInternals {
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( stat != null )
                        {
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                    }
                    // else it may have been deleted (i.e. lock released). Try to acquire again
                }
            }
        }

           // 省略部分代码
        return haveTheLock;
    }
}

重点看while循环中的逻辑
首先是获取锁的逻辑:

  1. 获取/locks/lock_01下排好序的znode节点,上面看图已经知道,会有xxx01xxx02两个节点
  2. 调用getsTheLock()方法获取锁,其中maxLeases为1,默认只能一个线程获取锁
  3. 定位到StandardLockInternalsDriver.getsTheLock()方法,其中代码核心如下:
    int ourIndex = children.indexOf(sequenceNodeName);
    boolean getsTheLock = ourIndex &lt; maxLeases;
  4. 上面sequenceNodeName参数为xxx01的全路径名,然后查看在排好序的children列表中是否为第一个元素,如果是第一个元素,返回的ourIndex=0,此时则认为获取锁成功
  5. 如果为有序列表中的第一个元素,那么predicateResults.getsTheLock() 为true,获取锁的标志位havaTheLock为true,直接返回获取锁成功

然后是获取锁失败的逻辑:
获取锁失败的核心代码:

String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this)
{
    Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
    if ( stat != null )
    {
        if ( millisToWait != null )
        {
            millisToWait -= (System.currentTimeMillis() - startMillis);
            startMillis = System.currentTimeMillis();
            if ( millisToWait <= 0 )
            {
                doDelete = true;    // timed out - delete our node
                break;
            }

            wait(millisToWait);
        }
        else
        {
            wait();
        }
    }
}
  1. 针对上一个节点添加监听器
  2. 如果加锁有过期时间,到了过期时间后直接break退出循环
  3. 当前线程处于wait()状态,等待上一个线程释放锁

Curator 释放锁实现原理

释放锁其实很简单,直接删除当前临时节点,因为下一个节点监听了上一个节点信息,所以上一个节点删除后,当前节点就会被唤醒重新获取锁。

private void deleteOurPath(String ourPath) throws Exception
{
    try
    {
        client.delete().guaranteed().forPath(ourPath);
    }
    catch ( KeeperException.NoNodeException e )
    {
        // ignore - already deleted (possibly expired session, etc.)
    }
}

总结

一张图总结:

04_Zookeeper分布式锁实现原理.jpg

原图可查看我的分享:
https://www.processon.com/view/link/5e80508de4b06b85300175d2

原文地址:https://www.cnblogs.com/wang-meng/p/12593992.html

时间: 2024-11-10 11:15:09

【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析的相关文章

聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互斥锁,基于AQS的互斥模式实现,也就是说同时只有一个线程进入临界区,唤醒下一个线程时也只能释放一个等待线程 2. 可重入,通过设置了一个字段exclusiveOwnerThread来标示当前获得锁的线程.获取锁操作是,如果当前线程是已经获得锁的线程,那么获取操作成功.把当前状态作为获得锁次数的计数器

两种方式实现自己的可重入锁

本篇文章将介绍两种自己动手实现可重入锁的方法. 我们都知道JDK中提供了一个类ReentrantLock,利用这个类我们可以实现一个可重入锁,这种锁相对于synchronized来说是一种轻量级锁. 重入锁的概念 重入锁实际上指的就是一个线程在没有释放锁的情况下,可以多次进入加锁的代码块. public void a() { lock2.lock(); System.out.println("a"); b(); lock2.unlock(); } public void b() { l

通俗易懂 悲观锁、乐观锁、可重入锁、自旋锁、偏向锁、轻量/重量级锁、读写锁、各种锁及其Java实现!

网上关于Java中锁的话题可以说资料相当丰富,但相关内容总感觉是一大串术语的罗列,让人云里雾里,读完就忘.本文希望能为Java新人做一篇通俗易懂的整合,旨在消除对各种各样锁的术语的恐惧感,对每种锁的底层实现浅尝辄止,但是在需要时能够知道去查什么. 首先要打消一种想法,就是一个锁只能属于一种分类.其实并不是这样,比如一个锁可以同时是悲观锁.可重入锁.公平锁.可中断锁等等,就像一个人可以是男人.医生.健身爱好者.游戏玩家,这并不矛盾.OK,国际惯例,上干货. 〇.synchronized与Lock

写文章 通俗易懂 悲观锁、乐观锁、可重入锁、自旋锁、偏向锁、轻量/重量级锁、读写锁、各种锁及其Java实现!

网上关于Java中锁的话题可以说资料相当丰富,但相关内容总感觉是一大串术语的罗列,让人云里雾里,读完就忘.本文希望能为Java新人做一篇通俗易懂的整合,旨在消除对各种各样锁的术语的恐惧感,对每种锁的底层实现浅尝辄止,但是在需要时能够知道去查什么. 首先要打消一种想法,就是一个锁只能属于一种分类.其实并不是这样,比如一个锁可以同时是悲观锁.可重入锁.公平锁.可中断锁等等,就像一个人可以是男人.医生.健身爱好者.游戏玩家,这并不矛盾.OK,国际惯例,上干货. 〇.synchronized与Lock

Redis实现分布式锁与Zookeeper实现分布式锁区别

Redis实现分布式锁与Zookeeper实现分布式锁区别 **前言: 在学习过程中,简单的整理了一些redis跟zookeeper实现分布式锁的区别,有需要改正跟补充的地方,希望各位大佬及时指出**Redis实现分布式锁思路 基于Redis实现分布式锁(setnx)setnx也可以存入key,如果存入key成功返回1,如果存入的key已经存在了,返回0. Zookeeper实现分布式锁思路 基于Zookeeper实现分布式锁 Zookeeper是一个分布式协调工具,在分布式解决方案中. 多个客

基于Redis实现分布式锁-Redisson使用及源码分析

在分布式场景下,有很多种情况都需要实现最终一致性.在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等).通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性. 本文主要探讨另外一种实现分布式最终一致性的解决方案--采用分布式锁.基于分布式锁的解决

06 锁:可重入锁 公平锁 读写锁

1.可重入锁 如果锁具备可重入性,则称作为可重入锁. 像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制: 基于线程的分配,而不是基于方法调用的分配. 举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2, 此时线程不必重新去申请锁,而是可以直接执行方法method2. class MyClass { public synch

可重入锁(good)

可重入锁,也叫做递归锁,是指在一个线程中可以多次获取同一把锁,比如:一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法[即可重入],而无需重新获得锁:https://www.zhihu.com/question/23284564/answer/29633571 对于不同线程则相当于普通的互斥锁. 在JAVA环境下 ReentrantLock 和synchronized 都是 可重入锁 最大的作用是避免死锁.在很多情况下线程需要多次进入锁内执行任务

java中可重入锁和自旋锁

1.可重入锁: 也称为递归锁,当外层函数获得该锁之后,内层递归函数仍有获取该锁的代码,结果不受影响: java中的synchronized  ReentrantLock都是可重的 举例: public class Test implements Runnable{ public synchronized void get(){ System.out.println(Thread.currentThread().getId()); set(); } public synchronized void