ZooKeeper学习(二)ZooKeeper实现分布式锁

一、简介

  在日常开发过程中,大型的项目一般都会采用分布式架构,那么在分布式架构中若需要同时对一个变量进行操作时,可以采用分布式锁来解决变量访问冲突的问题,最典型的案例就是防止库存超卖,当然还有其他很多的控制方式,这篇文章我们讨论一下怎么使用ZooKeeper来实现分布式锁。

二、Curator

  前面提到的分布式锁,在ZooKeeper中可以通过Curator来实现。

  定义:Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。

三、尝试写一个分布式锁

开发思路

  在第一篇文章中,了解了ZooKeeper节点的概念,实现分布式锁的基本思路也是基于对节点的监听与操作从而实现的。

  • 1、创建一个父节点,并对父节点设置监听事件,实际加锁的对象为父节点下的子节点。
  • 2、若父节点下存在临时子节点,则获取锁失败,不存在子节点时,则各个线程可尝试争夺锁。
  • 3、业务逻辑执行完毕后会删除临时子节点,此时下一个进程进入时发现没有存在子节点,则创建子节点并获取锁

四、写一个工具类

pom.xml

<!--Zookeeper实现分布式锁的工具curator start -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.8.0</version>
</dependency>
<!--Zookeeper实现分布式锁的工具curator end -->

application.yml

#zookeeper分布式锁curator服务配置
curator:
  retryCount: 5 #重试次数
  elapsedTimeMs: 5000  #重试间隔时间
  connectString: 127.0.0.1:2181   # zookeeper 地址
  sessionTimeoutMs: 60000  # session超时时间
  connectionTimeoutMs: 5000  # 连接超时时间

配置类

/**
 * ZK的属性
 */
@Data
@Component
@ConfigurationProperties(prefix = "curator")//获取application.yml配置的值
public class ZkProperties {

    private int retryCount;//重试次数

    private int elapsedTimeMs;//重试间隔时间

    private String connectString;//zookeeper 地址

    private int sessionTimeoutMs;//session超时时间

    private int connectionTimeoutMs;//连接超时时间
}
/**
 * ZK的属性配置
 */
@Configuration//标识这是一个配置类
public class ZkConfiguration {

    @Autowired
    ZkProperties zkProperties;

    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        return CuratorFrameworkFactory.newClient(
                zkProperties.getConnectString(),
                zkProperties.getSessionTimeoutMs(),
                zkProperties.getConnectionTimeoutMs(),
                new RetryNTimes(zkProperties.getRetryCount(), zkProperties.getElapsedTimeMs()));
    }
}

分布式锁工具

/**
 * 分布式锁工具类
 * 【类解析】
 * 1、InitializingBean接口为bean提供了初始化方法的方式。
 * 2、它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。
 * 【锁原理】
 * 1、创建一个父节点,并对父节点设置监听事件,实际加锁的对象为父节点下的子节点。
 * 2、若父节点下存在临时子节点,则获取锁失败。不存在子节点时,则各个线程可尝试争夺锁。
 * 3、业务逻辑执行完毕后会删除临时子节点,此时下一个进程进入时发现没有存在子节点,则创建子节点并获取锁
 */
@Slf4j
@Service
public class DistributedLockByZookeeperUtil implements InitializingBean {

    private final static String ROOT_PATH_LOCK = "rootlock";//父节点路径
    private CountDownLatch countDownLatch = new CountDownLatch(1);//节点计数器

    @Autowired
    private CuratorFramework curatorFramework;

