curator 实现分布式一致性锁

最近准备在项目中引入分布式锁,故而研究基于zookeeper的curator框架。

网上资料不多,自己研究其源码发现,这个框架已经帮我做了很多现成的实现。

下面介绍下锁的实现:

通过源码中LockingExample例子作为切入(推荐多利用现有资源,最快切入),为减小篇幅,代码仅保留关键部分。

curator已经为我们准备好了锁的实现 ----InterProcessMutex,基于zookeeper跨jvm的公平互斥锁实现.

-----------------------------------------------------------------------------------------------------------------------------

1.看下锁的定义,将线程对象和锁对象(线程、路径、锁的数量)关联

    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

2.获得锁:

  • 如果线程已经有锁,则增加锁的数量,返回  String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  • 否则尝试获取锁,获得则加入线程持有锁的MAP,否则返回未获得锁。
    private final LockInternals internals;    

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }
    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }

3.可以看到,逻辑是基于LockInternals的,来看一下他是怎么做到的

  • 通过dirver在锁目录下创建EPHEMERAL_SEQUENTIAL节点
  • 循环尝试获取
  1. 获取锁目录下子节点的有序集合
  2. 通过dirver尝试得到PredicateResults(含有是否得到锁及需要监视的目录)
  3. 更新监听信息,开始下一轮尝试
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {while ( !isDone )
        {
            isDone = true;

            try
            {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
        }
    }

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        }
                    }
                }
            }
        }return haveTheLock;
    }

4.分析下前文中的driver实现,即StandardLockInternalsDriver 关键代码

  • 创建锁是基于目录下创建的EPHEMERAL_SEQUENTIAL节点,即与客户端生命周期相同,并且名字后自动加创建的序列号
  • 得到PredicateResult,如果当前节点为最小节点,则得到锁,getsTheLock为true,否则得到该序列的前一个节点,设为pathToWatch,监控之
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);

        boolean         getsTheLock = ourIndex < maxLeases;
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

先发布,后续有心得再续。

时间: 2024-11-05 23:35:29

curator 实现分布式一致性锁的相关文章

剖析curator的分布式互斥锁原理

1 前言 最近在做项目的时候,遇到一个多线程访问申请一个资源的问题.需要每个线程都能够最终在有效的时间内申请到或者超时失败.以前一般这种方式用的是redis做枷锁机制,每个线程都去redis加一把共同的锁,如果枷锁成功,则执行资源申请操作.而没有枷锁成功的线程,则在有效时间内循环尝试去枷锁,并且每次木有加锁成功,则就Thread.sleep()一会. 通过这种方式可以看出,这里对于sleep的时间间隔要求设置很严格,如果太小,则就会增加大规模的redis请求操作:而如果太长,当资源可用的时候,但

《从PAXOS到ZOOKEEPER分布式一致性原理与实践》pdf

下载地址:网盘下载 内容简介  · · · · · · <Paxos到Zookeeper:分布式一致性原理与实践>从分布式一致性的理论出发,向读者简要介绍几种典型的分布式一致性协议,以及解决分布式一致性问题的思路,其中重点讲解了Paxos和ZAB协议.同时,本书深入介绍了分布式一致性问题的工业解决方案--ZooKeeper,并着重向读者展示这一分布式协调框架的使用方法.内部实现及运维技巧,旨在帮助读者全面了解ZooKeeper,并更好地使用和运维ZooKeeper.全书共8章,分为五部分:第一

《从Paxos到ZooKeeper 分布式一致性原理与实践》读书笔记

一.分布式架构 1.分布式特点 分布性 对等性.分布式系统中的所有计算机节点都是对等的 并发性.多个节点并发的操作一些共享的资源 缺乏全局时钟.节点之间通过消息传递进行通信和协调,因为缺乏全局时钟,很难定义两个事件谁先谁后 故障总是会发生.系统设计时,需要考虑到任何异常情况 2.分布式环境的各种问题 通信异常.分布式系统中的某些节点之间无法正常通信 网络分区.这有部分节点可以正常通信,有些无法正常通信.这种现象称为网络分区,也称为"脑裂" 三态.节点之间的一次通信存在三种状态:成功.失

【转载】Paxos以及分布式一致性的学习

开始搜出来这篇文章(link),发现不知所云,先忽略. 然后搜出来这篇文章(link),说是偏向工程实现,建议先看维基(link),但是维基打不开. 所以还是先看知乎的这篇文章吧(https://www.zhihu.com/question/19787937/answer/82340987) Lamport用两段话就描述清楚了它的流程,他老人家也说paxos其实是个简单的算法.但是是我在工程领域见过最为精妙的算法. 分布式一致性是个有趣的领域,而Paxos和类似的协议对这个问题的重要性不喻,在过

分布式 一致性Paxos算法(转载)

文章1比较通俗易懂,可以入门,转载地址是http://www.cnblogs.com/linbingdong/p/6253479.html Paxos算法在分布式领域具有非常重要的地位.但是Paxos算法有两个比较明显的缺点:1.难以理解 2.工程实现更难. 网上有很多讲解Paxos算法的文章,但是质量参差不齐.看了很多关于Paxos的资料后发现,学习Paxos最好的资料是论文<Paxos Made Simple>,其次是中.英文版维基百科对Paxos的介绍.本文试图带大家一步步揭开Paxos

分布式一致性算法——paxos

一.什么是paxos算法 Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决议)达成一致的问题. 人们在理解paxos算法是会遇到一些困境,那么接下来,我们带着以下几个问题来学习paxos算法: 1.paxos到底在解决什么问题? 2.paxos到底如何在分布式存储系统中应用? 3.paxos的核心思想是什么? 二.paxos解决了什么问题 分布式的一致性问题其实主要是指分布式系统中的数据一致性问题.所以,为了保证分布式系统的一致性,就要保证分布式系统中的数据是一致的. 在

分布式一致性协议

分布式一致性协议 Amir H. Payberah  <Distributed Systems Consensus> [email protected] Amirkabir University of Technology 问题是什么? 为保证分布式系统的高可靠和高可用性,数据在系统中一般存储多个副本.当某个副本所在的节点出现故障时,分布式系统能够自动将服务切换到其他的副本,从而实现自动容错.同一份数据的多个副本中往往有一个副本为主副本,其他为备副本.从一份数据的角度讲,主副本所在的节点为主节

&lt;从PAXOS到ZOOKEEPER分布式一致性原理与实践&gt;读书笔记-ZAB协议

本文属于分布式系统学习笔记系列,上一篇笔记整理了paxos算法,本文属于原书第四章,梳理zookeeper的目标特性及ZAB协议. 1.介绍zookeeper 1.1ZooKeeper保证一致性特性 ZooKeeper是一个典型的分布式数据一致性的解决方案,分布式程序可以基于它实现诸如数据发布/订阅.负载均衡.命名服务.分布式协调通知.集群管理.master选举.分布式锁.分布式队列等功能.ZooKeeper可以保证如下分布式一致性特性. 1.顺序一致性: 从同一个客户端发起的事务请求,最终将严

读&lt;分布式一致性原理&gt;初识zookeeper

zookeeper是什么 zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如:数据发布/订阅,负载均衡,命名服务,分布式协调/通知 ,集群管理,Master选举,分布式锁和分布式队列等功能.zookeeper可以保证如下分布式一致性特性. 顺序一致性 从同一个客户端发起的事务请求,最终将会严格的按照发起顺序被应用到zookeeper中去. 原子性 所有的事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群所有机器都成功应用了某