zookeeper 实现分布式锁安全用法

  • 背景
  • ConnectionLoss 链接丢失
  • SessionExpired 会话过期
  • 绕开 zookeeper broker 进行状态通知
  • leader 选举与zkNode 断开
  • 做好幂等
  • 静态扩容、动态扩容
  • 背景

    分布式锁现在用的越来越多,通常用来协调多个并发任务。在一般的应用场景中存在一定的不安全用法,不安全用法会带来多个master在并行执行,业务或数据可能存在重复计算带来的副作用,在没有拿到lock的情况下扮演者master等诸如此类。

    要想准确的拿到分布式锁,并且准确的捕获在分布式情况下锁的动态转移状态,需要处理网络变化带来的连锁反应。比如常见的 session expire、connectionLoss,在设置lock状态的时候我们如何保证准确拿到lock。

    在设计任务的时候我们需要具有 stop point 的策略,这个策略是用来在感知到lock丢失后能够交付执行权的机制。但是是否需要这么严肃的处理这个问题还取决于业务场景,比如下游的任务已经做好幂等也无所谓重复计算。 但是在有些情况下确实需要严肃精准控制。

    ConnectionLoss 链接丢失

    先说第一个场景,connectionLoss事件,此事件表示提交的commit有可能执行成功也有可能执行失败,成功是指在zookeeper broker 中执行成功但是返回的时候tcp断开了,导致未能拿到返回的状态。失败是指根本就没有提交到zookeper broker中链接就断开了。

    所以在我们获取lock的时候需要做 connectionLoss 事件处理,我们看个例子。

    protected void runForMaster() {
    
            logger.info("master:run for master.");
    
            AsyncCallback.StringCallback createCallback =
                    (rc, path, ctx, name) -> {
                        switch (KeeperException.Code.get(rc)) {
                            case CONNECTIONLOSS:
                                checkMaster();//链接失效检查znode设置是否成功
                                return;
                            case OK:
                                isLeader = true;
                                logger.info("master:I‘m the leader serverId:" + serverId);
                                addMasterWatcher();//监控 master znode
                                this.takeLeadership();//执行leader权利
                                break;
                            case NODEEXISTS:
                                isLeader = false;
                                String serverId = this.getMasterServerId();
                                this.takeBackup(serverId);
                                break;
    
                        }
                    };
    
            zk.create(rootPath + "/master", serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL, createCallback, null);//创建master节点
        }
    
        /**
         * check master 循环检查
         */
        private void checkMaster() {
    
            AsyncCallback.DataCallback masterCheckCallback =
                    (rc, path, ctx, data[], stat) -> {
                        switch (KeeperException.Code.get(rc)) {
                            case CONNECTIONLOSS:
                                checkMaster();
                                return;
                            case NONODE:
                                runForMaster();
                                return;
                            default: {
                                String serverId = this.getMasterServerId();
                                isLeader = serverId.equals(this.serverId);
                                if (BooleanUtils.isNotTrue(isLeader)) {
                                    this.takeBackup(serverId);
                                } else {
                                    this.takeLeadership();
                                }
                            }
    
                            return;
                        }
                    };
    
            zk.getData(masterZnode, false, masterCheckCallback, null);
        }
    

    这里的master表示具有执行权,只有成功拿到master 角色才能履行master权利。

    runForMaster 方法一旦发现有connectionLoss就发起checkMaster进行检查,同时checkMaster方法中也进行connectinLoss检查,直到拿到明确的状态为止。在此时有可能有另外的节点获取到了master角色,那么当前节点就做好backup等待机会。

    我们需要捕获zookeeper所有的状态变化,要知道master什么时候失效做好申请准备,当自己是master时候会话失效需要释放master权利。

    /**
         * 监控 master znode 做 master/slave 切换
         */
        private void addMasterWatcher() {
    
            AsyncCallback.StatCallback addMasterWatcher = (rc, path, ctx, stat) -> {
                switch (KeeperException.Code.get(rc)) {
                    case CONNECTIONLOSS:
                        addMasterWatcher();
                        break;
                    case OK:
                        if (stat == null) {
                            runForMaster();//master 已经不存在
                        } else {
                            logger.info("master:watcher master znode ok.");
                        }
                        break;
                    case NONODE:
                        logger.info("master:master znode delete.");
                        runForMaster();
                        break;
                }
            };
    
            zk.exists(masterZnode, MasterExistsWatcher, addMasterWatcher, null);
        }

    通过zookeeper watcher 机制来进行状态监听,保持与网络、zookeeper状态变化联动。

    SessionExpired 会话过期

    我们在来看第二个问题,第一个问题是获取lock的时候如何保证一定可以准确拿到状态,这里状态是指master角色或者backup角色。

    当我们成功与zookeeper broker建立链接,成功获取到master角色并且正在履行master义务时突然zookeeper通知session过期,SessionExpired事件表示zookeeper将会删除所有当前会话创建的临时znode,也就意味这master znode将会被其他会话创建。

    此时我们需要将自己的master 权利交出去,也就是我们必须放下目前手上执行的任务,这个停止的状态必须能够反应到全局。此时最容易出现到问题就是,我们已经不是master了但是还在偷偷到执行master权利,通过dashboard会看到很奇怪的问题,不是master的服务器还在执行。

    case SESSIONEXPIRED:
        //执行 stop point 通知
        this.stopPoint();
        break;

    所以这里需要我们在设计任务时有stop point 策略,类似jvm的safe point,随时响应全局停止。

    绕开 zookeeper broker 进行状态通知

    还有一种常见的使用方式是绕开zookeeper 来做状态通知。

    我们都知道zookeeper cluster 是由多台实例组成,每个实例都在全国甚至全球的不同地方,leader到这些节点之间都有很大的同步延迟差异,zookeeper内部采用法定人数的两阶段提交的方式来完成一次commit。

    比如有7个实例构成一套zookeeper cluster ,当一次client 写入 commit只需要集群中有超过半数完成写入就算这次commit提交成功了。但是cleint得到这个提交成功的响应之后立马执行接下来的任务,这个任务可能是读取某个znode下的所有状态数据,此时有可能无法读取到这个状态。

    如果是分布式锁的话很有可能是锁在zk集群中的转移无法和client集群保持一直。所以只要是基于zookeeper做集群调度就要完全原来zookeeper来做状态通知,不可以绕开zookeeper来自行调度。

    leader 选举与zkNode 断开

    zookeeper leader 是所有状态变更的串行化器,add、update、delete都需要leader来处理,然后传播给所有follower、observer节点。

    所有的session是保存在leader中的,所有的watcher是保存在client链接的zookeper node中的,这里两个场景都会导致状态迁移的通知不准时。

    如果zookeeper是由多数据中心构成的一套集群,存在异地同步延迟的问题,leader是肯定会放在写入的数据中心中,同时zid应该是最大的,甚至是一组高zid的机器都在写入的数据中心中,这样保证leader宕机也不会轻易导致leader选举到其他数据中心。

    但是follower、observer都会有client在使用,也会有在这些节点进行协调的分布式集群。

    先说leader选举导致异地节点延迟感知问题,比如当前 zookeeper cluster 有7台机器构成:

    dataCenter shanghai:zid=100、zid=80、zid=50
    dataCenter beijing: zid=10、zid=20
    dataCenter shenzhen:zid=30、zid=40

    由于网络问题集群发生leader选举,zid=100暂时脱离集群,zid=80成为leader,这里不考虑日志新旧问题,优先使用zid进行选举。

    由于集群中所有的session是保存在原来zid=100的机器中的,新leader没有任何session信息,所以将导致所有session丢失。

    session的保持时间是取决于我们设置的sessinoTimeout时间来的,client通过ping来将心跳传播到所链接的zkNode,这个zkNode可能是任意角色的node,然后zkNode在与zkleaderNode进行心跳来保持会话,同时zkNode也会通过ping来保持会话超时时间。

    此时当原有当client在重新链接上zkNode时会被告知sessionExpired。sessionExpired 是由zkNode通知出来的,当会话丢失或者过期,client在去尝试链接zkNode时候会被zkNode告知会话过期。

    如果client只捕获了sessionExpired显然会出现多个master运行情况,因为当你与zkNode断开到时候,当时还没有收到sessionExpired事件时,已经有另外client成功创建master拿到权利。

    这种情况在zkNode出现脱离集群当时候也会出现,当zkNode断开之后也会出现sessionExpired延迟通知问题。所有的watcher都是需要在新的zkNode上创建才会收到新的事件。

    静态扩容、动态扩容

    在极端情况下静态扩容可能会导致zookeeper集群出现严重的数据不一致问题,比如现有集群:A、B、C,现在需要进行静态扩容,停止ABC实例,拉入DE实例,此时如果C实例是ABC中最滞后的实例,如果AB启动的速度没有C快就会导致CDE组成新的集群,新的纪元号会覆盖原来的AB日志。当然现在基本上不会接受静态扩容,基本上都是动态扩容。

    动态扩容在极端情况下也会出现类似问题,比如现在有三个机房,1、2、3,1机房方leader zid=200、100,2机房zid=80、50,3机房zid=40,假设上次的commit是在zid=200、100、50之间提交的,此时机房1出现断网,2机房zid=80、50与3机房zid=40开始组成新的集群,新的纪元在zid=50上产生。

    做好幂等

    在使用zookeeper来实现分布式锁或者集群调度的时候会出现很多分布式下的问题,为了保证这些问题的出现不会带来业务系统或者业务数据的不一致,我们还是在这些任务上做好幂等性考虑。

    比如进行数据的计算,做个时间检查,版本检查之类的。如果本身是基于zookeeper实现的一套独立的分布式系统需要的工作会更多点。

    作者:王清培 (沪江集团资深架构师)

    原文地址:http://blog.51cto.com/wangqingpei557/2346830

    时间: 2024-10-14 11:51:51

    zookeeper 实现分布式锁安全用法的相关文章

    [ZooKeeper.net] 3 ZooKeeper的分布式锁

    基于ZooKeeper的分布式锁  源码分享:http://pan.baidu.com/s/1miQCDKk ZooKeeper 里实现分布式锁的基本逻辑: 1.zookeeper中创建一个根节点(Locks),用于后续各个客户端的锁操作. 2.想要获取锁的client都在Locks中创建一个自增序的子节点,每个client得到一个序号,如果自己的序号是最小的则获得锁. 3.如果没有得到锁,就监控排在自己前面的序号节点,并且设置默认时间,等待它的释放. 4.业务操作后释放锁,然后监控自己的节点的

    基于zookeeper实现分布式锁

    一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 线程锁:大家都不陌生,主要用来给方法.代码块加锁.当某个方法或者代码块使用锁时,那么在同一时刻至多仅有有一个线程在执行该段代码.当有多个线程访问同一对象的加锁方法/代码块时,同一时间只有一个线程在执行,其余线程必须要等待当前线程执行完之后才能执行该代码段.但是,其余线程是可以访问该对象中的非加锁代码块的. 进程锁:也是为了控制同一操作系统中多个进程访问一个共享资源,只是因为

    基于redis和zookeeper的分布式锁实现方式

    先来说说什么是分布式锁,简单来说,分布式锁就是在分布式并发场景中,能够实现多节点的代码同步的一种机制.从实现角度来看,主要有两种方式:基于redis的方式和基于zookeeper的方式,下面分别简单介绍下这两种方式: 一.基于redis的分布式锁实现 1.获取锁 redis是一种key-value形式的NOSQL数据库,常用于作服务器的缓存.从redis v2.6.12开始,set命令开始变成如下格式: SET key value [EX seconds] [PX milliseconds] [

    利用Zookeeper实现分布式锁及服务注册中心

    原文:利用Zookeeper实现分布式锁及服务注册中心 对于Zookeeper的定义以及原理,网上已经有很多的优秀文章对其进行了详细的介绍,所以本文不再进行这方面的阐述. 本文主要介绍一些基本的准备工作以及zookeeper.net的使用. 本文源代码github地址:https://github.com/Mike-Zrw/ZookeeperHelper zookeeper下载地址:https://archive.apache.org/dist/zookeeper/ ZooInspector下载

    zookeeper实现分布式锁优于redis的分布式锁

    redis的分布式锁,基于while循环不停的尝试,可以回导致占用cpu,能减缓的方法就是通过sleep一段时间 再去尝试,其实并不ok zookeeper做分布式锁, 是通过在zk上新建一个根node 通过client下面新建临时的node 把这些Node的id的序号设置成有序的,当前client判断这个id是否是最小的,如果是最小的,就执行逻辑,然后断开连接,放掉锁,zk会自动删除这个临时节点,然后这时候发生了变化,会调用client的watcher方法,client判断自己的id是否是最小

    Redis实现分布式锁与Zookeeper实现分布式锁区别

    Redis实现分布式锁与Zookeeper实现分布式锁区别 **前言: 在学习过程中,简单的整理了一些redis跟zookeeper实现分布式锁的区别,有需要改正跟补充的地方,希望各位大佬及时指出**Redis实现分布式锁思路 基于Redis实现分布式锁(setnx)setnx也可以存入key,如果存入key成功返回1,如果存入的key已经存在了,返回0. Zookeeper实现分布式锁思路 基于Zookeeper实现分布式锁 Zookeeper是一个分布式协调工具,在分布式解决方案中. 多个客

    SpringBoot电商项目实战 — Zookeeper的分布式锁实现

    上一篇演示了基于Redis的Redisson分布式锁实现,那今天我要再来说说基于Zookeeper的分布式现实. Zookeeper分布式锁实现 要用Zookeeper实现分布式锁,我就不得不说说zookeeper的数据存储.首先zookeeper的核心保存结构是一个DataTree数据结构,其实内部是一个Map<String, DataNode> nodes的数据结构,其中key是path,DataNode才是真正保存数据的核心数据结构,DataNode核心字段包括byte data[]用于

    【Zookeeper】分布式锁

    一.概述 实现原理 实现代码 一.概述 分布式锁解决方案(目的:为了保证在分布式领域中共享数据安全问题) 数据库实现分布式锁(不推荐.效率特别低) 基于Redis实现分布式锁setNx (非常麻烦考虑死锁.释放问题) .redission分布式锁 基于Zookeeper实现分布式锁(强烈推荐) SpringCloud内置实现全局锁(冷门)实现起来非常简单,使用临时节点释放锁(效率最高).失效时间容易控制 分布式锁(产生的原因:因为服务器产生集群) 在单台服务器上如何生成订号( 保证唯一) UUI

    ZooKeeper学习(二)ZooKeeper实现分布式锁

    一.简介 在日常开发过程中,大型的项目一般都会采用分布式架构,那么在分布式架构中若需要同时对一个变量进行操作时,可以采用分布式锁来解决变量访问冲突的问题,最典型的案例就是防止库存超卖,当然还有其他很多的控制方式,这篇文章我们讨论一下怎么使用ZooKeeper来实现分布式锁. 二.Curator 前面提到的分布式锁,在ZooKeeper中可以通过Curator来实现. 定义:Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开