zookeeper 分布式锁的实现

临时顺序节点,这种类型的节点有几下几个特性:

  • 节点的生命周期和客户端会话绑定,即创建节点的客户端会话一旦失效,那么这个节点也会被清除。
  • 每个父节点都会负责维护其子节点创建的先后顺序,并且如果创建的是顺序节点(SEQUENTIAL)的话,父节点会自动为这个节点分配一个整形数值,以后缀的形式自动追加到节点名中,作为这个节点最终的节点名。

利用上面这两个特性,我们来看下获取实现分布式锁的基本逻辑:

  1. 客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL
  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,同时在这个节点上注册上子节点变更通知的Watcher。
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
  4. 如果在步骤3中发现自己并非是所有子节点中最小的,说明自己还没有获取到锁,就开始等待,直到下次子节点变更通知的时候,再进行子节点的获取,判断是否获取锁。

释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可。

以上信息来自:http://jm-blog.aliapp.com/?p=2554

根据这个思路,来实现基于zookeeper的分布式锁。

直接贴代码,如下,如果有不合适的或需要改进的地方,请指教。

package com.usfot;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Shared意味着锁是全局可见的,客户端都可以请求锁。
 * DistributedSharedLock 应该是线程安全的。有待验证
 * Created by liyanxin on 2015/3/18.
 */
public class DistributedSharedLock implements Watcher {

    private static final String ADDR = "127.0.0.1:2181";
    private static final String LOCK_NODE = "guid-lock-";
    private String rootLockNode; //锁目录
    private ZooKeeper zk = null;
    private Integer mutex;
    private Integer currentLock;

    /**
     * 构造函数实现
     * 连接zk服务器
     * 创建zk锁目录
     *
     * @param rootLockNode
     */
    public DistributedSharedLock(String rootLockNode) {
        this.rootLockNode = rootLockNode;
        try {
            //连接zk服务器
            zk = new ZooKeeper(ADDR, 10 * 10000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
        mutex = new Integer(-1);
        // Create ZK node name
        if (zk != null) {
            try {
                //建立根目录节点
                Stat s = zk.exists(rootLockNode, false);
                if (s == null) {
                    zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * 请求zk服务器,获得锁
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void acquire() throws KeeperException, InterruptedException {
        ByteBuffer b = ByteBuffer.allocate(4);
        byte[] value;
        // Add child with value i
        b.putInt(ThreadLocalRandom.current().nextInt(10));
        value = b.array();

        // 创建锁节点
        String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        synchronized (mutex) {
            while (true) {
                // 获得当前锁节点的number,和所有的锁节点比较
                Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf(‘-‘) + 1));
                List<String> childLockNode = zk.getChildren(rootLockNode, true);

                SortedSet<Integer> sortedLock = new TreeSet<Integer>();
                for (String temp : childLockNode) {
                    Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf(‘-‘) + 1));
                    sortedLock.add(tempLockNumber);
                }

                currentLock = sortedLock.first();

                //如果当前创建的锁的序号是最小的那么认为这个客户端获得了锁
                if (currentLock >= acquireLock) {
                    System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock);
                    return;
                } else {
                    //没有获得锁则等待下次事件的发生
                    System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock);
                    mutex.wait();
                }
            }
        }
    }

    /**
     * 释放锁
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void release() throws KeeperException, InterruptedException {
        String lockName = String.format("%010d", currentLock);
        zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1);
        System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock);
    }

    @Override
    public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }
}

测试代码如下,

package com.usfot;

import org.apache.zookeeper.KeeperException;

/**
 * 启动10个线程,都获得一个锁对象,每个线程都有一个锁对象。
 * 锁对象请求zk服务器获得锁。如果不能获得锁,则等待。
 * 当一个线程获得锁时,其他线程将等待锁的释放。
 * Created by liyanxin on 2015/3/18.
 */
public class DistributedSharedLockTest {