    /**
     * 获取分布式锁
     */
    public void acquireDistributedLock(String path) {
        String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        //1、一直循环等待获取锁
        while (true) {
            try {
                //2、尝试创建子节点,若子节点已经存在,则创建异常,并进入catch块代码
                curatorFramework
                        .create()//创建节点
                        .creatingParentsIfNeeded()//如果父节点不存在,则在创建节点的同时创建父节点
                        .withMode(CreateMode.EPHEMERAL)//【临时节点】创建后,会话结束节点会自动删除
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//【接入权限】任何链接都可以操作该节点
                        .forPath(keyPath);//对应的操作路径
                log.info("获取分布式锁成功!路径为:{}", keyPath);
                break;
            } catch (Exception e) {
                //3、创建子节点失败时,即获取锁失败
                log.info("获取分布式锁失败!路径为:{}", keyPath);
                log.info("等待重新获取锁.......");
                try {
                    if (countDownLatch.getCount() <= 0) {
                        countDownLatch = new CountDownLatch(1);//重置计数器
                    }
                    //4、从新挂起当前线程,调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
                    countDownLatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * 释放分布式锁
     */
    public boolean releaseDistributedLock(String path) {
        try {
            String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            //1、查看当前节点是否已经存在
            if (curatorFramework.checkExists().forPath(keyPath) != null) {
                //2、若子节点存在,则删除子节点,即释放锁
                curatorFramework.delete().forPath(keyPath);
            }
        } catch (Exception e) {
            log.error("释放分布式锁错误!");
            return false;
        }
        return true;
    }

    /**
     * 创建 watcher 事件
     */
    private void addWatcher(String path) throws Exception {
        String keyPath;
        if (path.equals(ROOT_PATH_LOCK)) {
            keyPath = "/" + path;
        } else {
            keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
        }
        //1、创建子节点监听事件
        final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
        //2、设置监听器初始化模式:异步初始化。初始化后会触发事件。
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        //3、创建监听事件
        cache.getListenable().addListener((client, event) -> {
            //4、当发生子节点移除事件时,进入if内逻辑
            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                String oldPath = event.getData().getPath();
                log.info("上一个节点 " + oldPath + " 已经被断开");
                //5、移除的节点为监听节点的子节点时,即路径包含父节点时,进入if内逻辑
                if (oldPath.contains(path)) {
                    //6、释放计数器,让当前的请求获取锁
                    countDownLatch.countDown();
                }
            }
        });
    }

    /**
     * 创建父节点,并创建永久节点
     * PS:在所有的属性被初始化后调用此方法,创建父节点
     */
    @Override
    public void afterPropertiesSet() {
        //1、指定命名空间
        curatorFramework = curatorFramework.usingNamespace("lock-namespace");
        //2、下面代码逻辑的父节点路径
        String path = "/" + ROOT_PATH_LOCK;
        try {
            //3、父节点不存在时,创建父节点
            if (curatorFramework.checkExists().forPath(path) == null) {
                curatorFramework
                        .create()//创建节点
                        .creatingParentsIfNeeded()//如果父节点不存在,则在创建节点的同时创建父节点
                        .withMode(CreateMode.PERSISTENT)//【持久化节点】客户端与zookeeper断开连接后,该节点依旧存在
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//【接入权限】任何链接都可以操作该节点
                        .forPath(path);//对应的操作路径
            }
            //4、添加对父节点的监听事件
            addWatcher(ROOT_PATH_LOCK);
            log.info("root path 的 watcher 事件创建成功");
        } catch (Exception e) {
            log.error("连接zookeeper失败,请查看日志 >> {}", e.getMessage(), e);
        }
    }
}

测试类

/**
 * 测试类
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private DistributedLockByZookeeperUtil distributedLockByZookeeper;//分布式锁工具类
    @Autowired
    private IUserInfoService iUserInfoService;//业务类

    private final static String PATH = "test";//子节点对应路径(PS:在锁工具里面会拼接完整路径)

    @GetMapping("/doSomeThings")
    public boolean doSomeThings() {
        /*1、获取锁*/
        Boolean flag;//是否已经释放锁 释放成功:true , 释放失败:false
        distributedLockByZookeeper.acquireDistributedLock(PATH);//获取锁
        /*2、业务代码块*/
        try {
            iUserInfoService.update();
            UserInfoVO vo = iUserInfoService.querySingleVO(1);
            System.out.println("剩余库存为:" + vo.getCreateStaff());
        } catch (Exception e) {
            e.printStackTrace();
            //业务代码报错时及时释放锁
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
        }
        /*3、释放锁*/
        flag = distributedLockByZookeeper.releaseDistributedLock(PATH);//执行成功释放锁
        return flag;
    }

}

五、压测方法

