分布式锁实现大型连续剧之(二):Zookeeper

前言

紧跟上文的:分布式锁实现(一):Redis ,这篇我们用Zookeeper来设计和实现分布式锁,并且研究下开源客户端工具Curator的分布式锁源码

设计实现

一、基本算法

1.在某父节点下创建临时有序节点
2.判断创建的节点是否是当前父节点下所有子节点中序号最小的
3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁
4.解锁的时候删除当前节点

二、关键点

临时有序节点
实现Zookeeper分布式锁关键就在于其[临时有序节点]的特性,在Zookeeper中有四种节点
1.PERSISTENT 持久,若不手动删除就永久存在
2.PERSISTENT_SEQUENTIAL 持久有序节点,zookeeper会为节点编号(保证有序)
3.EPHEMERAL 临时,一个客户端会话断开后会自动删除
4.EPHEMERAL_SEQUENTIAL 临时有序节点,zookeeper会为节点编号(保证有序)
监听
Zookeeper提供事件监听机制,通过对节点、节点数据、子节点都提供了监听,我们通过这种监听watcher机制实现锁的等待
三、代码实现
我们基于ZkClient这个客户端来实现,当然也可以用原生Zookeeper API,大致是一样的
坐标如下:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>

代码如下:
public class MyDistributedLock {

private ZkClient zkClient;
private String name;
private String currentLockPath;
private CountDownLatch countDownLatch;

private static final String PARENT_LOCK_PATH = "/distribute_lock";

public MyDistributedLock(ZkClient zkClient, String name) {
    this.zkClient = zkClient;
    this.name = name;
}

//加锁
public void lock() {
    //判断父节点是否存在,不存在就创建
    if (!zkClient.exists(PARENT_LOCK_PATH)) {
        try {
            //多个线程只会成功建立一次
            zkClient.createPersistent(PARENT_LOCK_PATH);
        } catch (Exception ignored) {
        }
    }
    //创建当前目录下的临时有序节点
    currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
    //校验是否最小节点
    checkMinNode(currentLockPath);
}

//解锁
public void unlock() {
    System.out.println("delete : " + currentLockPath);
    zkClient.delete(currentLockPath);
}

private boolean checkMinNode(String lockPath) {
    //获取当前目录下所有子节点
    List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
    Collections.sort(children);
    int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1));
    if (index == 0) {
        System.out.println(name + ":success");
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
        return true;
    } else {
        String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
        //等待前一个节点释放的监听
        waitForLock(waitPath);
        return false;
    }
}

private void waitForLock(String prev) {
    System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev);
    countDownLatch = new CountDownLatch(1);
    zkClient.subscribeDataChanges(prev, new IZkDataListener() {
        @Override
        public void handleDataChange(String s, Object o) throws Exception {

        }

        @Override
        public void handleDataDeleted(String s) throws Exception {
            System.out.println("prev node is done");
            checkMinNode(currentLockPath);
        }
    });
    if (!zkClient.exists(prev)) {
        return;
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    countDownLatch = null;
}

}
加锁

zkClient.exists先判断父节点是否存在,不存在就创建,zookeeper可以保证只会创建成功一次

在当前目录下zkClient.createEphemeralSequential创建临时有序节点,再判断当前目录下此节点是否为序号最小的,如果是,成功获取锁,否则的话拿比自己小的节点,并做监听

waitForLock等待比自己小的节点,subscribeDataChanges监听一个节点的变化,handleDataDeleted里面再次做checkMinNode的判断

监听完毕后,再判断一次此节点是否存在,因为在监听的过程中有可能之前小的那个节点重新释放了锁,如果之前节点不存在的话,无需在这里等待,这里的等待是通过countDownLatch实现的

解锁
解锁就是通过zkClient的delete删除当前节点
测试用例
通过启动多个线程来测试lock、unlock的过程,查看是否有序
public class MyDistributedLockTest {

public static void main(String[] args) {

    ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000);