    public static void main(String args[]) {
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");
                    try {
                        lock.acquire();
                        Thread.sleep(1000); //获得锁之后可以进行相应的处理
                        System.out.println("======获得锁后进行相应的操作======");
                        lock.release();
                        System.err.println("=============================");
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
}

看一下测试情况,

thread_name=Thread-5|wait lcok|lock_num=300
thread_name=Thread-1|attend lcok|lock_num=300
thread_name=Thread-9|wait lcok|lock_num=300
thread_name=Thread-3|wait lcok|lock_num=300
thread_name=Thread-7|wait lcok|lock_num=300
thread_name=Thread-6|wait lcok|lock_num=300
thread_name=Thread-2|wait lcok|lock_num=300
thread_name=Thread-0|wait lcok|lock_num=300
thread_name=Thread-8|wait lcok|lock_num=300
thread_name=Thread-4|wait lcok|lock_num=300
======获得锁后进行相应的操作======
thread_name=Thread-2|wait lcok|lock_num=301
thread_name=Thread-6|wait lcok|lock_num=301
thread_name=Thread-1|release lcok|lock_num=300
=============================
.................................

这是一部分的打印日志。

在换一种方式测试一下锁对象的线程安全性。如下测试代码,

package com.usfot;

import org.apache.zookeeper.KeeperException;

/**
 * Created by liyanxin on 2015/3/18.
 */
public class DistributedSharedLockTest2 {

    public static void main(String args[]) {
        final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");

        /**
         * 所有的线程都共享一个锁对象,验证锁对象的线程安全性
         * 锁是阻塞的
         */
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock.acquire();
                        Thread.sleep(1000); //获得锁之后可以进行相应的处理
                        System.out.println("======获得锁后进行相应的操作======");
                        lock.release();
                        System.err.println("=============================");
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    }
}

经过测试,发现了死锁deadlock的问题,这个问题如何导致的呢?是因为多个线程都会在mutex对象的内置锁上发生竞争。当线程A获得mutex对象的内置锁时,会进入到同步代码块,进行获取zk服务器的分布式锁的操作,当获得分布式锁后,退出同步代码块,mutex的内置锁也就被线程A释放。大量的线程都在竞争mutex对象的内置锁。这时,线程B获得mutex的内置锁,进入同步代码块,由于没有获得分布式锁,线程B等待。然后这时线程A释放分布式锁,删除zk服务器锁节点,此时触发watcher事件,唤醒mutex对象内置锁上等待的线程,

注意使用notify唤醒。notify大家应该知道,只能唤醒所有等待线程的其中一个,或许刚好此时唤醒的不是线程B,那么deadlock就来了。

怎么解决?我把notify换成notifyAll试了下,程序能顺利执行,没有死锁的现象。

重新运行代码,如下日志,

thread_name=Thread-9|wait lcok|lock_num=360
thread_name=Thread-0|wait lcok|lock_num=360
thread_name=Thread-1|wait lcok|lock_num=360
thread_name=Thread-5|wait lcok|lock_num=360
thread_name=Thread-7|wait lcok|lock_num=360
thread_name=Thread-3|wait lcok|lock_num=360
thread_name=Thread-6|wait lcok|lock_num=360
thread_name=Thread-2|attend lcok|lock_num=360
thread_name=Thread-4|wait lcok|lock_num=360
thread_name=Thread-8|wait lcok|lock_num=360
======获得锁后进行相应的操作======
thread_name=Thread-2|release lcok|lock_num=360
=============================
thread_name=Thread-8|attend lcok|lock_num=361
thread_name=Thread-4|wait lcok|lock_num=361
thread_name=Thread-6|wait lcok|lock_num=361
..............

一部分的打印日志

参考:http://blog.csdn.net/java2000_wl/article/details/8694270

=====================================================================

改进后的分布式锁实现

下面是改进后的分布式锁实现,和之前的实现方式唯一不同之处在于,这里设计成每个锁竞争者,只需要关注”_locknode_”节点下序号比自己小的那个节点是否存在即可。实现如下:

  1. 客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。
  4. 如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。
  5. 之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。

=================================END=================================

时间: 2024-12-15 06:57:23

zookeeper 分布式锁的实现的相关文章

ZooKeeper分布式锁浅谈(一)

一.概述 清明节的时候写了一篇分布式锁概述,里面介绍了分布式锁实现的几种方式,其实那时候我一直沉迷于使用redis的悲观锁和乐观锁来实现分布式锁,直到一个血案的引发才让我重新认识了redis分布式锁的弊端,所以才痛定思痛潜心研究Zookeeper:自己装了三台Centos虚拟机,搭建了ZooKeeper集群.二.ZooKeeper基本概念1.前言 ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现.分布式应用程序可以基于 ZooKeeper 实

[转载] zookeeper 分布式锁服务

转载自http://www.cnblogs.com/shanyou/archive/2012/09/22/2697818.html 分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那一层来挡.当大量的行锁.表锁.事务充斥着数据库的时候.一般web应用很多的瓶颈都在数据库上,这里给大家介绍的是减轻数据库锁负担的一种方案,使用zookeeper分布式锁服务. zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hiv

分布式锁(一) Zookeeper分布式锁

什么是Zookeeper? Zookeeper(业界简称zk)是一种提供配置管理.分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐.低延迟同时还要保持一致性和可用性,实际上非常困难.因此zookeeper提供了这些功能,开发者在zookeeper之上构建自己的各种分布式系统. 虽然zookeeper的实现比较复杂,但是它提供的模型抽象却是非常简单的.Zookeeper提供一个多层级的节点命名空间(节点称为znod

ZooKeeper分布式锁简单实践

ZooKeeper分布式锁的实现原理 在分布式解决方案中,Zookeeper是一个分布式协调工具.当多个JVM客户端,同时在ZooKeeper上创建相同的一个临时节点,因为临时节点路径是保证唯一,只要谁能够创建节点成功,谁就能够获取到锁.没有创建成功节点,就会进行等待,当释放锁的时候,采用事件通知给客户端重新获取锁资源.如果请求超时直接返回给客户端超时,重新请求即可. 代码实现为了更好的展现效果,我这里设置每个线程请求需要1s,请求超时时间为30s. 首先我们先写一个测试类,模拟多线程多客户端请

关于分布式锁原理的一些学习与思考-redis分布式锁,zookeeper分布式锁

首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在jdk java.util 并发包中已经为我们提供了这些方法去加锁, 比如synchronized 关键字 或者Lock 锁,都可以处理. 但是我们现在的应用程序如果只部署一台服务器,那并发量是很差的,如果同时有上万的请求那么很有可能造成服务器压力过大,而瘫痪. 想想双十一 和 三十晚上十点分支付宝红

死磕 java同步系列之zookeeper分布式锁

问题 (1)zookeeper如何实现分布式锁? (2)zookeeper分布式锁有哪些优点? (3)zookeeper分布式锁有哪些缺点? 简介 zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它可以为分布式应用提供一致性服务,它是Hadoop和Hbase的重要组件,同时也可以作为配置中心.注册中心运用在微服务体系中. 本章我们将介绍zookeeper如何实现分布式锁运用在分布式系统中. 基础知识 什么是znode? zooKeeper操作和维护的为一个个数据节点,称为 z

Curator实现zookeeper分布式锁的基本原理

一.写在前面 之前写过一篇文章(<拜托,面试请不要再问我Redis分布式锁的实现原理>),给大家说了一下Redisson这个开源框架是如何实现Redis分布式锁原理的,这篇文章再给大家聊一下ZooKeeper实现分布式锁的原理. 同理,我是直接基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现. 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省事儿的方式. 二.ZooKeeper分布

彻底讲清楚ZooKeeper分布式锁的实现原理

一.写在前面 之前写过一篇文章(<拜托,面试请不要再问我Redis分布式锁的实现原理>),给大家说了一下Redisson这个开源框架是如何实现Redis分布式锁原理的,这篇文章再给大家聊一下ZooKeeper实现分布式锁的原理. 同理,我是直接基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现. 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省事儿的方式. 二.ZooKeeper分布

高级java必会系列一:zookeeper分布式锁

方案1: 算法思路:利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁.解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁.特点:这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单.缺点是会产生"惊群"效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁.方案2:算法思路:临时顺序节点实现共享锁        客户端调用create(

zookeeper 分布式锁原理:

1 大家也许都很熟悉了多个线程或者多个进程间的共享锁的实现方式了,但是在分布式场景中我们会面临多个Server之间的锁的问题,实现的复杂度比较高.利用基于google chubby原理开发的开源的zookeeper,可以使得这个问题变得简单很多.下面介绍几种可能的实现方式,并且对比每种实现方式的优缺点. 1. 利用节点名称的唯一性来实现共享锁 ZooKeeper抽象出来的节点结构是一个和unix文件系统类似的小型的树状的目录结构.ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名.例