  压测的方法有很多,我使用的是Jmeter来进行并发调用测试类代码,测试结果分布式锁有效,这里不再写压测过程,感兴趣的亲可以看下文末的文章推荐。

参考文章:

原文地址:https://www.cnblogs.com/riches/p/12329257.html

时间: 2024-11-08 05:20:47

ZooKeeper学习(二)ZooKeeper实现分布式锁的相关文章

zookeeper学习实践1-实现分布式锁

引言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等. ZooKeeper的架构通过冗余服务实现高可用性.因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机.ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构.客户端可以在节点读写,

关于分布式锁原理的一些学习与思考-redis分布式锁,zookeeper分布式锁

首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在jdk java.util 并发包中已经为我们提供了这些方法去加锁, 比如synchronized 关键字 或者Lock 锁,都可以处理. 但是我们现在的应用程序如果只部署一台服务器,那并发量是很差的,如果同时有上万的请求那么很有可能造成服务器压力过大,而瘫痪. 想想双十一 和 三十晚上十点分支付宝红

kafka学习(二)-zookeeper集群搭建

zookeeper概念 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名 服务等.Zookeeper是hadoop的一个子项目,其发展历程无需赘述.在分布式应用中,由于工程师不能很好地使用锁机制,以及基于消息的协调机制 不适合在某些应用中使用,因此需要有一种可靠的.可扩展的.分布式的.可配置的协调机制来统一系统的状态.Zookeeper的目的就在于此. 1.角色 Zookeeper中的角色主要有以下三

zookeeper应用场景练习(分布式锁)

在平常的高并发的程序中,为了保证数据的一致性,因此都会用到锁,来对当前的线程进行锁定.在单机操作中,很好做到,比如可以采用Synchronized.Lock或者其他的读写多来锁定当前的线程.但是在分布式的系统中,就很难做到这一点.因此可以采用zookeeper中节点的特性来满足这一点.大致实现的思路如下. 1.每个客户端都去zookeeper上创建临时的顺序节点 2.客户端判断当前自己创建的节点是不是最小的 3.如果是的话,就获得了执行当前任务的锁 4.如果不是的话,就找到比自己小的节点,然后进

【Zookeeper学习】Zookeeper安装部署

[时间]2014年11月19日 [平台]Centos 6.5 [工具] [软件]jdk-7u67-linux-x64.rpm zookeeper-3.4.6.tar.gz [步骤] 1. 准备条件 (1)集群规划 主机类型 IP地址 域名 zookeeper1 192.168.50.21 zookeeper1.hadoop.com zookeeper2 192.168.50.22 zookeeper2.hadoop.com zookeeper3 192.168.50.23 zookeeper3.

分布式锁实现大型连续剧之(二):Zookeeper

前言 紧跟上文的:分布式锁实现(一):Redis ,这篇我们用Zookeeper来设计和实现分布式锁,并且研究下开源客户端工具Curator的分布式锁源码 设计实现 一.基本算法 1.在某父节点下创建临时有序节点2.判断创建的节点是否是当前父节点下所有子节点中序号最小的3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁4.解锁的时候删除当前节点 二.关键点 临时有序节点实现Zookeeper分布式锁关键就在于其[临时有序节点]的特

分布式锁2 Java非常用技术方案探讨之ZooKeeper

前言:       由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题,那么就要利用分布式锁来解决这些问题.以自己结合实际工作中的一些经验和网上看到的一些资料,做一个讲解和总结.之前我已经写了一篇关于分布式锁的文章: 分布式锁1 Java常用技术方案 .上一篇文章中主要写的是在日常项目中,较为常见的几种实现分布式锁的方法.通过这些方法,基本上可以解决我们日常工作中大部分场景下使用分布式锁的问题.       本篇文章主要是在上一篇文章的基础上,介绍一些虽

[转载] zookeeper 分布式锁服务

转载自http://www.cnblogs.com/shanyou/archive/2012/09/22/2697818.html 分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那一层来挡.当大量的行锁.表锁.事务充斥着数据库的时候.一般web应用很多的瓶颈都在数据库上,这里给大家介绍的是减轻数据库锁负担的一种方案,使用zookeeper分布式锁服务. zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hiv

Curator实现zookeeper分布式锁的基本原理

一.写在前面 之前写过一篇文章(<拜托,面试请不要再问我Redis分布式锁的实现原理>),给大家说了一下Redisson这个开源框架是如何实现Redis分布式锁原理的,这篇文章再给大家聊一下ZooKeeper实现分布式锁的原理. 同理,我是直接基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现. 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省事儿的方式. 二.ZooKeeper分布

彻底讲清楚ZooKeeper分布式锁的实现原理

一.写在前面 之前写过一篇文章(<拜托,面试请不要再问我Redis分布式锁的实现原理>),给大家说了一下Redisson这个开源框架是如何实现Redis分布式锁原理的,这篇文章再给大家聊一下ZooKeeper实现分布式锁的原理. 同理,我是直接基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现. 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省事儿的方式. 二.ZooKeeper分布