    for (int i = 0; i < 20; i++) {

        String name = "thread" + i;
        Thread thread = new Thread(() -> {
            MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name);
            myDistributedLock.lock();

// try {
// Thread.sleep(1 * 1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
myDistributedLock.unlock();
});
thread.start();
}

}

}
执行结果如下,多线程情况下lock/unlock和监听一切正常:
thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006
thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005
thread3:success
delete : /distribute_lock2/0000000000
thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004
thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003
thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008
thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007
thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000
thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001
thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002
delete : /distribute_lock2/0000000001
prev node is done
thread8:success
delete : /distribute_lock2/0000000002
prev node is done
thread4:success
delete : /distribute_lock2/0000000003
prev node is done
thread7:success
delete : /distribute_lock2/0000000004
prev node is done
thread2:success
delete : /distribute_lock2/0000000005
prev node is done
thread6:success
delete : /distribute_lock2/0000000006
prev node is done
thread1:success
delete : /distribute_lock2/0000000007
prev node is done
thread5:success
delete : /distribute_lock2/0000000008
prev node is done
thread9:success
delete : /distribute_lock2/0000000009

Curator源码分析

一、基本使用
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
client.start();
InterProcessMutex lock2 = new InterProcessMutex(client, "/test");

    try {
        lock.acquire();
        //业务
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lock.release();
    }

CuratorFrameworkFactory.newClient获取zookeeper的客户端,retryPolicy指定重试策略,开启客户端

Curator本身提供了多种锁的实现,这里我们以InterProcessMutex可重入锁为例, lock.acquire()方法获取锁,lock.release()来释放锁,acquire方法也提供了重载的等待时间参数

二、源码分析

