基于Zookeeper实现的分布式互斥锁 - InterProcessMutex

CuratorZooKeeper的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是InterProcessMutex,本文将对其进行代码剖析

简介

InterProcessMutex基于Zookeeper实现了分布式的公平可重入互斥锁,类似于单个JVM进程内的ReentrantLock(fair=true)

构造函数

1234567891011121314151617
// 最常用public InterProcessMutex(CuratorFramework client, String path){    // Zookeeper利用path创建临时顺序节点,实现公平锁的核心    this(client, path, new StandardLockInternalsDriver());}

public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){    // maxLeases=1,表示可以获得分布式锁的线程数量(跨JVM)为1,即为互斥锁    this(client, path, LOCK_NAME, 1, driver);}

// protected构造函数InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver){    basePath = PathUtils.validatePath(path);    // internals的类型为LockInternals,InterProcessMutex将分布式锁的申请和释放操作委托给internals执行    internals = new LockInternals(client, driver, path, lockName, maxLeases);}

获取锁

InterProcessMutex.acquire

1234567891011
// 无限等待public void acquire() throws Exception{    if ( !internalLock(-1, null) ){        throw new IOException("Lost connection while trying to acquire lock: " + basePath);    }}

// 限时等待public boolean acquire(long time, TimeUnit unit) throws Exception{    return internalLock(time, unit);}

InterProcessMutex.internalLock

1234567891011121314151617181920
private boolean internalLock(long time, TimeUnit unit) throws Exception{    Thread currentThread = Thread.currentThread();    LockData lockData = threadData.get(currentThread);    if ( lockData != null ){        // 实现可重入        // 同一线程再次acquire,首先判断当前的映射表内(threadData)是否有该线程的锁信息,如果有则原子+1,然后返回        lockData.lockCount.incrementAndGet();        return true;    }

// 映射表内没有对应的锁信息,尝试通过LockInternals获取锁    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());    if ( lockPath != null ){        // 成功获取锁,记录信息到映射表        LockData newLockData = new LockData(currentThread, lockPath);        threadData.put(currentThread, newLockData);        return true;    }    return false;}
123
// 映射表// 记录线程与锁信息的映射关系private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
123456789101112
// 锁信息// Zookeeper中一个临时顺序节点对应一个“锁”,但让锁生效激活需要排队(公平锁),下面会继续分析private static class LockData{    final Thread owningThread;    final String lockPath;    final AtomicInteger lockCount = new AtomicInteger(1); // 分布式锁重入次数

private LockData(Thread owningThread, String lockPath){        this.owningThread = owningThread;        this.lockPath = lockPath;    }}

LockInternals.attemptLock

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
// 尝试获取锁,并返回锁对应的Zookeeper临时顺序节点的路径String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{    final long startMillis = System.currentTimeMillis();    // 无限等待时,millisToWait为null    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;    // 创建ZNode节点时的数据内容,无关紧要,这里为null,采用默认值(IP地址)    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;    // 当前已经重试次数,与CuratorFramework的重试策略有关    int retryCount = 0;

// 在Zookeeper中创建的临时顺序节点的路径,相当于一把待激活的分布式锁    // 激活条件:同级目录子节点,名称排序最小(排队,公平锁),后续继续分析    String ourPath = null;    // 是否已经持有分布式锁    boolean hasTheLock = false;    // 是否已经完成尝试获取分布式锁的操作    boolean isDone = false;

while ( !isDone ){        isDone = true;        try{            // 从InterProcessMutex的构造函数可知实际driver为StandardLockInternalsDriver的实例            // 在Zookeeper中创建临时顺序节点            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);            // 循环等待来激活分布式锁,实现锁的公平性,后续继续分析            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);        } catch ( KeeperException.NoNodeException e ) {            // 容错处理,不影响主逻辑的理解,可跳过            // 因为会话过期等原因,StandardLockInternalsDriver因为无法找到创建的临时顺序节点而抛出NoNodeException异常            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,                    System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){                // 满足重试策略尝试重新获取锁                isDone = false;            } else {                // 不满足重试策略则继续抛出NoNodeException                throw e;            }        }    }    if ( hasTheLock ){        // 成功获得分布式锁,返回临时顺序节点的路径,上层将其封装成锁信息记录在映射表,方便锁重入        return ourPath;    }    // 获取分布式锁失败,返回null    return null;}
1234567891011121314151617181920
// From StandardLockInternalsDriver// 在Zookeeper中创建临时顺序节点public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{    String ourPath;    // lockNodeBytes不为null则作为数据节点内容,否则采用默认内容(IP地址)    if ( lockNodeBytes != null ){        // 下面对CuratorFramework的一些细节做解释,不影响对分布式锁主逻辑的解释,可跳过        // creatingParentContainersIfNeeded:用于创建父节点,如果不支持CreateMode.CONTAINER        // 那么将采用CreateMode.PERSISTENT        // withProtection:临时子节点会添加GUID前缀        ourPath = client.create().creatingParentContainersIfNeeded()            // CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点,Zookeeper能保证在节点产生的顺序性            // 依据顺序来激活分布式锁,从而也实现了分布式锁的公平性,后续继续分析            .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);    } else {        ourPath = client.create().creatingParentContainersIfNeeded()            .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);    }    return ourPath;}

LockInternals.internalLockLoop

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
// 循环等待来激活分布式锁,实现锁的公平性private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {    // 是否已经持有分布式锁    boolean haveTheLock = false;    // 是否需要删除子节点    boolean doDelete = false;    try {        if (revocable.get() != null) {            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);        }

while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {            // 获取排序后的子节点列表            List<String> children = getSortedChildren();            // 获取前面自己创建的临时顺序子节点的名称            String sequenceNodeName = ourPath.substring(basePath.length() + 1);            // 实现锁的公平性的核心逻辑,看下面的分析            PredicateResults predicateResults = driver.getsTheLock(client,                                                        children , sequenceNodeName , maxLeases);            if (predicateResults.getsTheLock()) {                // 获得了锁,中断循环,继续返回上层                haveTheLock = true;            } else {                // 没有获得到锁,监听上一临时顺序节点                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();                synchronized (this) {                    try {                        // exists()会导致导致资源泄漏,因此exists()可以监听不存在的ZNode,因此采用getData()                        // 上一临时顺序节点如果被删除,会唤醒当前线程继续竞争锁,正常情况下能直接获得锁,因为锁是公平的                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);                        if (millisToWait != null) {                            millisToWait -= (System.currentTimeMillis() - startMillis);                            startMillis = System.currentTimeMillis();                            if (millisToWait <= 0) {                                doDelete = true; // 获取锁超时,标记删除之前创建的临时顺序节点                                break;                            }                            wait(millisToWait); // 等待被唤醒,限时等待                        } else {                            wait(); // 等待被唤醒,无限等待                        }                    } catch (KeeperException.NoNodeException e) {                    // 容错处理,逻辑稍微有点绕,可跳过,不影响主逻辑的理解                    // client.getData()可能调用时抛出NoNodeException,原因可能是锁被释放或会话过期(连接丢失)等                    // 这里并没有做任何处理,因为外层是while循环,再次执行driver.getsTheLock时会调用validateOurIndex                    // 此时会抛出NoNodeException,从而进入下面的catch和finally逻辑,重新抛出上层尝试重试获取锁并删除临时顺序节点                    }                }            }        }    } catch (Exception e) {        ThreadUtils.checkInterrupted(e);        // 标记删除,在finally删除之前创建的临时顺序节点(后台不断尝试)        doDelete = true;        // 重新抛出,尝试重新获取锁        throw e;    } finally {        if (doDelete) {            deleteOurPath(ourPath);        }    }    return haveTheLock;}
1234567891011121314151617181920212223242526
// From StandardLockInternalsDriverpublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception{    // 之前创建的临时顺序节点在排序后的子节点列表中的索引    int ourIndex = children.indexOf(sequenceNodeName);    // 校验之前创建的临时顺序节点是否有效    validateOurIndex(sequenceNodeName, ourIndex);    // 锁公平性的核心逻辑    // 由InterProcessMutex的构造函数可知,maxLeases为1,即只有ourIndex为0时,线程才能持有锁,或者说该线程创建的临时顺序节点激活了锁    // Zookeeper的临时顺序节点特性能保证跨多个JVM的线程并发创建节点时的顺序性,越早创建临时顺序节点成功的线程会更早地激活锁或获得锁    boolean getsTheLock = ourIndex < maxLeases;    // 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex-1)    // 因为锁是公平的,因此无需监听除了(ourIndex-1)以外的所有节点,这是为了减少羊群效应,非常巧妙的设计!!    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);    // 返回获取锁的结果,交由上层继续处理(添加监听等操作)    return new PredicateResults(pathToWatch, getsTheLock);}

static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException{    if ( ourIndex < 0 ){        // 容错处理,可跳过        // 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被Zookeeper服务端删除,往外抛出NoNodeException        // 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点        // 佩服Curator的作者将边界条件考虑得如此周到!        throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);    }}
12345678910
// From LockInternalsprivate final Watcher watcher = new Watcher(){    @Override    public void process(WatchedEvent event){        notifyFromWatcher();    }};private synchronized void notifyFromWatcher(){   notifyAll(); // 唤醒所有等待LockInternals实例的线程}
12345678910
// From LockInternalsprivate void deleteOurPath(String ourPath) throws Exception{    try{        // 后台不断尝试删除        client.delete().guaranteed().forPath(ourPath);    } catch ( KeeperException.NoNodeException e ) {        // 已经删除(可能会话过期导致),不做处理        // 实际使用Curator-2.12.0时,并不会抛出该异常    }}

释放锁

弄明白了获取锁的原理,释放锁的逻辑就很清晰了

InterProcessMutex.release

12345678910111213141516171819202122232425
public void release() throws Exception{    Thread currentThread = Thread.currentThread();    LockData lockData = threadData.get(currentThread);    if ( lockData == null ){        // 无法从映射表中获取锁信息,不持有锁        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);    }

int newLockCount = lockData.lockCount.decrementAndGet();    if ( newLockCount > 0 ){        // 锁是可重入的,初始值为1,原子-1到0,锁才释放        return;    }    if ( newLockCount < 0 ){        // 理论上无法执行该路径        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);    }    try{        // lockData != null && newLockCount == 0,释放锁资源        internals.releaseLock(lockData.lockPath);    } finally {        // 最后从映射表中移除当前线程的锁信息        threadData.remove(currentThread);    }}

LockInternals.releaseLock

12345
void releaseLock(String lockPath) throws Exception{   revocable.set(null);   // 删除临时顺序节点,只会触发后一顺序节点去获取锁,理论上不存在竞争,只排队,非抢占,公平锁,先到先得   deleteOurPath(lockPath);}
12345678910
// Class:LockInternalsprivate void deleteOurPath(String ourPath) throws Exception{    try{        // 后台不断尝试删除        client.delete().guaranteed().forPath(ourPath);    } catch ( KeeperException.NoNodeException e ) {        // 已经删除(可能会话过期导致),不做处理        // 实际使用Curator-2.12.0时,并不会抛出该异常    }}

总结

InterProcessMutex的特性

  1. 分布式锁(基于Zookeeper
  2. 互斥锁
  3. 公平锁(监听上一临时顺序节点 + wait() / notifyAll()
  4. 可重入

原文地址:https://www.cnblogs.com/a-du/p/9876314.html

时间: 2024-08-09 23:11:39

基于Zookeeper实现的分布式互斥锁 - InterProcessMutex的相关文章

基于(Redis | Memcache)实现分布式互斥锁

设计一个缓存系统,不得不要考虑的问题就是:缓存穿透.缓存击穿与失效时的雪崩效应. 缓存击穿 缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义.在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞. 解决方案 有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap

剖析curator的分布式互斥锁原理

1 前言 最近在做项目的时候,遇到一个多线程访问申请一个资源的问题.需要每个线程都能够最终在有效的时间内申请到或者超时失败.以前一般这种方式用的是redis做枷锁机制,每个线程都去redis加一把共同的锁,如果枷锁成功,则执行资源申请操作.而没有枷锁成功的线程,则在有效时间内循环尝试去枷锁,并且每次木有加锁成功,则就Thread.sleep()一会. 通过这种方式可以看出,这里对于sleep的时间间隔要求设置很严格,如果太小,则就会增加大规模的redis请求操作:而如果太长,当资源可用的时候,但

借助共享缓存redis实现分布式互斥锁

新开发的系统需要控制每个时刻回收缓存的GC线程有且只有一个在运行,如果有多个线程同时运行,会造成系统崩溃.如果只有一个JVM进程那么很好办,简单的借助synchronized关键字就行了.可是我的系统要部署在多台服务器,每台服务器上部署多个实例上.而synchronized仅仅在单进程里有用. 考虑借助共享数据源redis实现功能. redis提供一个方法,SETNX key value.将 key 的值设为 value ,当且仅当 key 不存在.若给定的 key 已经存在,则 SETNX 不

基于zookeeper简单实现分布式锁

这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制.来简单实现分布式锁. 主要思想: 1.开启10个线程.在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点. 2.获取disLocks节点下全部子节点,排序,假设自己的节点编号最小,则获取锁: 3.否则watch排在自己前面的节点,监听到其删除后,进入第2步(又一次检測排序是防止监听的节点发生连接失效.导致的节点删除情况): 4.删除自身sub节点,释放连接: 这

spring boot 定时任务基于zookeeper的分布式锁实现

基于ZooKeeper分布式锁的流程 在zookeeper指定节点(locks)下创建临时顺序节点node_n 获取locks下所有子节点children 对子节点按节点自增序号从小到大排序 判断本节点是不是第一个子节点,若是,则获取锁:若不是,则监听比该节点小的那个节点的删除事件 若监听事件生效,则回到第二步重新进行判断,直到获取到锁 具体实现 添加Maven依赖: <?xml version="1.0" encoding="UTF-8"?> <

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

基于zookeeper分布式全局序列分布式锁详解

InterProcessMutex 类详解步骤:获取锁的过程步骤: 1.acquire方法,根据当前线程获取锁对象,判断当前的线程是否已经获取锁,此处则代表可重入:2.获取锁方法,String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());3.当获取到锁时,则把锁数据放入内存对象private final ConcurrentMap<Thread, LockData>   threadData = Maps.n

分布式锁与实现(二)——基于ZooKeeper实现

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等. ZooKeeper的架构通过冗余服务实现高可用性.因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机.ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构.客户端可以在节点读写,

分布式锁的实现【基于ZooKeeper】

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等. ZooKeeper的架构通过冗余服务实现高可用性.因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机.ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构.客户端可以在节点读写,