加锁
acquire内部就直接internalLock方法,传了-1的等待时间
public void acquire() throws Exception {
if(!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
复制代码internalLock方法首先判断是否是重入锁,通过ConcurrentMap维护线程和一个原子计数器,非重入锁的话,再通过attemptLock去获取锁
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn‘t necessary
/

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

attemptLock在这里进行循环等待,createsTheLock方法去创建节点,internalLockLoop去判断当前节点是否是最小节点
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can‘t find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

createsTheLock就是调用curator封装的api去创建临时有序节点
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
internalLockLoop锁判断,内部就是driver.getsTheLock去判断是否是当前目录下最小节点,如果是的话,返回获取锁成功,否则的话对previousSequencePath进行监听,监听动作完成后再对等待时间进行重新判断

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            List<String>        children = getSortedChildren();
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

解锁
release代码相对来说比较简单,就是先判断map里面是否存在当前线程的锁计数,不存在抛出异常,存在的话,进行原子减一操作,releaseLock内部就是删除节点操作,小于0的时候,从map里面移除

public void release() throws Exception
{
/
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn‘t necessary
/

    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}

总结:

分布式锁的实现目前主流比较常用的实现就是Redis和Zookeeper了,相比较自己的实现,Redission和Curator的设计实现更为优秀,也更值得我们借鉴和学习
千里之行,积于跬步;×××之船,成于罗盘,共勉。
欢迎大家在下面评论席发言

原文地址:http://blog.51cto.com/13932491/2164491

时间: 2024-11-11 02:54:24

分布式锁实现大型连续剧之(二):Zookeeper的相关文章

分布式锁实现大型连续剧之(一):Redis

前言: 单机环境下我们可以通过JAVA的Synchronized和Lock来实现进程内部的锁,但是随着分布式应用和集群环境的出现,系统资源的竞争从单进程多线程的竞争变成了多进程的竞争,这时候就需要分布式锁来保证.实现分布式锁现在主流的方式大致有以下三种 基于数据库的索引和行锁 基于Redis的单线程原子操作:setNX 基于Zookeeper的临时有序节点 这篇文章我们用Redis来实现,会基于现有的各种锁实现来分析,最后分享Redission的锁源码分析来看下分布式锁的开源实现 设计实现 加锁

[ZooKeeper.net] 3 ZooKeeper的分布式锁

基于ZooKeeper的分布式锁  源码分享:http://pan.baidu.com/s/1miQCDKk ZooKeeper 里实现分布式锁的基本逻辑: 1.zookeeper中创建一个根节点(Locks),用于后续各个客户端的锁操作. 2.想要获取锁的client都在Locks中创建一个自增序的子节点,每个client得到一个序号,如果自己的序号是最小的则获得锁. 3.如果没有得到锁,就监控排在自己前面的序号节点,并且设置默认时间,等待它的释放. 4.业务操作后释放锁,然后监控自己的节点的

SpringBoot电商项目实战 — Zookeeper的分布式锁实现

上一篇演示了基于Redis的Redisson分布式锁实现,那今天我要再来说说基于Zookeeper的分布式现实. Zookeeper分布式锁实现 要用Zookeeper实现分布式锁,我就不得不说说zookeeper的数据存储.首先zookeeper的核心保存结构是一个DataTree数据结构,其实内部是一个Map<String, DataNode> nodes的数据结构,其中key是path,DataNode才是真正保存数据的核心数据结构,DataNode核心字段包括byte data[]用于

死磕 java同步系列之zookeeper分布式锁

问题 (1)zookeeper如何实现分布式锁? (2)zookeeper分布式锁有哪些优点? (3)zookeeper分布式锁有哪些缺点? 简介 zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它可以为分布式应用提供一致性服务,它是Hadoop和Hbase的重要组件,同时也可以作为配置中心.注册中心运用在微服务体系中. 本章我们将介绍zookeeper如何实现分布式锁运用在分布式系统中. 基础知识 什么是znode? zooKeeper操作和维护的为一个个数据节点,称为 z

利用Zookeeper实现分布式锁及服务注册中心

原文:利用Zookeeper实现分布式锁及服务注册中心 对于Zookeeper的定义以及原理,网上已经有很多的优秀文章对其进行了详细的介绍,所以本文不再进行这方面的阐述. 本文主要介绍一些基本的准备工作以及zookeeper.net的使用. 本文源代码github地址:https://github.com/Mike-Zrw/ZookeeperHelper zookeeper下载地址:https://archive.apache.org/dist/zookeeper/ ZooInspector下载

Zookeep 分布式锁

什么是分布式锁 概述 为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度.而这个分布式协调技术的核心就是来实现这个分布式锁. 分布式锁应具备的条件 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行 高可用的获取锁与释放锁 高性能的获取锁与释放锁 具备可重入特性 具备锁失效机制 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败 分布式锁有哪些实现 Memcached:利用 Memcached 的 add 命令.此命令是原子性操作,只有在

浅谈分布式锁

分布式一致性问题 首先我们先来看一个小例子: 假设某商城有一个商品库存剩10个,用户A想要买6个,用户B想要买5个,在理想状态下,用户A先买走了6了,库存减少6个还剩4个,此时用户B应该无法购买5个,给出数量不足的提示:而在真实情况下,用户A和B同时获取到商品剩10个,A买走6个,在A更新库存之前,B又买走了5个,此时B更新库存,商品还剩5个,这就是典型的电商"秒杀"活动. 从上述例子不难看出,在高并发情况下,如果不做处理将会出现各种不可预知的后果.那么在这种高并发多线程的情况下,解决

分布式锁1 Java常用技术方案

前言:       由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题,那么就要利用分布式锁来解决这些问题.所以自己结合实际工作中的一些经验和网上看到的一些资料,做一个讲解和总结.希望这篇文章可以方便自己以后查阅,同时要是能帮助到他人那也是很好的. ===============================================================长长的分割线===================================

java 分布式锁方案

第一步,自身的业务场景: 在我日常做的项目中,目前涉及了以下这些业务场景: 场景一: 比如分配任务场景.在这个场景中,由于是公司的业务后台系统,主要是用于审核人员的审核工作,并发量并不是很高,而且任务的分配规则设计成了通过审核人员每次主动的请求拉取,然后服务端从任务池中随机的选取任务进行分配.这个场景看到这里你会觉得比较单一,但是实际的分配过程中,由于涉及到了按用户聚类的问题,所以要比我描述的复杂,但是这里为了说明问题,大家可以把问题简单化理解.那么在使用过程中,主要是为了避免同一个任务同时被两